Work Queues
工作队列(任务队列)主要思想是避免立即执行资源密集型任务,而不得不等待它完成。我们将任务封装成消息发送到队列中,在后台的工作线程(消费者)弹出任务并执行。当有多个工作线程一起处理任务时,一个消息只能被消费一次,默认轮询策略分发机制。
RabbitMQ连接工具类
public class RabbitMqUtils { //获取连接 public static Connection getConnection() { //创建一个连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置工厂IP 连接RabbitMQ的队列 factory.setHost("47.172.193.131"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123"); factory.setVirtualHost("/"); Connection connection = null; try { connection = factory.newConnection(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } return connection; } //得到一个信道 public static Channel getChannel() { Channel channel = null; try { channel = getConnection().createChannel(); } catch (IOException e) { e.printStackTrace(); } return channel; } //资源关闭 public static void close(Connection connection, Channel channel) { if (channel != null) { try { channel.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } }}
消息轮询消费
工作线程1 工作线程2task0 task1task2 task3task4 task5
消息应答
RabbitMQ一但向消费者投递一条消息便立即标记为删除,如果消费者在处理消息的过程中宕机了,将丢失正在处理的消息。为了保证消息在发送过程中不丢失,RabbitMQ引入了应答机制,在消费者处理消息完毕后,告诉RabbitMQ可以删除消息了。自动应答: 不在乎消费者对消息处理是否成功,都会告诉队列删除消息。如果处理消息失败,TCP连接断开, RabbitMQ将实现自动补偿,重新将消息发送到其他消费者。手动应答: 消费者处理完业务逻辑,手动返回Ack(通知)告诉队列处理完了,队列进而删除消息。//手动应答 参数1:消息的唯一标识 参数2:是否批量应答channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
消息持久化
RabbitMQ的消息和队列默认是在内存中的,如果Rabbit Server宕机会导致消息丢失,我们可以通过标记消息和队列为持久化避免这个问题。#创建队列第二个参数为是否创建持久化队列queueDeclare(QUEUE_NAME, true, false, false, null);#MessageProperties.PERSISTENT_TEXT_PLAIN:标记消息为持久化#但不能完全保证消息不会丢失,如果消息在内存写入磁盘的那一瞬间宕机依然会导致消息丢失。channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
不公平分发
RabbitMQ默认采用轮询分发,如果消费者处理消息能力不同,会导致处理速度快的消费者和处理速度慢的消费者得到同样数量的消息,这样会使处理能力快的消费者大部分时间都处于空闲状态,造成资源浪费。#消费方设置分发策略为不公平分发channel.basicQos(1)
预取值
可以手动设置某个消费者单次接收多少条消息进行处理,设置信道的容量。int prefetchCount = 5;#设置预取值为5channel.basicQos(prefetchCount);
发布确认
在消息持久化的过程中仍然存在极小概率出现缓存写入磁盘宕机导致数据丢失的可能,需要采用发布确认保证消息不丢失。发布确认就是在消息从内存写入到磁盘完毕后,需要响应给消息提供者持久化完成,才算持久化成功。#发布确认默认是不开启的,需要调用channel的confirmSelect()方法channel.confirmSelect();#1.单个确认发布同步确认发布,发布一个消息后只有在它被确认发布后,后续发布才能进行,如果在指定的时间内没有返回确认消息就会抛异常。缺点:发布速度比较慢,因为消息的确认过程是阻塞的,一般适用于每秒不超过数百条发布消息吞吐量的小型应用程序。channel.confirmSelect();#开启单个发布确认channel.waitForConfirms();#确认消息发布,返回true为发布成功#2.批量确认发布先发布一批消息然后一起确认,相比于单个发布确认提高了吞吐量,缺点就是如果发生故障导致发布出现问题,无法定位到出现问题的消息,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布,同时也是阻塞的。在单个确认发布的基础上指定一个量,达到这个量就进行一次确认发布。#3.异步确认发布可靠性和性能都能得到极大的保证,需要一个Map标记消息的key和value,利用回调函数来达到消息可靠性传递,ack确认应答回调和nack未确认应答回调函数,非阻塞型。//消息确认成功回调函数 ackConfirmCallback ackCallback = (deliveryTag, multiple) -> { //消息成功回调函数 System.out.println("消息确认成功" + deliveryTag);};//消息确认失败回调函数 nackConfirmCallback nackCallback = (deliveryTag, multiple) -> { System.out.println("消息确认失败" + deliveryTag);};channel.addConfirmListener(ackCallback, nackCallback);#处理异步未确认消息最好就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,例如用ConcurrentSkipListMap可以在confirm callbacks与发布线程之间进行消息的传递。[单次确认发布耗时(1800,2000)毫秒][批量确认发布耗时(200,300)毫秒][异步确认发布耗时(90,110)毫秒]
//记录所有发布的消息 key:序号 value:消息体outstandingConfirms.put(channel.getNextPublishSeqNo(), message + i);
/* * 线程安全有序的一个哈希表,适用于高并发的情况下 * 1.轻松的将序号和消息关联 * 2.可以轻松的批量删除 * 3.支持高并发 */ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();//消息确认成功回调函数 ackConfirmCallback ackCallback = (deliveryTag, multiple) -> { //消息成功回调函数 - 删除确认消息编号(剩下的是未被确认的消息) if (multiple) { //批量删除 ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(deliveryTag); confirmed.clear(); } else { outstandingConfirms.remove(deliveryTag); }};//消息确认失败回调函数 nackConfirmCallback nackCallback = (deliveryTag, multiple) -> { System.out.println("未确认消息" + outstandingConfirms.get(deliveryTag));};channel.addConfirmListener(ackCallback, nackCallback);