RabbitMQ
RabbitMQ生产消费者
最简单的通信队列: P为生产者,C为消费者
消费者可以是多个,这时队列会采用轮询的方式分发消息给消费者
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.queue_declare(queue="hello", durable=True) # durable参数声明队列为持久化队列 data = "" while not data == "bye": data = input("Something you want to publish: ") channel.basic_publish(exchange="", routing_key="hello", body=data, properties=pika.BasicProperties(delivery_mode=2)) # delivery_mode指定消息持久化 print("[INFO] - Message sent.") connection.close() |
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika import time """ 如果RabbitMQ服务器需要验证,需要指定用户密码 credential = pika.PlainCredentials("user", "password") connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10", credentials=credential)) """ connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.queue_declare(queue="hello", durable=True) # durable参数声明队列为持久化队列 def callback(ch, method, properties, body): print("Received message:") print("""ch: %s method: %s properties: %s body: %s""" % (ch, method, properties, body)) time.sleep(5) # 模拟处理消息所需时间 ch.basic_ack(delivery_tag=method.delivery_tag) # 如果没有自动确认则需手工调用确认方法 print("Jod done") """ basic_consume参数: queue: 要从哪个队列里接收消息 on_message_callback: 收到消息后的执行的回调函数,会传入四个参数channel,method,properties,body auto_ack: 是否自定确认消息,如果不确认,消息会一直存在队列里不删除 """ channel.basic_qos(prefetch_count=5) # 指定缓存窗口大小,只有当缓存窗口未满时才会分发消息给消费者 channel.basic_consume(queue="hello", on_message_callback=callback, auto_ack=False) print("Waiting for message.") channel.start_consuming() connection.close() |
队列和消息持久化
默认队列和消息在RabbitMQ服务器重启后会失效,如果如需是队列一直有效,并且未消费的消息得到保留,需要进行声明
队列持久化
1 2 3 |
connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.queue_declare(queue="hello", durable=True) # durable参数声明队列为持久化队列 |
消息持久化
1 2 |
channel.basic_publish(exchange="", routing_key="hello", body=data, properties=pika.BasicProperties(delivery_mode=2)) # delivery_mode指定消息持久化 |
公平分发
默认模式下,队列会把消息轮流发送给每个消费者,不管消费者当前是否繁忙,如果需要更加公平有效的分配,需要指定消费者缓存窗口大小
1 |
channel.basic_qos(prefetch_count=5) # 指定缓存窗口大小,只有当缓存窗口未满时才会分发消息给消费者 |
订阅发布
默认的分发模式一条消息只能发送给一个消费者,如果需要实现广播的效果,需要用到exchange,excahnge会负责把满足条件的消息分发到绑定到该exchange的所有队列中,消费者再从队列中消费
一般分为以下三种分发(绑定)模式:
fanout
队列直接与exchange绑定来获取消息
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) # 创建到RabbitMQ服务器的连接 channel = connection.channel() # 创建通道 channel.exchange_declare(exchange="msg", exchange_type="fanout") # 声明exchange名称和类型(fanout为广播模式) message = "" while message != "bye": message = input("Message to publish:>>") channel.basic_publish(exchange="msg", routing_key="", body=message) # 发送消息到exchange进行分发 print("Message sent") connection.close() |
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.exchange_declare(exchange="msg", exchange_type="fanout") # 声明一个exchange并指定类型为fonout result = channel.queue_declare(queue="", exclusive=True) # 通过指定空白的队列名称来生成一个随机的queue,exclusive声明为唯一 queue_name = result.method.queue # 获取队列名称 print("Queue name:", queue_name) channel.queue_bind(exchange="msg", queue=queue_name) # 将queue绑定到exchange def callback(ch, method, properties, body): print("Received message:") print("""ch: %s method: %s properties: %s body: %s""" % (ch, method, properties, body)) ch.basic_ack(delivery_tag=method.delivery_tag) # 如果没有自动确认则需手工调用确认方法 print("Jod done") channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False) print("Waiting for message.") channel.start_consuming() connection.close() |
direct
direct模式会根据绑定到exchange时所声明的关键字(routing_key)来分发消息到指定队列
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.exchange_declare(exchange="logs", exchange_type="direct") # 声明一个exchange,类型为direct try: while True: level = input("Level of log: >>").strip() # 消息级别,用作routing_key message = input("Message: >>").strip() channel.basic_publish(exchange="logs", routing_key=level, body=message) print("Message sent") finally: connection.close() |
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.exchange_declare(exchange="logs", exchange_type="direct") # 声明一个exchange,类型为direct result = channel.queue_declare(queue="", exclusive=True) # 通过指定空白的队列名称来生成一个随机的queue,exclusive声明为唯一 queue_name = result.method.queue # 获取队列名称 print("Queue name: ", queue_name) levels = input("Log levels for subscriber: >>").strip().split() # 根据输入的监听级别,循环绑定routing_key到exchange中 for level in levels: channel.queue_bind(queue=queue_name, exchange="logs", routing_key=level) def callback(ch, method, properties, body): print("Received message:") print("""ch: %s method: %s properties: %s body: %s""" % (ch, method, properties, body)) ch.basic_ack(delivery_tag=method.delivery_tag) # 如果没有自动确认则需手工调用确认方法,method.delivery_tag为消息编号 channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False) print("Waiting for message.") channel.start_consuming() connection.close() |
topic
topic模式时更为细致的过滤模式,例如unix系统中的日志来源一般为source.level格式,如sys.info, kernel.warn,这时可以使用topic模式来实现更灵活的配置
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.exchange_declare(exchange="info", exchange_type="topic") # 声明一个exchange,类型为topic try: while True: key = input("Key of the message: >>").strip() message = input("Something to send: >>").strip() channel.basic_publish(exchange="info", routing_key=key, body=message) print("Message sent") finally: connection.close() |
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.exchange_declare(exchange="info", exchange_type="topic") # 声明一个exchange,类型为topic result = channel.queue_declare(queue="", exclusive=True) # 通过指定空白的队列名称来生成一个随机的queue,exclusive声明为唯一 queue_name = result.method.queue # 获取队列名称 print("Queue name: ", queue_name) keys = input("Your focus: >>").strip().split() # 输入需要监听的内容,如*.info,、ssh.*、*.utils.*(单独一个#代表所有) # 根据输入的监听内容,循环绑定routing_key到exchange中 for key in keys: channel.queue_bind(queue=queue_name, exchange="info", routing_key=key) def callback(ch, method, properties, body): print("Received message:") print("""ch: %s method: %s properties: %s body: %s""" % (ch, method, properties, body)) ch.basic_ack(delivery_tag=method.delivery_tag) # 如果没有自动确认则需手工调用确认方法,method.delivery_tag为消息编号 channel.basic_consume(queue=queue_name, on_message_callback=callback) print("Waiting for message.") channel.start_consuming() connection.close() |
RabbitMQ RPC
为了说明如何使用RPC服务,我们将创建一个简单的客户机类。它将公开一个名为run的方法,该方法发送RPC请求并阻塞,直到收到答案:
RPC Server
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika def fib(n): """ :param n: 位数 :return: 返回第n位斐波那契数列 """ if n == 0: return 0 elif n == 1: return 1 else: return fib(n-1) + fib(n-2) def response(ch, method, pros, body): result = fib(int(body)) # 计算斐波那契数列 # 发送消息到客户端指定的返回队列 ch.basic_publish(exchange="", routing_key=pros.reply_to, properties=pika.BasicProperties(correlation_id=pros.correlation_id), body=str(result)) connection = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) channel = connection.channel() channel.queue_declare(queue="rpc_queue") channel.basic_qos(prefetch_count=1) # 指定缓存窗口大小,只有当缓存窗口未满时才会分发消息给消费者 channel.basic_consume(queue="rpc_queue", on_message_callback=response, auto_ack=True) # 收到消息后交给response函数处理 print("Server started.") channel.start_consuming() # 接收客户端消息 connection.close() |
RPC Client
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import pika import uuid import time class RpcClient(object): """ RCP客户端 """ def __init__(self): self.uid = "" self.response = None self.conn = pika.BlockingConnection(pika.ConnectionParameters(host="192.168.80.10")) self.channel = self.conn.channel() queue_tmp = self.channel.queue_declare(queue="", exclusive=True) # 通过指定空白的队列名称来生成一个随机的queue self.callback_queue = queue_tmp.method.queue # 获取队列名称 self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.get_result, auto_ack=True) def get_result(self, ch, method, properties, body): """ 获取队列中的消息内容 :param ch: 返回消息的通道名 :param method: :param properties: 消息的附加属性 :param body: 消息体 :return: 返回数字形式的消息体 """ if properties.correlation_id == self.uid: self.response = int(body) def run(self, n): self.response = None # 重置状态 self.uid = str(uuid.uuid4()) # 生成本次消息的uid self.channel.basic_publish(exchange="", routing_key="rpc_queue", properties=pika.BasicProperties(reply_to=self.callback_queue, correlation_id=self.uid), body=str(n)) while self.response is None: time.sleep(0.5) self.conn.process_data_events() # 查询是否有新消息 return self.response Fib_RpcClient = RpcClient() while True: try: num = int(input(">> ")) print("获取第[%s]位斐波那契数列是[%s]" % (num, Fib_RpcClient.run(num))) except ValueError: print("非法输入,请输入一个正整数") |
Redis
redis基本操作(redis-cli)
字符串操作
# 在Redis中设置值,默认,不存在则创建,存在则修改 set key value [expiration EX seconds|PX milliseconds] [NX|XX] # 参数 key: 键名 value: 键值 ex: 设置过期时间(秒) px: 过期时间(毫秒) nx: 如果设置nx,则只有name不存在时,当前set操作才执行(仅新建) xx: 如果设置xx,则只有name存在时,当前set操作才执行(仅更新) 例:set 1 a ex 60 nx # 获取键值 get key # 参数 key: 键名 # 设置值,只有name不存在时,执行设置操作(添加) setnx key, value # 更新键值和过期时间,只有当name存在时,才执行设置操作 setex key second value # 参数 second: 过期时间(秒) # 以毫秒为过期时间设置值 psetex name milliseconds value # 参数 milliseconds: 过期时间(毫秒) # 批量设置值 mset key value [key value...] 例: mset 1 a 2 b # 批量获取值 mget key [key ...] 例: mset 1 2 # 为键赋与新值同时返回旧值 getset key value # 获取键值的指定位置的文本 getrange key start end # 参数 start: 开始位置,0表示第一位 end: 结束位置 # 从键值指定位置开始修改值 setrange key offset value # 参数 offset: 偏移位置 # 设置二进制位置的值 setbit key offset value # 参数 offset: 要设置的二进制位置(一个字节占8位) # 获取指定位置的二进制值 getbit key offset # 返回指定键值中的二进制1的个数 bitcount key [start end] # 参数 start: 开始计数的二进制位置 end: 结束计数的二进制位置 # 返回指定键值的长度(UTF-8编码字节数) strlen key # 在指定键值后面追加字符串 append key value # 对数字类型的键值进行递增(加1) incr key # 对数字类型的键值进行递减(减1) decr key # 对数字类型的键值增加指定数值 incrby key increment # 参数 increment: 增加的值 # 对数字类型的键值减少指定数值 decrby key decrement # 参数 decrement: 减少的值 # 对指定键值加上浮点数值 incrbyfloat key increment # 参数 increment: 要增加的值(浮点数) # 搜索符合条件的键值 scan cursor [MATCH pattern] [COUNT count] # 参数 cursor: 游标位置,配合count参数使用,当结果数量大于count数量时,返回一个非0的游标位置, 下次scan时可以从返回的游标位置继续往后搜索 pattern: 匹配条件(可选) count: 返回结果数量(可选)
hash操作(类似于字典)
# 设置一个hash键值对 hset key field value # 参数 key: 键名 field: 键值中的字段名 value: 键值中的字段值 # 批量设置hash键值对 hmset key field value [field value ...] # 例子: hmset info name Bob age 22 # 获取hash键值的指定字段的值 hget key field # 获取hash键值的所有字段 hgetall key # 获取hash键值的字段个数 hlen key # 获取hash键值中的所有字段名 hkyes key # 获取hash键值中的所有字段值 hvals key # 检查hash键值中是否存在指定的字段 hexists key field # 删除hash键值中的指定字段 hdel key field [field ...] # 对hash键值中的指定字段增加指定数值 hincrby key field increment # 参数 increment: 要增加的值 # 对hash键值中的指定字段增加指定浮点数 hincrbyfloat key field increment # 参数 increment: 要增加的值(浮点数) # 从hash键值中搜索指定的字段名 hscan key cursor [MATCH pattern] [COUNT count] # 参数 cursor: 游标位置,配合count参数使用,当结果数量大于count数量时,返回一个非0的游标位置, 下次scan时可以从返回的游标位置继续往后搜索 pattern: 匹配条件(可选) count: 返回结果数量(可选) # 例: 查找n开头的字段 hscan info 0 n*
List操作
# 存放一个List类型的键值,一次可以放多个value(从右往左放) lpush key value [value ...] # 例: lpush list1 1 2 3 # 列表结果:3 2 1 # 存放一个List类型的键值,一次可以放多个value(从左往右放) rpush key value [value ...] # 例: lpush list2 1 2 3 # 列表结果:1 2 3 # 仅当key存在时,才在左边添加value lpushx key value # 仅当key存在时,才在右边添加value rpushx key value # 获取List键值的内容 lrange key start stop # 参数 start: 开始index,0 表示开头 stop: 结束index,-1表是到结尾 # 获取指定list键值的长度(列表元素个数) llen key # 在指定值的前面或后面插入值 linsert key BEFORE|AFTER pivot value # 参数 BEFORE|AFTER: 插入方式 pivot: 参考元素 # 例 linsert list1 after 2 a # 在元素2的后面插入a # 修改list键值指定位置元素的值 lset key index value # 参数 index: 元素在列表中的的位置 # 删除list键值中指定内容的元素 lrem key count value # 参数 count: 要删除的个数(多个相同元素删除),为正数时从左往右删,为负数时从右往左删 # 从list键值中获取最左侧的元素返回并删除 lpop key # 从list键值中获取最右侧的元素返回并删除 rpop key # 获取list指定位置的值 lindex key index # 从list删除指定范围以外的值 ltrim key start stop # 参数 start: 保留开始的位置 stop: 暴力结束的位置 # 将一个list键值的最右边的元素移到另一个list键值最左边 rpoplpush source destination # 参数 source: 要从最右边删除元素的list destination: 要在左边添加元素的list # 从list键值中取出最左边的元素 blpop key [key ...] timeout # 参数 timeout: 当没有元素可取时等待超时的时间(秒) # 从list键值中取出最右边的元素 brpop key [key ...] timeout # 参数 timeout: 当没有元素可取时等待超时的时间(秒) # 将一个list键值的最右边的元素移到另一个list键值最左边 brpoplpush source destination timeout # 参数 source: 要从最右边删除元素的list destination: 要在左边添加元素的list timeout: 当源list没有元素可取时等待超时的时间(秒)
set操作(集合)
# 添加一个集合类型的键值,如果集合已经存在,在追加新的值 sadd key member [member ...] # 参数 key: 键名 member: 集合成员(自动去重) # 获取集合中改动全部成员 smembers key # 获取集合键中的成员数量 scard key # 求差集(在第一个集合中不在第二个集合中的值) sdiff key [key ...] # 求差集,并将结果保存到另外一个集合 sdiffstore destination key [key ...] # 参数 destination: 要保存结果的目标集合 # 求两个集合的交集 sinter key [key ...] # 求两个集合的交集,并将结果保存到另外一个集合 sinterstore destination key [key ...] # 求两个集合的并集 sunion key [key ...] # 求两个集合的并集,并将结果保存到另外一个集合 sunionstore destination key [key ...] # 判断值是否是集合成员 sismember key member # 参数 member: 要进行判断的值 # 将集合中的一个成员转移到另外一个集合 smove source destination member # 参数 source: 源集合 destination: 目标集合 member: 成员值 # 从集合尾部取出值并删除 spop key [count] # 参数 count: 要取出的值的个数(可选) # 从集合中随机取出值 srandmember key [count] # 从集合中删除指定的值 srem key member [member ...] # 从集合键值中搜索符合条件的值,与string和hash的scan用法一致 sscan key cursor [MATCH pattern] [COUNT count]
有序集合
有序集合是在集合的基础上,为每个元素指定一个分数(权重)以进行排序(分数高的靠右)
# 添加一个有序集合,如果键已经存在,则添加或修改集合键中的值(权重) zadd key [NX|XX] [CH] [INCR] score member [score member ...] # 参数 NX: 只有当member不存在时才执行操作(用于添加新成员) XX: 只有当member存在时才执行操作(用于修改现有成员分数) CH: 指定返回值为发生变化(包括新增和修改)的成员总数,默认返回值只是新增的成员数 INCR: 在原有的分数基础上增加 score: 分数,即权值 member: 成员值 # 返回集合中成员数量 zcard key # 统计集合中在指定分数范围内的数字 zcount key min max # 参数 min: 分数下限 max: 分数上限 # 增加集合中指定成员的分数 zincrby key increment member # 获取集合中成员 zrange key start stop [WITHSCORES] # 参数 start: 开始位置(0表示开头) stop: 结束位置(-1表示末尾) WITHSCORES: 展示成员的分数(可选) # 获取集合中指定成员的位置 zrank key member # 删除集合中指定的值 zrem key member [member ...] # 根据位置范围删除集合中的值 zremrangebyrank key start stop # 根据分数范围删除集合中的值 zremrangebyscore key min max # 查找集合中指定值的分数 zscore key member # 求两个有序集合的并集 zunionstore destination numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX] # 参数 destination: 要保存交集的目标集合 numkeys: 将要计算交集的集合数 weight: 为每个键指定权重,即进行分数合并时前面的key中元素的分数要乘以权重 AGGREGATE: 对两个集合中元素的分数的聚合方式 SUM: 合并所有集合中该元素的分数(默认) MIN: 取集合中该元素最小的分数 MAX: 取集合中该元素最大的分数 # 例 zunionstore zset3 2 zset1 zset2 WEIGHTS 0.5 1 # 求zset1和zset2的并集,保存到zset3,zset1中元素的分数要乘以0.5 # 求两个有序集合的交集,参数与求并集一样 zinterstore destination numkeys key [key ...] [WEIGHTS weight] [AGGREGATE SUM|MIN|MAX] # 从有序集合中搜索符合条件的元素 zscan key cursor [MATCH pattern] [COUNT count]
通用操作
# 删除指定键 del key [key ...] # 判断键是否存在 exists key [key ...] # 为一个键指定过期时间(过期自动删除) expire key seconds # 参数 seconds: 多少秒后过期 # 重命名键 rename key newkey # 参数 newkey: 新键名 # 将指定的键移动到新的db下 move key db # 参数 db: 要移动到的目标db编号 # 随机返回一个存在的键名 randomkey # 查询指定键的类型 type key
redis管道(pipeline)
实现一次请求指定多个命令,默认情况下一次pipline是原子性操作(相当于sql的事务)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import redis import time pool = redis.ConnectionPool(host="192.168.80.10", port=6379, db=6) # 创建一个redis连接池 r = redis.Redis(connection_pool=pool) # 从连接池中申请连接 p = r.pipeline(transaction=True) # 声明一个管道,可以将多个命令合并执行 p.set("1001", "aaa") # 在管道中加入一个set命令 time.sleep(10) p.set("1002", "bbb") p.execute() # 最终执行,将两次set一起完成,在此之前不会有值 |
redis发布订阅
发布者(publish)
1 2 3 4 5 6 7 8 9 10 11 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import redis conn = redis.Redis(host="192.168.80.10") channel = "ch1" # 发布的通道名 while True: msg = input(">> ") conn.publish(channel, msg) # 发布消息到通道 |
订阅者(pubsub)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
#!/usr/bin/env python36 # -*- coding: utf-8 -*- # Author: Bob import redis conn = redis.Redis(host="192.168.80.10") channel = "ch1" # 订阅的通道名 pub = conn.pubsub() # 创建订阅者 pub.subscribe(channel) # 订阅通道 pub.parse_response() # 准备接收(处理第一条系统消息[b'subscribe', b'ch1', 1]) while True: msg = pub.parse_response() # 接收数据 print(msg) |
原文链接:Python 从入门到放弃 - Lesson 11 redis缓存、rabbitMQ队列,转载请注明来源!