文章目录
进程与线程简介
python多线程
threading模块
python使用threading模块定义多线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import threading import time def hello(name): print("Hello %s" % name) time.sleep(1) print("Bye %s" % name) # 创建线程 t1 = threading.Thread(target=hello, args=("Bob",)) # 创建线程 t2 = threading.Thread(target=hello, args=("Tom",)) # 启动线程 t1.start() # 启动线程 t2.start() |
join
主线程等待子线程结束后才会继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import threading import time def hello(n): print("Hello num %s" % n, threading.currentThread()) # currentThread或current_thread获取当前线程, time.sleep(1) print("Bye num %s" % n, threading.current_thread()) start_time = time.time() thread_list = [] for i in range(50): thread = threading.Thread(target=hello, args=(i,)) # 创建线程 thread.start() # 启动线程 thread_list.append(thread) for t in thread_list: t.join() # 等待子线程结束再往下执行 print("Jobs done in %s seconds" % (time.time() - start_time), threading.currentThread()) |
deamon
deamon用于指定线程为守护线程,主程序结束时无需等待守护线程结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import threading import time def hello(n): print("Hello num %s" % n, threading.currentThread()) time.sleep(1) print("Bye num %s" % n, threading.current_thread()) start_time = time.time() thread_list = [] for i in range(50): thread = threading.Thread(target=hello, args=(i,), daemon=True) # 设置为守护线程,主程序不用等子线程结束 # thread.setDaemon(True) thread.start() thread_list.append(thread) print("Jobs done in %s seconds" % (time.time() - start_time), threading.currentThread()) |
全局解释器锁GIL
任一时刻,只允许一个线程处于运行状态,即使是多核CPU系统。
参考资料:http://www.dabeaz.com/python/UnderstandingGIL.pdf
互斥锁
防止多个线程同时修改一份数据造成数据不一致的问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import threading def hello(): thread_lock.acquire() # 申请锁 global n n += 1 thread_lock.release() # 释放锁 n = 0 thread_list = [] thread_lock = threading.Lock() # 创建锁 for i in range(1000): thread = threading.Thread(target=hello) # 创建线程 thread.start() thread_list.append(thread) for thread in thread_list: thread.join() # 等待线程结束 print("Number: ", n) |
递归锁
可以多次进行锁的申请,即在锁状态下还可以申请子锁,避免释放锁时错乱
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import threading import time def func1(): lock.acquire() global n n += 1 print("func1 done") lock.release() def func2(): lock.acquire() global n n += 1 print("func2 done") lock.release() def main(): lock.acquire() func1() func2() lock.release() n = 0 lock = threading.RLock() # 递归锁 # lock = threading.Lock() # 如果是非递归锁则子程序会死锁 t = threading.Thread(target=main) t.start() while threading.active_count() != 1: print(threading.active_count()) # 打印活跃线程数 time.sleep(0.5) else: print("Num: ", n) |
信号量
信号量是一个可以供多个线程同时申请的锁,即带数量的锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import threading import time def func(n): semaphore.acquire() # 申请一个信号量 print("Thread%s running" % n) time.sleep(1) semaphore.release() # 释放一个信号量 semaphore = threading.BoundedSemaphore(3) # 创建信号量 for i in range(10): thread = threading.Thread(target=func, args=(i,)) thread.start() while threading.active_count() != 1: print("Thread number ", threading.active_count()) time.sleep(1) else: print("Jobs done.") |
事件
通过event事件来实现线程间的交互
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import threading import time def light(): """ 模拟红绿灯切换 :return: None """ event.set() # 设置绿灯标志 clock = 0 # 设置计数器 while True: if clock < 5: # 5秒内为绿灯时间 print("\033[1;32mGreen light on...\033[0m") time.sleep(1) clock += 1 elif 10 > clock >= 5: # 5到10秒内清除绿灯标志 event.clear() # 清除绿灯,红灯亮 print("\033[1;31mRed ligth on...\033[0m") time.sleep(1) clock += 1 else: # 超过10秒,重置计数器,重置绿灯 clock = 0 event.set() # 设置红绿标志 def car(name): """ 模拟汽车等红绿灯 :param name: 车名 :return: None """ while True: if event.is_set(): # 绿灯行 print("\033[1;42m%s is running..." % name) time.sleep(1) else: # 红灯停 print("\033[1;41m%s stopped...\033[0m" % name) event.wait() # 等绿灯亮 print("\033[1;43m%s is ready to go..." % name) event = threading.Event() # 创建事件对象 thread_light = threading.Thread(target=light) # 红绿灯线程 car_BMW = threading.Thread(target=car, args=("BMW", )) # 车辆线程 thread_light.start() car_BMW.start() |
队列
Queue的基本方法
put() # 存放元素,如果么有设置timeout参数,在队列满时会阻塞 put_nowait() # 存放元素,如果队列已满则会抛出queue.Full异常 get() # 取出元素,如果没有timeout参数,在队列空时会阻塞 get_nowait() # 取出元素,如果队列已空则会抛出queue.Empty异常 qsize() # 获取队列长度
Queue类
先进先出队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import queue print("Queue".center(50, "=")) q = queue.Queue(maxsize=3) # 先进先出队列, maxsize设置队列最大长度 q.put("A") # 存放元素 q.put("B") q.put("C") try: q.put("D", timeout=1) # 如果不设置超时时间,向一个满的队列插入元素会阻塞 except queue.Full: print("Queue is full") try: q.put_nowait("D") # 不等待满队列空间释放,抛出queue.Full异常 except queue.Full: print("Queue is full") print("Queue size: ", q.qsize()) # 查询队列大小 print(q.get()) # 获取队列中的元素 print(q.get()) print(q.get()) print("Queue size: ", q.qsize()) try: print(q.get(timeout=1)) # 如果不设置超时时间,取空队列会阻塞程序 except queue.Empty: print("Queue is empty") try: print(q.get_nowait()) # 不等待空队列,抛出queue.Empty异常 except queue.Empty: print("Queue is empty") |
LifoQueue类
后进先出队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import queue print("LifoQueue".center(50, "=")) ql = queue.LifoQueue() # 后进先出队列 ql.put("A") ql.put("B") ql.put("C") print(ql.get()) # 获取队列中的元素 print(ql.get()) print(ql.get()) |
PriorityQueue类
带优先级的队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import queue print("PriorityQueue".center(50, "=")) qp = queue.PriorityQueue() # 带优先级队列 qp.put((3, "A")) # 通过插入元组方式指定优先级,数字越小,优先级越高,越快被取出 qp.put((1, "B")) qp.put((2, "C")) qp.put((-1, "D")) print(qp.get()) # 获取队列中的元素 print(qp.get()) print(qp.get()) print(qp.get()) |
paramiko模块
用于远程管理SSH主机
基于用户名密码的远程连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import paramiko # =======================执行命令=========================== # 实例化对象 ssh_client = paramiko.SSHClient() # 定义密钥策略 ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接SSH服务器 ssh_client.connect(hostname="192.168.80.10", port=22, username="root", password="123456") # 执行命令并接收输出 stdin, stdout, stderr = ssh_client.exec_command("free -h && ff") print(stdout.read().decode()) # 标准输出 print("==============================") print(stderr.read().decode()) # 标准错误 # =======================文件传输=========================== # 实例化Transport对象 pipe = paramiko.Transport(("192.168.80.10", 22)) # sock参数可选类型(地址, 端口),地址:端口,已经实例化的socket对象 # 连接服务器 pipe.connect(username="root", password="123456") # 从Transport实例创建SFTP对象 sftp = paramiko.SFTPClient.from_transport(pipe) # 上传文件 sftp.put("test_sftp.txt", "/root/test_sftp.txt") # 下载文件 sftp.get("/root/test_sftp.txt", "test_sftp2.txt") pipe.close() |
基于密钥的远程连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import paramiko # 创建SSH Key对象 ssh_key = paramiko.RSAKey.from_private_key_file("id_rsa") # =======================执行命令=========================== # 实例化对象 ssh_client = paramiko.SSHClient() # 定义密钥策略 ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接SSH服务器 ssh_client.connect(hostname="192.168.80.10", port=22, username="root", pkey=ssh_key) # 执行命令并接收输出 stdin, stdout, stderr = ssh_client.exec_command("free -h && ff") print(stdout.read().decode()) print("==============================") print(stderr.read().decode()) # =======================文件传输=========================== # 实例化Transport对象 pipe = paramiko.Transport(("192.168.80.10", 22)) # sock参数可选类型(地址, 端口),地址:端口,已经实例化的socket对象 # 连接服务器 pipe.connect(username="root", pkey=ssh_key) # 从Transport实例创建SFTP对象 sftp = paramiko.SFTPClient.from_transport(pipe) # 上传文件 sftp.put("test_sftp.txt", "/root/test_sftp.txt") # 下载文件 sftp.get("/root/test_sftp.txt", "test_sftp2.txt") pipe.close() |
原文链接:Python 从入门到放弃 - Lesson 9 paramiko模块与多线程,转载请注明来源!