创建自己的网站,哈尔滨建设工程招标网,wordpress 修改主题页面,wordpress 会员支付宝进程间通信 进程彼此之间互相隔离#xff0c;要实现进程间通信#xff08;IPC#xff09;#xff0c;multiprocessing模块支持两种形式#xff1a;队列和管道#xff0c;这两种方式都是使用消息传递的。 进程队列queue 不同于线程queue#xff0c;进程queue的生成是用mu…进程间通信 进程彼此之间互相隔离要实现进程间通信IPCmultiprocessing模块支持两种形式队列和管道这两种方式都是使用消息传递的。 进程队列queue 不同于线程queue进程queue的生成是用multiprocessing模块生成的。 在生成子进程的时候会将代码拷贝到子进程中执行一遍及子进程拥有和主进程内容一样的不同的名称空间。 示例1 1 import multiprocessing2 def foo():3 q.put([11,hello,True])4 print(q.qsize())5 6 qmultiprocessing.Queue() #全局定义一个q进程队列在产生子进程时候会在子进程里生成可以指定最大数限制队列长度7 if __name__ __main__:8 pmultiprocessing.Process(targetfoo,args()) #因为名称空间不同子进程的主线程创建的q队列主进程get不到所以会阻塞住9 p.start()
10 # foo() #主进程执行一下函数就可以访问到了
11 print(q.get()) 示例2 1 import multiprocessing2 3 def foo():4 q.put([11,hello,True])5 print(q.qsize())6 7 if __name__ __main__:8 q multiprocessing.Queue() #主进程创建一个q进程队列9 pmultiprocessing.Process(targetfoo,args()) #因为名称空间不同子进程的主线程找不到q队列所以会报错提示没有q
10 p.start()
11 print(q.get()) 示例3 1 import multiprocessing2 3 def foo(argument): #定义函数处理进程队列4 argument.put([11,hello,True])5 print(argument.qsize())6 q multiprocessing.Queue() #全局定义一个进程队列7 print(test)8 9 if __name__ __main__:
10 x multiprocessing.Queue() #主进程定义一个进程队列
11 pmultiprocessing.Process(targetfoo,args(x,)) #主进程把值传给子进程就可以处理了
12 p.start()
13 print(x.get())
14 # foo(q)
15 # print(q.get()) 常用方法 q.put方法用以插入数据到队列中put方法还有两个可选参数blocked和timeout。如果blocked为True默认值并且timeout为正值该方法会阻塞timeout指定的时间直到该队列有剩余的空间。如果超时会抛出Queue.Full异常。如果blocked为False但该Queue已满会立即抛出Queue.Full异常。
q.get方法可以从队列读取并且删除一个元素。同样get方法有两个可选参数blocked和timeout。如果blocked为True默认值并且timeout为正值那么在等待时间内没有取到任何元素会抛出Queue.Empty异常。如果blocked为False有两种情况存在如果Queue有一个值可用则立即返回该值否则如果队列为空则立即抛出Queue.Empty异常.
q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)
q.empty():调用此方法时q为空则返回True该结果不可靠比如在返回True的过程中如果队列中又加入了项目。
q.full()调用此方法时q已满则返回True该结果不可靠比如在返回True的过程中如果队列中的项目被取走。
q.qsize():返回队列中目前项目的正确数量结果也不可靠理由同q.empty()和q.full()一样 其他方法 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
q.close():关闭队列防止队列中加入更多数据。调用此方法后台线程将继续写入那些已经入队列但尚未写入的数据但将在此方法完成时马上关闭。如果q被垃圾收集将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如如果某个使用者正在被阻塞在get()操作上关闭生产者中的队列不会导致get()方法返回错误。
q.join_thread()连接队列的后台线程。此方法用于在调用q.close()方法之后等待所有队列项被消耗。默认情况下此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为 另一个创建进程队列的类 http://www.cnblogs.com/zero527/p/7211909.html 管道pipe 管道就是管道就像生活中的管道两头都能进能出 默认管道是全双工的如果创建管道的时候映射成False左边只能用于接收右边只能用于发送类似于单行道 最简单的管道双向通信示例 1 import multiprocessing2 3 def foo(sk):4 sk.send(hello world)5 print(sk.recv())6 7 if __name__ __main__:8 conn1,conn2multiprocessing.Pipe() #开辟两个口都是能进能出括号中如果False即单向通信9 pmultiprocessing.Process(targetfoo,args(conn1,)) #子进程使用sock口调用foo函数
10 p.start()
11 print(conn2.recv()) #主进程使用conn口接收
12 conn2.send(hi son) #主进程使用conn口发送 常用方法 conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收recv方法会一直阻塞。如果连接的另外一端已经关闭那么recv方法会抛出EOFError。
conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象注意send()和recv()方法使用pickle模块对对象进行序列化 其他方法 conn1.close():关闭连接。如果conn1被垃圾回收将自动调用此方法,不用的时候两边都要closeconn1.fileno():返回连接使用的整数文件描述符conn1.poll([timeout]):如果连接上的数据可用返回True。timeout指定等待的最长时限。如果省略此参数方法将立即返回结果。如果将timeout射成None操作将无限期地等待数据到达。conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息超过了这个最大值将引发IOError异常并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭再也不存在任何数据将引发EOFError异常。conn.send_bytes(buffer [, offset [, size]])通过连接发送字节数据缓冲区buffer是支持缓冲区接口的任意对象offset是缓冲区中的字节偏移量而size是要发送字节数。结果数据以单条消息的形式发出然后调用c.recv_bytes()函数进行接收 conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息并把它保存在buffer对象中该对象支持可写入的缓冲区接口即bytearray对象或类似的对象。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间将引发BufferTooShort异常。 注意生产者和消费者都没有使用管道的某个端点就应该将其关闭如在生产者中关闭管道的右端在消费者中关闭管道的左端。如果忘记执行这些步骤程序可能再消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生产EOFError异常。因此在生产者中关闭管道不会有任何效果付费消费者中也关闭了相同的管道端点。 1 from multiprocessing import Process,Pipe2 3 import time,os4 def consumer(p,name):5 left,rightp6 left.close()7 while True:8 try:9 baoziright.recv()
10 print(%s 收到包子:%s %(name,baozi))
11 except EOFError:
12 right.close()
13 break
14 def producer(seq,p):
15 left,rightp
16 right.close()
17 for i in seq:
18 left.send(i)
19 # time.sleep(1)
20 else:
21 left.close()
22 if __name__ __main__:
23 left,rightPipe()
24 c1Process(targetconsumer,args((left,right),c1))
25 c1.start()
26 seq(i for i in range(10))
27 producer(seq,(left,right))
28 right.close()
29 left.close()
30 c1.join()
31 print(主进程) 1 from multiprocessing import Process,Pipe2 3 import time,os4 def consumer(p,name):5 left,rightp6 left.close()7 while True:8 try:9 baoziright.recv()
10 print(%s 收到包子:%s %(name,baozi))
11 except EOFError:
12 right.close()
13 break
14 def producer(seq,p):
15 left,rightp
16 right.close()
17 for i in seq:
18 left.send(i)
19 # time.sleep(1)
20 else:
21 left.close()
22 if __name__ __main__:
23 left,rightPipe()
24 c1Process(targetconsumer,args((left,right),c1))
25 c1.start()
26 seq(i for i in range(10))
27 producer(seq,(left,right))
28 right.close()
29 left.close()
30 c1.join()
31 print(主进程) 共享数据manage Queue和pipe只是实现了数据交互并没实现数据共享即一个进程去更改另一个进程的数据。 注进程间通信应该尽量避免使用共享数据的方式 共享数据列表 1 from multiprocessing import Manager,Process2 def foo(l,i):3 l.append(i**i)4 if __name__ __main__:5 manManager()6 mlman.list([11,22,33])7 l[]8 for i in range(5):9 pProcess(targetfoo,args(ml,i))
10 p.start()
11 l.append(p)
12 for i in l: #必须要join不然会执行报错处理一个数据必须要一个个来不能同时处理一个数据
13 i.join()
14 print(ml) 共享数据字典 1 from multiprocessing import Manager,Process2 def foo(d,k,v):3 d[k]v4 if __name__ __main__:5 manManager()6 mdman.dict({name:bob})7 l[]8 for i in range(5):9 pProcess(targetfoo,args(md,i,a))
10 p.start()
11 l.append(p)
12 for i in l: #必须要join不然会执行报错处理一个数据必须要一个个来不能同时处理一个数据
13 i.join()
14 print(md) 进程池 开多进程是为了并发通常有几个cpu核心就开几个进程但是进程开多了会影响效率主要体现在切换的开销所以引入进程池限制进程的数量。 进程池内部维护一个进程序列当使用时则去进程池中获取一个进程如果进程池序列中没有可供使用的进进程那么程序就会等待直到进程池中有可用进程为止。 示例 1 from multiprocessing import Pool2 import time3 4 def foo(n):5 print(n)6 time.sleep(1)7 8 if __name__ __main__:9 pool_objPool(5) #
10 for i in range(47):
11 # pool_obj.apply_async(funcfoo,args(i,))
12 pool_obj.apply(funcfoo,args(i,)) #子进程的生成是靠进程池对象维护的
13 # apply同步子进程一个个执行
14 # apply_async异步多个子进程一起执行
15 pool_obj.close()
16 pool_obj.join()
17 print(ending) 常用方法 pool_obj.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数必须从不同线程调用p.apply()函数或者使用p.apply_async()
pool_obj.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例callback是可调用对象接收输入参数。当func的结果变为可用时将理解传递给callback。callback禁止执行任何阻塞操作否则将接收其他异步操作中的结果。
pool_obj.close():关闭进程池防止进一步操作。如果所有操作持续挂起它们将在工作进程终止前完成
pool_obj.jion():等待所有工作进程退出。此方法只能在close或teminate()之后调用 其他方法 方法apply_async()和map_async的返回值是AsyncResul的实例obj。实例具有以下方法
obj.get():返回结果如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达将引发一场。如果远程操作中引发了异常它将在调用此方法时再次被引发。
obj.ready():如果调用完成返回True
obj.successful():如果调用完成且没有引发异常返回True如果在结果就绪之前调用此方法引发异常
obj.wait([timeout]):等待结果变为可用。
obj.terminate()立即终止所有工作进程同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收将自动调用此函数 协程 协程是单线程下的并发又称微线程纤程。英文名Coroutine。 一句话说明什么是线程协程是一种用户态的轻量级线程即协程是由用户程序自己控制调度的。 协程能保留上一次调用时的状态即所有局部状态的一个特定组合每次过程重入时就相当于进入上一次调用的状态换种说法进入上一次离开时所处逻辑流的位置。 注意 1. python的线程属于内核级别的即由操作系统控制调度如单线程一旦遇到io就被迫交出cpu执行权限切换其他线程运行 2. 单线程内开启协程一旦遇到io从应用程序级别而非操作系统控制切换 协程优点 1. 协程的切换开销更小属于程序级别的切换操作系统完全感知不到因而更加轻量级 2. 单线程内就可以实现并发的效果最大限度地利用cpu 协程缺点 1.协程的本质是单线程下无法利用多核可以是一个程序开启多个进程每个进程内开启多个线程每个线程内开启协程 2.协程指的是单个线程因而一旦协程出现阻塞将会阻塞整个线程 yield实现协程并发 1 import time2 def consumer():3 r4 while True:5 nyield r6 if not n:7 return8 print([CONSUMER] ←← Consuming %s... % n)9 time.sleep(1)
10 r200 Ok
11
12 def produce(c):
13 next(c) #1.启动生成器
14 n0
15 while n 5:
16 nn1
17 print([PRODUCER] →→ Producing %s... % n)
18 crc.send(n)
19 #2.将n传入到consumer的对象yield接收到传入值开始执行代码遇到yield执行代码返回r的值
20 print([PRODUCER] Consumer return: %s % cr)
21 #3.produce没有值了关闭整个过程
22 c.close()
23
24 if __name__ __main__:
25 cconsumer() #生成生成器对象
26 produce(c) #执行调用 greenlet框架实现协程封装yield的基础库 greenlet机制的主要思想是生成器函数或者协程函数中的yield语句挂起函数的执行直到稍后使用next()或send()操作进行恢复为止。可以使用一个调度器循环在一组生成器函数之间协作多个任务。greentlet是python中实现我们所谓的Coroutine(协程)的一个基础库。 示例1 1 from greenlet import greenlet2 def foo():3 print(ok1)4 g2.switch() #阻断5 print(ok3)6 g2.switch()7 def bar():8 print(ok2)9 g1.switch()
10 print(ok4)
11
12 g1greenlet(foo) #生成foo函数的greenlet对象
13 g2greenlet(bar) #生成bar函数的greenlet对象
14 g1.switch() #1、执行g1对象打印ok1
15 #2、遇到g2.switch()转到g2执行打印ok2
16 #3、遇到g1.switch(),转到g1的阻断处继续执行打印ok3
17 #4、遇到g2.switch()转到g2执行打印ok4 示例2 1 def eat(name):2 print(%s eat food 1 %name)3 gr2.switch(bob)4 print(%s eat food 2 %name)5 gr2.switch()6 def play_phone(name):7 print(%s play 1 %name)8 gr1.switch()9 print(%s play 2 %name)
10
11 gr1greenlet(eat)
12 gr2greenlet(play_phone)
13 gr1.switch(namenatasha)#可以在第一次switch时传入参数以后都不需要 这种方法不会节省时间因为不是io操作而greenlet遇到io操作不会跳转仍然要io阻断 基于greenlet框架的高级库gevent模块 gevent是第三方库通过greenlet实现协程其基本思想是 当一个greenlet遇到IO操作时比如访问网络就自动切换到其他的greenlet等到IO操作完成再在适当的时候切换回来继续执行。由于IO操作非常耗时经常使程序处于等待状态有了gevent为我们自动切换协程就保证总有greenlet在运行而不是等待IO。 由于切换是在IO操作时自动完成所以gevent需要修改Python自带的一些标准库这一过程在启动时通过monkey patch完成 简单示例 1 import gevent2 def foo():3 print(ok1)4 gevent.sleep(4) #模拟io操作5 print(ok3)6 def bar():7 print(ok2)8 gevent.sleep(2)9 print(ok4)
10
11 g1gevent.spawn(foo)
12 g2gevent.spawn(bar)
13 gevent.joinall([g1,g2]) #全部阻塞或者单独一个个join spawn括号内第一个参数是函数名如foo后面可以有多个参数可以是位置实参或关键字实参都是传给函数foo的 注意 gevent.sleep(4)模拟的是gevent可以识别的io阻塞, 而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用下面一行代码,打补丁,就可以识别了 1 #补丁
2 from gevent import monkey
3 monkey.patch_all() 必须放到被打补丁者的前面如timesocket模块之前 或者我们干脆记忆成要用gevent需要将补丁放到文件的开头 爬虫示例 1 from gevent import monkey;monkey.patch_all()2 import gevent3 import requests4 import time5 6 def get_page(url):7 print(GET: %s %url)8 responserequests.get(url)9 if response.status_code 200:
10 print(%d bytes received from %s %(len(response.text),url))
11
12
13 start_timetime.time()
14 gevent.joinall([
15 gevent.spawn(get_page,https://www.python.org/),
16 gevent.spawn(get_page,https://www.yahoo.com/),
17 gevent.spawn(get_page,https://github.com/),
18 ])
19 stop_timetime.time()
20 print(run time is %s %(stop_time-start_time)) gevent是一个基于协程coroutine的Python网络函数库通过使用greenlet提供了一个在libev事件循环顶部的高级别并发API。主要特性有以下几点1 基于libev的快速事件循环Linux上面的是epoll机制2 基于greenlet的轻量级执行单元3 API复用了Python标准库里的内容4 支持SSL的协作式sockets5 可通过线程池或c-ares实现DNS查询6 通过monkey patching功能来使得第三方模块变成协作式gevent.spawn()方法spawn一些jobs然后通过gevent.joinall将jobs加入到微线程执行队列中等待其完成设置超时为2秒。执行后的结果通过检查gevent.Greenlet.value值来收集。二
1、关于Linux的epoll机制epoll是Linux内核为处理大批量文件描述符而作了改进的poll是Linux下多路复用IO接口select/poll的
增强版本它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。epoll的优点1支持一个进程打开大数目的socket描述符。select的一个进程所打开的FD由FD_SETSIZE的设置来限定而epoll没有这个限制它所支持的FD上限是
最大可打开文件的数目远大于2048。2IO效率不随FD数目增加而线性下降由于epoll只会对“活跃”的socket进行操作于是只有”活跃”的socket才会主动去调用 callback函数其他
idle状态的socket则不会。3使用mmap加速内核与用户空间的消息传递。epoll是通过内核于用户空间mmap同一块内存实现的。4内核微调。2、libev机制提供了指定文件描述符事件发生时调用回调函数的机制。libev是一个事件循环器向libev注册感兴趣的事件比如socket可读事件libev会对所注册的事件
的源进行管理并在事件发生时触发相应的程序。三‘’‘import geventfrom gevent import socketurls [‘www.google.com.hk’,’www.example.com’, ‘www.python.org’ ]jobs [gevent.spawn(socket.gethostbyname, url) for url in urls]gevent.joinall(jobs, timeout2)[job.value for job in jobs][‘74.125.128.199’, ‘208.77.188.166’, ‘82.94.164.162’]’‘’gevent.spawn()方法spawn一些jobs然后通过gevent.joinall将jobs加入到微线程执行队列中等待其完成设置超时为2秒。执行后的结果通过检查gevent.Greenlet.value值来收集。gevent.socket.gethostbyname()函数与标准的socket.gethotbyname()有相同的接口但它不会阻塞整个解释器因此会使得其他的greenlets跟随着无阻的请求而执行。Monket patchingPython的运行环境允许我们在运行时修改大部分的对象包括模块、类甚至函数。虽然这样做会产生“隐式的副作用”而且出现问题很难调试但在需要修改Python本身的基础行为时Monkey patching就派上用场了。Monkey patching能够使得gevent修改标准库里面大部分的阻塞式系统调用包括socket,ssl,threading和select等模块而变成协作式运行。from gevent import monkey ;monkey . patch_socket ()import urllib2通过monkey.patch_socket()方法urllib2模块可以使用在多微线程环境达到与gevent共同工作的目的。事件循环不像其他网络库gevent和eventlet类似 在一个greenlet中隐式开始事件循环。没有必须调用run()或dispatch()的反应器(reactor)在twisted中是有 reactor的。当gevent的API函数想阻塞时它获得Hub实例(执行时间循环的greenlet),并切换过去。如果没有集线器实例则会动态 创建。libev提供的事件循环默认使用系统最快轮询机制设置LIBEV_FLAGS环境变量可指定轮询机制。LIBEV_FLAGS1为select LIBEV_FLAGS 2为poll LIBEV_FLAGS 4为epoll,LIBEV_FLAGS 8为kqueue。Libev的API位于gevent.core下。注意libev API的回调在Hub的greenlet运行因此使用同步greenlet的API。可以使用spawn()和Event.set()等异步API。 转载于:https://www.cnblogs.com/chenqizhou/p/7359689.html