1. 模式说明



代码模拟大批量短信发送场景
1.sms类
public class SMS {private String name;private String mobile;private String content;public SMS(String name, String mobile, String content) {this.name = name;this.mobile = mobile;this.content = content;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMobile() {return mobile;}public void setMobile(String mobile) {this.mobile = mobile;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}}
2.短信生产者
import com.baiqi.rabbitmq.utils.RabbitConstant;import com.baiqi.rabbitmq.utils.RabbitUtils;import com.google.gson.Gson;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import java.io.IOException;import java.util.concurrent.TimeoutException;public class OrderSystem {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);for(int i = 1 ; i <= 100 ; i++) {SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");String jsonSMS = new Gson().toJson(sms);channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());}System.out.println("发送数据成功");channel.close();connection.close();}}
3.短信消费者1
import com.baiqi.rabbitmq.utils.RabbitConstant;import com.baiqi.rabbitmq.utils.RabbitUtils;import com.rabbitmq.client.*;import java.io.IOException;/*** 消费者*/public class SMSSender1 {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String jsonSMS = new String(body);System.out.println("SMSSender1-短信发送成功:" + jsonSMS);try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(), false);}});}}
4.短信消费者2
import com.baiqi.rabbitmq.utils.RabbitConstant;import com.baiqi.rabbitmq.utils.RabbitUtils;import com.rabbitmq.client.*;import java.io.IOException;/*** 消费者*/public class SMSSender2 {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String jsonSMS = new String(body);System.out.println("SMSSender2-短信发送成功:" + jsonSMS);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(), false);}});}}
5.短信消费者3
import com.baiqi.rabbitmq.utils.RabbitConstant;import com.baiqi.rabbitmq.utils.RabbitUtils;import com.rabbitmq.client.*;import java.io.IOException;/*** 消费者*/public class SMSSender3 {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);//如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者//basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的channel.basicQos(1);//处理完一个取一个channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String jsonSMS = new String(body);System.out.println("SMSSender3-短信发送成功:" + jsonSMS);try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}channel.basicAck(envelope.getDeliveryTag(), false);}});}}
运行结果:
