Redis适合做消息队列吗? https://mp.weixin.qq.com/s/rPc05bYUhRJ_fW9-GjpSHw

四种实现方法

  • 基于List实现
  • PUB/SUB,订阅/发布模式
  • 基于Sorted-Set的实现
  • 基于Stream类型的实现

原理

在Redis中,List类型是按照插入顺序排序的字符串链表。和数据结构中的普通链表一样,我们可以在其头部(left)和尾部(right)添加新的元素。在插入时,如果该键并不存在,Redis将为该键创建一个新的链表。与此相反,如果链表中所有的元素均被移除,那么该键也将会被从数据库中删除。List中可以包含的最大元素数量是4294967295。
从元素插入和删除的效率视角来看,如果我们是在链表的两头插入或删除元素,这将会是非常高效的操作,即使链表中已经存储了百万条记录,该操作也可以在常量时间内完成。然而需要说明的是,如果元素插入或删除操作是作用于链表中间,那将会是非常低效的。相信对于有良好数据结构基础的开发者而言,这一点并不难理解。
使用过程中,rpush生产消息,lpop消费消息。当lpop没有消息的时候,要适当sleep一会再重试。如果不用sleep,list还有个指令叫blpop,在没有消息的时候,它会阻塞住直到消息到来。

能不能生产一次消费多次呢?
使用pub/sub主题订阅者模式,可以实现1:N的消息队列。如果对方追问pub/sub有什么缺点?在消费者下线的情况下,生产的消息会丢失,得使用专业的消息队列如rabbitmq等。

redis如何实现延时队列?
使用sortedset,拿时间戳作为score,消息内容作为key调用zadd来生产消息,消费者用zrangebyscore指令获取N秒之前的数据轮询进行处理。

场景

消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式。利用redis这两种场景的消息队列都能够实现。
消息队列 - 图1

生产者消费者模式示例

生产者生产消息放到队列里,多个消费者同时监听队列,谁先抢到消息谁就会从队列中取走消息;即对于每个消息只能被最多一个消费者拥有。
生产者

  1. import time
  2. import redis
  3. pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=2, decode_responses=True)
  4. r = redis.Redis(connection_pool=pool)
  5. def product(i):
  6. length = r.llen("goods2")
  7. print(length)
  8. if length > 5000:
  9. print("长度过大睡一会")
  10. time.sleep(1)
  11. product(i)
  12. else:
  13. # 生产者
  14. r.lpush("goods2", "good1" + str(i))
  15. print("加入一个值睡一会")
  16. # time.sleep(5)
  17. if __name__ == '__main__':
  18. # 此处表示循环10000次,往redis里面放10000次数据
  19. for i in range(10000):
  20. product(i)

消费者
消费者开启多进程,在消费者开启多进程的时候我们会遇到一个问题,就是多个进程同时去抢同一个资源的情况,这个时候我们可以选择加锁到资源,也就是redis会话队列上,当某个进程拿资源的时候redis会话队列加上锁,保证其他进程拿不到这个资源,当这个进程拿完资源后,释放锁,让其他进程去抢占资源:

  1. import time
  2. import redis
  3. from multiprocessing import Process, Lock
  4. pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=2, decode_responses=True)
  5. r = redis.Redis(connection_pool=pool)
  6. def users(lock):
  7. length = r.llen("goods2")
  8. print(length)
  9. while True:
  10. # print(1)
  11. # 对资源进行加锁
  12. lock.acquire()
  13. if length > 0:
  14. goods = r.lpop("goods2")
  15. # 获得资源后释放锁
  16. lock.release()
  17. # 以下可以写自己的业务逻辑操作
  18. try:
  19. data = goods
  20. print(data)
  21. if str(goods) == "None":
  22. print("无值多等等")
  23. time.sleep(2)
  24. except:
  25. print("无值等等")
  26. time.sleep(2)
  27. users()
  28. else:
  29. print("无值等等")
  30. time.sleep(10)
  31. users(lock)
  32. if __name__ == '__main__':
  33. lock = Lock()
  34. processes = []
  35. for i in range(20):
  36. p = Process(target=users, args=(lock,))
  37. p.start()
  38. processes.append(p)
  39. for p in processes:
  40. p.join()
  41. print('处理完成')

发布者订阅者模式示例

发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息;即正常情况下每个消费者收到的消息应该都是一样的。
redis发布订阅功能用于消息的传输;redis发布订阅机制包含3个部分:发布者,订阅者,channel(频道)

消息队列 - 图2

# redis_helper.py
import redis


class RedisHelper(object):
    def __init__(self):
        self.pool = redis.ConnectionPool(host='127.0.0.1', port=6379, db=1, decode_responses=True)
        self.__conn = redis.Redis(connection_pool=self.pool)
        # 频道名称
        self.chan_sub = "orders"

    def public(self, msg):
        """
        在指定频道上发布消息
        :param msg:
        :return:
        """
        # publish(): 在指定频道上发布消息,返回订阅者的数量
        self.__conn.publish(self.chan_sub, msg)
        return True

    def subscribe(self):
        # 返回发布订阅对象,通过这个对象你能1)订阅频道 2)监听频道中的消息
        pub = self.__conn.pubsub()
        # 订阅某个频道,与publish()中指定的频道一样。消息会发布到这个频道中
        pub.subscribe(self.chan_sub)
        return pub

发布者

# redis_pub.py
from redis_helper import RedisHelper

obj = RedisHelper()
for i in range(5):
    obj.public("hello_%s" % i)

订阅者

# redis_sub.py
from redis_helper import RedisHelper

obj = RedisHelper()
redis_sub = obj.subscribe()
while True:
    # listen()函数封装了parse_response()函数
    msg = redis_sub.listen()
    for i in msg:
        if i["type"] == "message":
            print(str(i["channel"]) + ":" + str(i["data"]))
        elif i["type"] == "subscrube":
            print(str(i["chennel"]))