多进程
multiprocessing模块
与threading模块用法相似,用于创建多进程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import multiprocessing import threading import time import os def thread(): print("Thread: ", os.getpid(), " PPID: ", os.getppid()) # 打印PID和PPID(父进程PID) def func(): print("Process: ", threading.get_ident()) # 获取当前线程程ID time.sleep(2) sub = threading.Thread(target=thread) sub.start() if __name__ == "__main__": for i in range(5): process = multiprocessing.Process(target=func) # 定义新进程 process.start() # 启动进程 |
进程间通信
Queue
用于进程间传递数据的队列,与线程队列用法类似,通过put()和get()来存取队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob from multiprocessing import Process, Queue def func1(queue): queue.put("World") # 往队列存放数据 def func2(queue): while queue.qsize() > 0: print("Get item from queue: ", queue.get()) # 从队列取数据 if __name__ == "__main__": q = Queue() q.put("Hello") p1 = Process(target=func1, args=(q, )) p2 = Process(target=func2, args=(q, )) p1.start() p2.start() |
Pipe
Pipe对象是一个可以实现进程间通信的管道,包含两个收发节点,通过send()和recv()进行发送和接收,类似于socket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob from multiprocessing import Process, Pipe def func(con): print("Pipe in sub: ", con.recv()) # 从管道一端接收信息 con.send("World") if __name__ == "__main__": conn1, conn2 = Pipe() # 创建管道,返回管道的两个节点 p = Process(target=func, args=(conn2, )) # 将其中一个节点传递给子进程 p.start() conn1.send("Hello") # 从管道一端发送信息 print("Pipe in main: ",conn1.recv()) |
Manager
Manager对象可以管理支持进程间共享的各种数据类型,包括: 列表, 字典, 变量, 锁, 递归锁, 信号量, 事件, 队列等等(list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array.)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob from multiprocessing import Process, Manager def func(d, l, n): d[n] = n + 1 l.append(n + 1) if __name__ == "__main__": manager = Manager() # 创建Manager dic = manager.dict() # 使用Manager创建一个可进程间共享的字典 lst = manager.list() # 使用Manager创建一个可进程间共享的列表 process_list = [] for i in range(5): process = Process(target=func, args=(dic, lst, i)) # 创建进程,传递Manager创建的字典和列表 process.start() process_list.append(process) for p in process_list: p.join() print(dic) print(lst) |
进程锁
不同进程对标准输出的占用可能引起混乱,通过锁机制避免同时输出
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob from multiprocessing import Process, Lock, freeze_support def func(lock, num): lock.acquire() # 申请锁 print(num) lock.release() # 释放锁 if __name__ == "__main__": plock = Lock() # 创建进程锁 for i in range(10): process = Process(target=func, args=(plock, i)) # 创建进程,传递锁 process.start() |
进程池
限制同时运行的进程数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob from multiprocessing import Lock, Pool import time import os def func1(num): """ 提供给新进程调用的方法 :param num: 传参 :return: 返回子进程PID和执行时间 """ start_time = time.time() pid = os.getpid() time.sleep(1) print("[%s] output %s" % (pid, num)) return [pid, time.time() - start_time] def call_back1(args): """ 回调函数 :param args: 进程执行的返回值作为回调函数的传参 :return: None """ print("进程[%s]用时[%s]秒" % (args[0], args[1])) if __name__ == "__main__": pool = Pool(5) # 创建进程池,指定进程池大小(即单次能执行的进程数) print("apply".center(30, "=")) for i in range(2): pool.apply(func=func1, args=(i,)) # 串行 print("apply_async".center(30, "=")) for i in range(10, 20): pool.apply_async(func=func1, args=(i,), callback=call_back1) # 异步并行,callback回调函数 pool.close() pool.join() # 进程池必须先close再join |
协程
协程是一种用户态的轻量级线程
yield协程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
import time # 定义消费者生成器 def consumer(name): print("【%s】准备吃包子啦" % name) while True: baozi = yield print("包子【%s】号上桌了,被【%s】吃了一口" % (baozi, name)) def producer(): # 生成两个消费者并初始化 c1 = consumer("张三") c2 = consumer("李四") c1.__next__() c2.__next__() # 循环制作10个包子 for i in range(1,10): print("包子【%s】号制作完成" % i) time.sleep(1) # send方法将包子传递给yield作为值 c1.send(i) c2.send(i) producer() |
greenlet协程
greenlet是一个用C实现的协程模块,相比与python自带的yield,它可以使你在任意函数之间随意切换,而不需把这个函数先声明为generator
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob from greenlet import greenlet def func1(): print("From func1") gr2.switch() print("From func1 again") gr2.switch() def func2(): print("From func2") gr1.switch() print("From func2 again") gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch() |
gevent
Gevent 是一个第三方库,可以轻松通过gevent实现并发同步或异步编程,在gevent中用到的主要模式是Greenlet, 它是以C扩展模块形式接入Python的轻量级协程。 Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob from greenlet import greenlet def func1(): print("From func1") gr2.switch() print("From func1 again") gr2.switch() def func2(): print("From func2") gr1.switch() print("From func2 again") gr1 = greenlet(func1) gr2 = greenlet(func2) gr1.switch() |
gevent自动IO切换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import gevent def func1(): print("func1 first") gevent.sleep(2) # 模拟IO延迟 print("func1 second") gevent.sleep(1) print("func1 third") def func2(): print("func2 first") gevent.sleep(1) print("func2 second") gevent.sleep(1) print("func2 third") def func3(): print("func3 first") gevent.sleep(0) print("func3 second") gevent.sleep(1) print("func3 third") # 等待所有函数执行结束 gevent.joinall([ gevent.spawn(func1), # 将函数加入greenlet调度 gevent.spawn(func2), gevent.spawn(func3), ]) |
利用gevent实现并发下载
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob from gevent import monkey # 引入monkey补丁,是标准库对gevent友好 import gevent from urllib import request import time monkey.patch_all() def get_url(url): print("开始从[%s]下载资源" % url) data = request.urlopen(url).read() print("下载资源[%s]完成,总下载大小[%s]" % (url, len(data))) # 顺序执行 start_time = time.time() get_url("https://www.163.com") get_url("https://www.github.com") get_url("https://docs.ansible.com") print("顺序执行所用时间[%s]" % (time.time() - start_time)) # gevent协程 start_time = time.time() gevent.joinall([ gevent.spawn(get_url, "https://www.163.com"), gevent.spawn(get_url, "https://www.github.com"), gevent.spawn(get_url, "https://docs.ansible.com"), ]) print("协程执行所用时间[%s]" % (time.time() - start_time)) |
gevent版socket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import socket import gevent from gevent import monkey monkey.patch_all() # # 引入monkey补丁,使标准库对gevent友好 def handler(conn, addr): """ 处理客户端连接 :param conn: 客户端连接对象 :param addr: 客户端地址 :return: None """ print("Accept connection from ", addr) try: while True: data = conn.recv(1024).decode() print(data) conn.send("OK".encode()) except Exception as e: print("连接断开 ", e) finally: conn.close() def server(port): """ 在指定端口建立socket监听,并使用gevent协程非阻塞地处理客户端连接 :param port: 服务器监听端口 :return: None """ s_server = socket.socket() # 创建socket对象 s_server.bind(("0.0.0.0", port)) s_server.listen(100) # 监听 print("Listening on port %s ..." % port) while True: conn, addr = s_server.accept() gevent.spawn(handler, conn, addr) # 利用gevent创建客户端连接处理协程 server(8888) |
事件驱动与异步IO
事件驱动模型
目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:
1. 有一个事件(消息)队列;
2. 鼠标按下时,往这个队列中增加一个点击事件(消息);
3. 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;
4. 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数。
异步IO和IO多路复用
常用的IO类型:
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路复用( IO multiplexing)
- 信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)Select\Poll\Epoll(IO多路复用)的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
asynchronous IO(异步IO):用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
IO多路复用实例:select socket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import socket import select import queue def data_handler(source_data): """ 处理接收数据 :param source_data: 源数据 :return: 处理后的数据 """ return str(source_data).upper() server = socket.socket() server.bind(("0.0.0.0", 8888)) server.listen(1000) server.setblocking(False) # 设为非阻塞的socket print("服务器开始监听...") inputs = [] # 接收监测 outputs = [] # 发送监测 send_data = {} # 待发数据 inputs.append(server) # 将socket服务加入监测列表 while inputs: # select参数为别为,读监测列表(接收),写监测列表(发送),异常列表 read_list, write_list, exception_list = select.select(inputs, outputs, inputs) # 处理接收列表 for item in read_list: if item is server: # 服务器自身连接 conn, addr = server.accept() print("客户端连接已建立", addr) inputs.append(conn) # 将已建立的连接加入监测列表,等待客户端发数据时再接收 else: # 客户端连接 try: data = item.recv(1024).decode() if not data: print("客户端连接已断开", item.getpeername()) continue send_data[item] = queue.Queue() # 创建本连接待发数据队列 send_data[item].put(data) # 将待发送数据放入队列 outputs.append(item) # 将当前连接放入待发送列表 except ConnectionResetError: print("客户端连接已断开", item.getpeername()) inputs.remove(item) # 将断开的连接从监测列表删除 # 处理待发送列表 for item in write_list: try: data = send_data[item].get() # 从队列提取数据 r_data = data_handler(data) # 处理数据 item.sendall(r_data.encode()) # 发送数据给客户端 except queue.Empty(): pass except ConnectionResetError: print("客户端连接已断开", item.getpeername()) del send_data[item] finally: outputs.remove(item) # 处理完成后从待发列表删除当前连接 # 处理异常列表 for item in exception_list: inputs.remove(item) if item in outputs: outputs.remove(item) |
IO多路复用实例:selectors socket
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import socket import selectors def data_handler(source_data): """ 处理接收数据 :param source_data: 源数据 :return: 处理后的数据 """ return str(source_data).upper() def connect(_server): """ 处理客户端连接 :param _server: 接收连接的socket服务器对象 :return: None """ conn, addr = _server.accept() print("客户端连接已建立", addr) conn.setblocking(False) selector.register(conn, selectors.EVENT_READ, read) # 将已建立的连接加入到selector事件监听列表,触发事件时调用read def read(_conn): """ 接收并处理客户端数据 :param _conn: 已与客户建立的连接对象 :return: None """ try: data = _conn.recv(1024).decode() if data: r_data = data_handler(data) # 处理数据 _conn.sendall(r_data.encode()) # 发送数据 else: print("客户端连接已断开", _conn.getpeername()) selector.unregister(_conn) # 取消当前连接的注册 except ConnectionResetError: print("客户端连接已断开", _conn.getpeername()) selector.unregister(_conn) # 取消当前连接的注册 selector = selectors.DefaultSelector() # 初始化一个selectors对象 server = socket.socket() server.bind(("0.0.0.0", 8888)) server.listen(1000) server.setblocking(False) print("服务器开始监听...") selector.register(server, selectors.EVENT_READ, connect) # 将socket对象注册到selectors,当触发事件时调用connect while True: events = selector.select() # 开始监听事件 print(events) for key, mask in events: # callback = key.data # data为注册事件监听时传入的函数 callback(key.fileobj) # fileobj为传入的需要监听的对象,这里为socket服务器对象 |
原文链接:Python 从入门到放弃 - Lesson 10 多进程、协程、异步IO,转载请注明来源!