1. 模式说明
以下代码模拟大批量短信发送场景:一个生产者向同一队列发送 100 条短信消息,三个处理速度不同的消费者借助 basicQos(1) 与手动确认,按各自的处理能力分担这些消息。
1.SMS类
/**
 * Mutable value holder for one text message: the recipient's display name,
 * the recipient's mobile number, and the message body.
 *
 * <p>All three properties are plain strings; no validation is performed and
 * {@code null} values are stored as-is.
 */
public class SMS {

    /** Recipient's display name. */
    private String name;

    /** Recipient's mobile number, kept as text. */
    private String mobile;

    /** Body of the message. */
    private String content;

    /**
     * Creates a message with all three properties set.
     *
     * @param name    recipient's display name
     * @param mobile  recipient's mobile number
     * @param content message body
     */
    public SMS(String name, String mobile, String content) {
        this.content = content;
        this.mobile = mobile;
        this.name = name;
    }

    // ---- read accessors ----

    /** @return the recipient's display name */
    public String getName() {
        return this.name;
    }

    /** @return the recipient's mobile number */
    public String getMobile() {
        return this.mobile;
    }

    /** @return the message body */
    public String getContent() {
        return this.content;
    }

    // ---- write accessors ----

    /** @param name new recipient display name */
    public void setName(String name) {
        this.name = name;
    }

    /** @param mobile new recipient mobile number */
    public void setMobile(String mobile) {
        this.mobile = mobile;
    }

    /** @param content new message body */
    public void setContent(String content) {
        this.content = content;
    }
}
2.短信生产者
import com.baiqi.rabbitmq.utils.RabbitConstant;
import com.baiqi.rabbitmq.utils.RabbitUtils;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Producer side of the bulk-SMS simulation.
 *
 * <p>Declares the SMS queue, publishes a fixed number of JSON-encoded
 * {@link SMS} messages to it through the default exchange, then closes the
 * channel and the connection.
 */
public class OrderSystem {

    /** Number of simulated SMS messages published per run. */
    private static final int MESSAGE_COUNT = 100;

    /**
     * Publishes {@link #MESSAGE_COUNT} messages to {@code RabbitConstant.QUEUE_SMS}.
     *
     * @param args unused
     * @throws IOException      if declaring the queue, publishing, or closing fails
     * @throws TimeoutException if closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // try-with-resources closes the channel first and then the connection,
        // also when declaring or publishing throws, so the connection is never
        // leaked. NOTE(review): relies on Connection and Channel being
        // AutoCloseable (amqp-client 5.x) - confirm the client version in use.
        try (Connection connection = RabbitUtils.getConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

            // One Gson instance is enough; it does not need to be rebuilt per message.
            Gson gson = new Gson();
            for (int i = 1; i <= MESSAGE_COUNT; i++) {
                SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
                // Encode explicitly as UTF-8: the payload contains Chinese text and
                // the no-argument getBytes() would use the JVM's default charset,
                // which differs between machines.
                byte[] payload = gson.toJson(sms).getBytes(java.nio.charset.StandardCharsets.UTF_8);
                // Empty exchange name = default exchange, routed by queue name.
                channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, payload);
            }
            System.out.println("发送数据成功");
        }
    }
}
3.短信消费者1
import com.baiqi.rabbitmq.utils.RabbitConstant;
import com.baiqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Consumer 1 of the bulk-SMS simulation.
 *
 * <p>Subscribes to the SMS queue with manual acknowledgement and a prefetch
 * count of 1, prints each message, sleeps 10 ms to simulate the time needed
 * to send it, and then acknowledges it.
 */
public class SMSSender1 {

    /**
     * Opens a connection and registers the consumer on {@code RabbitConstant.QUEUE_SMS}.
     * The connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if creating the channel, declaring the queue, or subscribing fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        // Without basicQos(1) the broker spreads all messages evenly across the
        // consumers. With it, the broker no longer pushes several messages to a
        // consumer at once: a new message is delivered only after the previous
        // one has been processed and acknowledged.
        channel.basicQos(1);

        // Second argument false = manual acknowledgement (see basicAck below).
        channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 (the standard JSON encoding) instead of
                // the JVM's default charset, which differs between machines.
                String jsonSMS = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("SMSSender1-短信发送成功:" + jsonSMS);

                boolean interrupted = false;
                try {
                    Thread.sleep(10); // simulated processing time
                } catch (InterruptedException e) {
                    interrupted = true;
                }

                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);

                if (interrupted) {
                    // Restore the interrupt flag after the ack so the interruption
                    // is not silently lost to the calling thread.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
4.短信消费者2
import com.baiqi.rabbitmq.utils.RabbitConstant;
import com.baiqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Consumer 2 of the bulk-SMS simulation.
 *
 * <p>Subscribes to the SMS queue with manual acknowledgement and a prefetch
 * count of 1, prints each message, sleeps 100 ms to simulate the time needed
 * to send it, and then acknowledges it.
 */
public class SMSSender2 {

    /**
     * Opens a connection and registers the consumer on {@code RabbitConstant.QUEUE_SMS}.
     * The connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if creating the channel, declaring the queue, or subscribing fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        // Without basicQos(1) the broker spreads all messages evenly across the
        // consumers. With it, the broker no longer pushes several messages to a
        // consumer at once: a new message is delivered only after the previous
        // one has been processed and acknowledged.
        channel.basicQos(1);

        // Second argument false = manual acknowledgement (see basicAck below).
        channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 (the standard JSON encoding) instead of
                // the JVM's default charset, which differs between machines.
                String jsonSMS = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("SMSSender2-短信发送成功:" + jsonSMS);

                boolean interrupted = false;
                try {
                    Thread.sleep(100); // simulated processing time
                } catch (InterruptedException e) {
                    interrupted = true;
                }

                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);

                if (interrupted) {
                    // Restore the interrupt flag after the ack so the interruption
                    // is not silently lost to the calling thread.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
5.短信消费者3
import com.baiqi.rabbitmq.utils.RabbitConstant;
import com.baiqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Consumer 3 of the bulk-SMS simulation.
 *
 * <p>Subscribes to the SMS queue with manual acknowledgement and a prefetch
 * count of 1, prints each message, sleeps 500 ms to simulate the time needed
 * to send it, and then acknowledges it.
 */
public class SMSSender3 {

    /**
     * Opens a connection and registers the consumer on {@code RabbitConstant.QUEUE_SMS}.
     * The connection is intentionally left open so deliveries keep arriving.
     *
     * @param args unused
     * @throws IOException if creating the channel, declaring the queue, or subscribing fails
     */
    public static void main(String[] args) throws IOException {
        Connection connection = RabbitUtils.getConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);

        // Without basicQos(1) the broker spreads all messages evenly across the
        // consumers. With it, the broker no longer pushes several messages to a
        // consumer at once: a new message is delivered only after the previous
        // one has been processed and acknowledged.
        channel.basicQos(1);

        // Second argument false = manual acknowledgement (see basicAck below).
        channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode explicitly as UTF-8 (the standard JSON encoding) instead of
                // the JVM's default charset, which differs between machines.
                String jsonSMS = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("SMSSender3-短信发送成功:" + jsonSMS);

                boolean interrupted = false;
                try {
                    Thread.sleep(500); // simulated processing time
                } catch (InterruptedException e) {
                    interrupted = true;
                }

                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);

                if (interrupted) {
                    // Restore the interrupt flag after the ack so the interruption
                    // is not silently lost to the calling thread.
                    Thread.currentThread().interrupt();
                }
            }
        });
    }
}
运行结果: