1. 模式说明
    image.png
    image.png
    image.png
    image.png
    代码模拟大批量短信发送场景
    1. SMS 类

    /**
     * Plain data holder for one text message: recipient name, mobile number
     * and message content.
     *
     * <p>The producer in this tutorial serializes instances of this class to
     * JSON before publishing them to the SMS queue.
     */
    public class SMS {
        /** Display name of the recipient. */
        private String name;
        /** Mobile number the message is addressed to. */
        private String mobile;
        /** Text of the message. */
        private String content;

        /**
         * Creates a message with all three fields set.
         *
         * @param name    recipient name
         * @param mobile  recipient mobile number
         * @param content message text
         */
        public SMS(String name, String mobile, String content) {
            this.name = name;
            this.mobile = mobile;
            this.content = content;
        }

        /** @return the recipient name */
        public String getName() {
            return name;
        }

        /** @param name the recipient name to set */
        public void setName(String name) {
            this.name = name;
        }

        /** @return the recipient mobile number */
        public String getMobile() {
            return mobile;
        }

        /** @param mobile the recipient mobile number to set */
        public void setMobile(String mobile) {
            this.mobile = mobile;
        }

        /** @return the message text */
        public String getContent() {
            return content;
        }

        /** @param content the message text to set */
        public void setContent(String content) {
            this.content = content;
        }
    }

    2.短信生产者

    1. import com.baiqi.rabbitmq.utils.RabbitConstant;
    2. import com.baiqi.rabbitmq.utils.RabbitUtils;
    3. import com.google.gson.Gson;
    4. import com.rabbitmq.client.Channel;
    5. import com.rabbitmq.client.Connection;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. public class OrderSystem {
    9. public static void main(String[] args) throws IOException, TimeoutException {
    10. Connection connection = RabbitUtils.getConnection();
    11. Channel channel = connection.createChannel();
    12. channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
    13. for(int i = 1 ; i <= 100 ; i++) {
    14. SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");
    15. String jsonSMS = new Gson().toJson(sms);
    16. channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes());
    17. }
    18. System.out.println("发送数据成功");
    19. channel.close();
    20. connection.close();
    21. }
    22. }

    3.短信消费者1

    1. import com.baiqi.rabbitmq.utils.RabbitConstant;
    2. import com.baiqi.rabbitmq.utils.RabbitUtils;
    3. import com.rabbitmq.client.*;
    4. import java.io.IOException;
    5. /**
    6. * 消费者
    7. */
    8. public class SMSSender1 {
    9. public static void main(String[] args) throws IOException {
    10. Connection connection = RabbitUtils.getConnection();
    11. final Channel channel = connection.createChannel();
    12. channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
    13. //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
    14. //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
    15. channel.basicQos(1);//处理完一个取一个
    16. channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
    17. @Override
    18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    19. String jsonSMS = new String(body);
    20. System.out.println("SMSSender1-短信发送成功:" + jsonSMS);
    21. try {
    22. Thread.sleep(10);
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. channel.basicAck(envelope.getDeliveryTag(), false);
    27. }
    28. });
    29. }
    30. }

    4.短信消费者2

    1. import com.baiqi.rabbitmq.utils.RabbitConstant;
    2. import com.baiqi.rabbitmq.utils.RabbitUtils;
    3. import com.rabbitmq.client.*;
    4. import java.io.IOException;
    5. /**
    6. * 消费者
    7. */
    8. public class SMSSender2 {
    9. public static void main(String[] args) throws IOException {
    10. Connection connection = RabbitUtils.getConnection();
    11. final Channel channel = connection.createChannel();
    12. channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
    13. //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
    14. //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
    15. channel.basicQos(1);//处理完一个取一个
    16. channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
    17. @Override
    18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    19. String jsonSMS = new String(body);
    20. System.out.println("SMSSender2-短信发送成功:" + jsonSMS);
    21. try {
    22. Thread.sleep(100);
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. channel.basicAck(envelope.getDeliveryTag(), false);
    27. }
    28. });
    29. }
    30. }

    5.短信消费者3

    1. import com.baiqi.rabbitmq.utils.RabbitConstant;
    2. import com.baiqi.rabbitmq.utils.RabbitUtils;
    3. import com.rabbitmq.client.*;
    4. import java.io.IOException;
    5. /**
    6. * 消费者
    7. */
    8. public class SMSSender3 {
    9. public static void main(String[] args) throws IOException {
    10. Connection connection = RabbitUtils.getConnection();
    11. final Channel channel = connection.createChannel();
    12. channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);
    13. //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者
    14. //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的
    15. channel.basicQos(1);//处理完一个取一个
    16. channel.basicConsume(RabbitConstant.QUEUE_SMS, false, new DefaultConsumer(channel) {
    17. @Override
    18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    19. String jsonSMS = new String(body);
    20. System.out.println("SMSSender3-短信发送成功:" + jsonSMS);
    21. try {
    22. Thread.sleep(500);
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. channel.basicAck(envelope.getDeliveryTag(), false);
    27. }
    28. });
    29. }
    30. }

    运行结果:
    image.png