Redis适合做消息队列吗? https://mp.weixin.qq.com/s/rPc05bYUhRJ_fW9-GjpSHw
四种实现方法
- 基于List实现
- PUB/SUB,订阅/发布模式
- 基于Sorted-Set的实现
- 基于Stream类型的实现
原理
在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部(left)和尾部(right)添加新的元素。在插入时,如果该键并不存在,Redis将为该键创建一个新的链表。与此相反,如果链表中所有的元素均被移除,那么该键也将会被从数据库中删除。List中可以包含的最大元素数量是4294967295。
从元素插入和删除的效率视角来看,如果我们是在链表的两头插入或删除元素,这将会是非常高效的操作,即使链表中已经存储了百万条记录,该操作也可以在常量时间内完成。然而需要说明的是,如果元素插入或删除操作是作用于链表中间,那将会是非常低效的。相信对于有良好数据结构基础的开发者而言,这一点并不难理解。
使用过程中,rpush生产消息,lpop消费消息。当lpop没有消息的时候,要适当sleep一会再重试。如果不用sleep,list还有个指令叫blpop,在没有消息的时候,它会阻塞住直到消息到来。
能不能生产一次消费多次呢?
使用pub/sub主题订阅者模式,可以实现1:N的消息队列。如果对方追问pub/sub有什么缺点?在消费者下线的情况下,生产的消息会丢失,得使用专业的消息队列如rabbitmq等。
redis如何实现延时队列?
使用sortedset,拿时间戳作为score,消息内容作为key调用zadd来生产消息,消费者用zrangebyscore指令获取N秒之前的数据轮询进行处理。
场景
消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式。利用redis这两种场景的消息队列都能够实现。
生产者消费者模式示例
生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。
生产者
import timeimport redispool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=2, decode_responses=True)r = redis.Redis(connection_pool=pool)def product(i):length = r.llen("goods2")print(length)if length > 5000:print("长度过大睡一会")time.sleep(1)product(i)else:# 生产者r.lpush("goods2", "good1" + str(i))print("加入一个值睡一会")# time.sleep(5)if __name__ == '__main__':# 此处表示循环10000次,往redis里面放10000次数据for i in range(10000):product(i)
消费者
消费者开启多进程,在消费者开启多进程的时候我们会遇到一个问题,就是多个进程同时去抢同一个资源的情况,这个时候我们可以选择加锁到资源,也就是redis会话队列上,当某个进程拿资源的时候redis会话队列加上锁,保证其他进程拿不到这个资源,当这个进程拿完资源后,释放锁,让其他进程去抢占资源:
import timeimport redisfrom multiprocessing import Process, Lockpool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=2, decode_responses=True)r = redis.Redis(connection_pool=pool)def users(lock):length = r.llen("goods2")print(length)while True:# print(1)# 对资源进行加锁lock.acquire()if length > 0:goods = r.lpop("goods2")# 获得资源后释放锁lock.release()# 以下可以写自己的业务逻辑操作try:data = goodsprint(data)if str(goods) == "None":print("无值多等等")time.sleep(2)except:print("无值等等")time.sleep(2)users()else:print("无值等等")time.sleep(10)users(lock)if __name__ == '__main__':lock = Lock()processes = []for i in range(20):p = Process(target=users, args=(lock,))p.start()processes.append(p)for p in processes:p.join()print('处理完成')
发布者订阅者模式示例
发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。
redis发布订阅功能用于消息的传输;redis发布订阅机制包含3个部分:发布者,订阅者,channel(频道)

# redis_helper.py
import redis
class RedisHelper(object):
def __init__(self):
self.pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=1, decode_responses=True)
self.__conn = redis.Redis(connection_pool=self.pool)
# 频道名称
self.chan_sub = "orders"
def public(self, msg):
"""
在指定频道上发布消息
:param msg:
:return:
"""
# publish(): 在指定频道上发布消息,返回订阅者的数量
self.__conn.publish(self.chan_sub, msg)
return True
def subscribe(self):
# 返回发布订阅对象,通过这个对象你能1)订阅频道 2)监听频道中的消息
pub = self.__conn.pubsub()
# 订阅某个频道,与publish()中指定的频道一样。消息会发布到这个频道中
pub.subscribe(self.chan_sub)
return pub
发布者
# redis_pub.py
from redis_helper import RedisHelper
obj = RedisHelper()
for i in range(5):
obj.public("hello_%s" % i)
订阅者
# redis_sub.py
from redis_helper import RedisHelper
obj = RedisHelper()
redis_sub = obj.subscribe()
while True:
# listen()函数封装了parse_response()函数
msg = redis_sub.listen()
for i in msg:
if i["type"] == "message":
print(str(i["channel"]) + ":" + str(i["data"]))
elif i["type"] == "subscrube":
print(str(i["chennel"]))
