生产者

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. /**
  8. * 类说明:Topic类型的生产者
  9. * 假设有交换器 topic_course,
  10. * 讲课老师有king,mark,james,
  11. * 技术专题有kafka,jvm,redis,
  12. * 课程章节有 A、B、C,
  13. * 路由键的规则为 讲课老师+“.”+技术专题+“.”+课程章节,如:king.kafka.A。
  14. * 生产者--生产全部的消息3*3*3=27条消息
  15. */
  16. public class TopicProducer {
  17. public final static String EXCHANGE_NAME = "topic_course";
  18. public static void main(String[] args)
  19. throws IOException, TimeoutException {
  20. /**
  21. * 创建连接连接到RabbitMQ
  22. */
  23. ConnectionFactory factory = new ConnectionFactory();
  24. //连接rabbitMq的地址
  25. factory.setHost("zjj101");
  26. /*设置登陆用户名和密码*/
  27. factory.setUsername("guest");
  28. factory.setPassword("guest");
  29. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  30. factory.setVirtualHost("/");
  31. // 创建一个连接
  32. Connection connection = factory.newConnection();
  33. // 创建一个信道
  34. Channel channel = connection.createChannel();
  35. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
  36. /*我们的课程,路由键最终格式类似于:king.kafka.A king.kafka.B*/
  37. String[] techers = {"king", "mark", "james"};
  38. for (int i = 0; i < 3; i++) {
  39. String[] modules = {"kafka", "jvm", "redis"};
  40. for (int j = 0; j < 3; j++) {
  41. String[] servers = {"A", "B", "C"};
  42. for (int k = 0; k < 3; k++) {
  43. // 发送的消息
  44. String message = "Hello Topic_[" + i + "," + j + "," + k + "]";
  45. String routeKey = techers[i % 3] + "." + modules[j % 3]
  46. + "." + servers[k % 3];
  47. channel.basicPublish(EXCHANGE_NAME, routeKey,
  48. null, message.getBytes());
  49. System.out.println(" [x] Sent '" + routeKey + ":'"
  50. + message + "'");
  51. }
  52. }
  53. }
  54. // 关闭频道和连接
  55. channel.close();
  56. connection.close();
  57. }
  58. }

输出:

  1. [x] Sent 'king.kafka.A:'Hello Topic_[0,0,0]'
  2. [x] Sent 'king.kafka.B:'Hello Topic_[0,0,1]'
  3. [x] Sent 'king.kafka.C:'Hello Topic_[0,0,2]'
  4. [x] Sent 'king.jvm.A:'Hello Topic_[0,1,0]'
  5. [x] Sent 'king.jvm.B:'Hello Topic_[0,1,1]'
  6. [x] Sent 'king.jvm.C:'Hello Topic_[0,1,2]'
  7. [x] Sent 'king.redis.A:'Hello Topic_[0,2,0]'
  8. [x] Sent 'king.redis.B:'Hello Topic_[0,2,1]'
  9. [x] Sent 'king.redis.C:'Hello Topic_[0,2,2]'
  10. [x] Sent 'mark.kafka.A:'Hello Topic_[1,0,0]'
  11. [x] Sent 'mark.kafka.B:'Hello Topic_[1,0,1]'
  12. [x] Sent 'mark.kafka.C:'Hello Topic_[1,0,2]'
  13. [x] Sent 'mark.jvm.A:'Hello Topic_[1,1,0]'
  14. [x] Sent 'mark.jvm.B:'Hello Topic_[1,1,1]'
  15. [x] Sent 'mark.jvm.C:'Hello Topic_[1,1,2]'
  16. [x] Sent 'mark.redis.A:'Hello Topic_[1,2,0]'
  17. [x] Sent 'mark.redis.B:'Hello Topic_[1,2,1]'
  18. [x] Sent 'mark.redis.C:'Hello Topic_[1,2,2]'
  19. [x] Sent 'james.kafka.A:'Hello Topic_[2,0,0]'
  20. [x] Sent 'james.kafka.B:'Hello Topic_[2,0,1]'
  21. [x] Sent 'james.kafka.C:'Hello Topic_[2,0,2]'
  22. [x] Sent 'james.jvm.A:'Hello Topic_[2,1,0]'
  23. [x] Sent 'james.jvm.B:'Hello Topic_[2,1,1]'
  24. [x] Sent 'james.jvm.C:'Hello Topic_[2,1,2]'
  25. [x] Sent 'james.redis.A:'Hello Topic_[2,2,0]'
  26. [x] Sent 'james.redis.B:'Hello Topic_[2,2,1]'
  27. [x] Sent 'james.redis.C:'Hello Topic_[2,2,2]'

消费者- 消费所有的消息

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. * 1、要关注所有的课程,怎么做? #
  6. * # 可以匹配多个
  7. * 类说明:所有的课程内容
  8. */
  9. public class AllConsumer {
  10. public static void main(String[] argv) throws IOException,
  11. InterruptedException, TimeoutException {
  12. ConnectionFactory factory = new ConnectionFactory();
  13. //连接rabbitMq的地址
  14. factory.setHost("zjj101");
  15. /*设置登陆用户名和密码*/
  16. factory.setUsername("guest");
  17. factory.setPassword("guest");
  18. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  19. factory.setVirtualHost("/");
  20. // 打开连接和创建频道,与发送端一样
  21. Connection connection = factory.newConnection();
  22. final Channel channel = connection.createChannel();
  23. channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME,
  24. BuiltinExchangeType.TOPIC);
  25. // 声明一个随机队列
  26. String queueName = channel.queueDeclare().getQueue();
  27. //!!!!!!!!!消费了所有的消息 #
  28. channel.queueBind(queueName, TopicProducer.EXCHANGE_NAME, "#");
  29. System.out.println(" [*] Waiting for messages:");
  30. // 创建队列消费者
  31. final Consumer consumerA = new DefaultConsumer(channel) {
  32. @Override
  33. public void handleDelivery(String consumerTag,
  34. Envelope envelope,
  35. AMQP.BasicProperties properties,
  36. byte[] body)
  37. throws IOException {
  38. String message = new String(body, "UTF-8");
  39. System.out.println(" AllConsumer Received "
  40. + envelope.getRoutingKey()
  41. + "':'" + message + "'");
  42. }
  43. };
  44. channel.basicConsume(queueName, true, consumerA);
  45. }
  46. }

启动完了消费者后,启动生产者, 输出日志结果:

所有的消息都被消费了.

  1. [*] Waiting for messages:
  2. AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'
  3. AllConsumer Received king.kafka.B':'Hello Topic_[0,0,1]'
  4. AllConsumer Received king.kafka.C':'Hello Topic_[0,0,2]'
  5. AllConsumer Received king.jvm.A':'Hello Topic_[0,1,0]'
  6. AllConsumer Received king.jvm.B':'Hello Topic_[0,1,1]'
  7. AllConsumer Received king.jvm.C':'Hello Topic_[0,1,2]'
  8. AllConsumer Received king.redis.A':'Hello Topic_[0,2,0]'
  9. AllConsumer Received king.redis.B':'Hello Topic_[0,2,1]'
  10. AllConsumer Received king.redis.C':'Hello Topic_[0,2,2]'
  11. AllConsumer Received mark.kafka.A':'Hello Topic_[1,0,0]'
  12. AllConsumer Received mark.kafka.B':'Hello Topic_[1,0,1]'
  13. AllConsumer Received mark.kafka.C':'Hello Topic_[1,0,2]'
  14. AllConsumer Received mark.jvm.A':'Hello Topic_[1,1,0]'
  15. AllConsumer Received mark.jvm.B':'Hello Topic_[1,1,1]'
  16. AllConsumer Received mark.jvm.C':'Hello Topic_[1,1,2]'
  17. AllConsumer Received mark.redis.A':'Hello Topic_[1,2,0]'
  18. AllConsumer Received mark.redis.B':'Hello Topic_[1,2,1]'
  19. AllConsumer Received mark.redis.C':'Hello Topic_[1,2,2]'
  20. AllConsumer Received james.kafka.A':'Hello Topic_[2,0,0]'
  21. AllConsumer Received james.kafka.B':'Hello Topic_[2,0,1]'
  22. AllConsumer Received james.kafka.C':'Hello Topic_[2,0,2]'
  23. AllConsumer Received james.jvm.A':'Hello Topic_[2,1,0]'
  24. AllConsumer Received james.jvm.B':'Hello Topic_[2,1,1]'
  25. AllConsumer Received james.jvm.C':'Hello Topic_[2,1,2]'
  26. AllConsumer Received james.redis.A':'Hello Topic_[2,2,0]'
  27. AllConsumer Received james.redis.B':'Hello Topic_[2,2,1]'
  28. AllConsumer Received james.redis.C':'Hello Topic_[2,2,2]'

消费者-指定前缀打头

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. 2、关注king老师的所有课程,怎么办?
  6. *类说明:
  7. */
  8. public class King_AllConsumer {
  9. public static void main(String[] argv) throws IOException,
  10. InterruptedException, TimeoutException {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. //连接rabbitMq的地址
  13. factory.setHost("zjj101");
  14. /*设置登陆用户名和密码*/
  15. factory.setUsername("guest");
  16. factory.setPassword("guest");
  17. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  18. factory.setVirtualHost("/");
  19. // 打开连接和创建频道,与发送端一样
  20. Connection connection = factory.newConnection();
  21. final Channel channel = connection.createChannel();
  22. channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME,
  23. BuiltinExchangeType.TOPIC);
  24. // 声明一个随机队列
  25. String queueName = channel.queueDeclare().getQueue();
  26. //TODO
  27. channel.queueBind(queueName,TopicProducer.EXCHANGE_NAME, "king.#");
  28. System.out.println(" [*] Waiting for messages:");
  29. // 创建队列消费者
  30. final Consumer consumerA = new DefaultConsumer(channel) {
  31. @Override
  32. public void handleDelivery(String consumerTag,
  33. Envelope envelope,
  34. AMQP.BasicProperties properties,
  35. byte[] body)
  36. throws IOException {
  37. String message = new String(body, "UTF-8");
  38. System.out.println(" AllConsumer Received "
  39. + envelope.getRoutingKey()
  40. + "':'" + message + "'");
  41. }
  42. };
  43. channel.basicConsume(queueName, true, consumerA);
  44. }
  45. }

启动完这一个消费者再启动生产者后输出结果:

king打头的routingKey都被消费了

  1. [*] Waiting for messages:
  2. AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'
  3. AllConsumer Received king.kafka.B':'Hello Topic_[0,0,1]'
  4. AllConsumer Received king.kafka.C':'Hello Topic_[0,0,2]'
  5. AllConsumer Received king.jvm.A':'Hello Topic_[0,1,0]'
  6. AllConsumer Received king.jvm.B':'Hello Topic_[0,1,1]'
  7. AllConsumer Received king.jvm.C':'Hello Topic_[0,1,2]'
  8. AllConsumer Received king.redis.A':'Hello Topic_[0,2,0]'
  9. AllConsumer Received king.redis.B':'Hello Topic_[0,2,1]'
  10. AllConsumer Received king.redis.C':'Hello Topic_[0,2,2]'

消费者-指定前缀和结尾

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. 3、关注king老师所有的A章节,怎么办?
  6. *类说明:
  7. */
  8. public class King_All_AConsumer {
  9. public static void main(String[] argv) throws IOException,
  10. InterruptedException, TimeoutException {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. //连接rabbitMq的地址
  13. factory.setHost("zjj101");
  14. /*设置登陆用户名和密码*/
  15. factory.setUsername("guest");
  16. factory.setPassword("guest");
  17. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  18. factory.setVirtualHost("/");
  19. // 打开连接和创建频道,与发送端一样
  20. Connection connection = factory.newConnection();
  21. final Channel channel = connection.createChannel();
  22. channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME,
  23. BuiltinExchangeType.TOPIC);
  24. // 声明一个随机队列
  25. String queueName = channel.queueDeclare().getQueue();
  26. //!!!!!!!!!!!!配置匹配
  27. channel.queueBind(queueName,TopicProducer.EXCHANGE_NAME, "king.*.A");
  28. System.out.println(" [*] Waiting for messages:");
  29. // 创建队列消费者
  30. final Consumer consumerA = new DefaultConsumer(channel) {
  31. @Override
  32. public void handleDelivery(String consumerTag,
  33. Envelope envelope,
  34. AMQP.BasicProperties properties,
  35. byte[] body)
  36. throws IOException {
  37. String message = new String(body, "UTF-8");
  38. System.out.println(" AllConsumer Received "
  39. + envelope.getRoutingKey()
  40. + "':'" + message + "'");
  41. }
  42. };
  43. channel.basicConsume(queueName, true, consumerA);
  44. }
  45. }

启动完这一个消费者再启动生产者后输出结果:

king打头的并且 A结尾的 routingKey都被消费了

  1. [*] Waiting for messages:
  2. AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'
  3. AllConsumer Received king.jvm.A':'Hello Topic_[0,1,0]'
  4. AllConsumer Received king.redis.A':'Hello Topic_[0,2,0]'

消费者-指定中间内容

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. 4、关注kafka所有的课程,怎么办?
  6. *类说明:
  7. */
  8. public class Kafka_AllConsumer {
  9. public static void main(String[] argv) throws IOException,
  10. InterruptedException, TimeoutException {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. //连接rabbitMq的地址
  13. factory.setHost("zjj101");
  14. /*设置登陆用户名和密码*/
  15. factory.setUsername("guest");
  16. factory.setPassword("guest");
  17. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  18. factory.setVirtualHost("/");
  19. // 打开连接和创建频道,与发送端一样
  20. Connection connection = factory.newConnection();
  21. final Channel channel = connection.createChannel();
  22. channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME,
  23. BuiltinExchangeType.TOPIC);
  24. // 声明一个随机队列
  25. String queueName = channel.queueDeclare().getQueue();
  26. channel.queueBind(queueName,TopicProducer.EXCHANGE_NAME,
  27. "#.kafka.#");
  28. System.out.println(" [*] Waiting for messages:");
  29. // 创建队列消费者
  30. final Consumer consumerA = new DefaultConsumer(channel) {
  31. @Override
  32. public void handleDelivery(String consumerTag,
  33. Envelope envelope,
  34. AMQP.BasicProperties properties,
  35. byte[] body)
  36. throws IOException {
  37. String message = new String(body, "UTF-8");
  38. System.out.println(" AllConsumer Received "
  39. + envelope.getRoutingKey()
  40. + "':'" + message + "'");
  41. }
  42. };
  43. channel.basicConsume(queueName, true, consumerA);
  44. }
  45. }

启动完这一个消费者再启动生产者后输出结果:

中间为Kafka的的 routingKey都被消费了

  1. [*] Waiting for messages:
  2. AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'
  3. AllConsumer Received king.kafka.B':'Hello Topic_[0,0,1]'
  4. AllConsumer Received king.kafka.C':'Hello Topic_[0,0,2]'
  5. AllConsumer Received mark.kafka.A':'Hello Topic_[1,0,0]'
  6. AllConsumer Received mark.kafka.B':'Hello Topic_[1,0,1]'
  7. AllConsumer Received mark.kafka.C':'Hello Topic_[1,0,2]'
  8. AllConsumer Received james.kafka.A':'Hello Topic_[2,0,0]'
  9. AllConsumer Received james.kafka.B':'Hello Topic_[2,0,1]'
  10. AllConsumer Received james.kafka.C':'Hello Topic_[2,0,2]'

消费者-指定结尾

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. * 5、关注所有的B章节,怎么办?
  6. * 类说明:
  7. */
  8. public class All_BConsumer {
  9. public static void main(String[] argv) throws IOException,
  10. InterruptedException, TimeoutException {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. //连接rabbitMq的地址
  13. factory.setHost("zjj101");
  14. /*设置登陆用户名和密码*/
  15. factory.setUsername("guest");
  16. factory.setPassword("guest");
  17. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  18. factory.setVirtualHost("/");
  19. // 打开连接和创建频道,与发送端一样
  20. Connection connection = factory.newConnection();
  21. final Channel channel = connection.createChannel();
  22. channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME,
  23. BuiltinExchangeType.TOPIC);
  24. // 声明一个随机队列
  25. String queueName = channel.queueDeclare().getQueue();
  26. //前面怎么样的无所谓,最后一个是B章节就可以了,
  27. //#匹配无数个
  28. channel.queueBind(queueName, TopicProducer.EXCHANGE_NAME, "#.B");
  29. System.out.println(" [*] Waiting for messages:");
  30. // 创建队列消费者
  31. final Consumer consumerA = new DefaultConsumer(channel) {
  32. @Override
  33. public void handleDelivery(String consumerTag,
  34. Envelope envelope,
  35. AMQP.BasicProperties properties,
  36. byte[] body)
  37. throws IOException {
  38. String message = new String(body, "UTF-8");
  39. System.out.println(" AllConsumer Received "
  40. + envelope.getRoutingKey()
  41. + "':'" + message + "'");
  42. }
  43. };
  44. channel.basicConsume(queueName, true, consumerA);
  45. }
  46. }

启动完这一个消费者再启动生产者后输出结果:

所有结尾为b的 routingKey都被消费了

  1. [*] Waiting for messages:
  2. AllConsumer Received king.kafka.B':'Hello Topic_[0,0,1]'
  3. AllConsumer Received king.jvm.B':'Hello Topic_[0,1,1]'
  4. AllConsumer Received king.redis.B':'Hello Topic_[0,2,1]'
  5. AllConsumer Received mark.kafka.B':'Hello Topic_[1,0,1]'
  6. AllConsumer Received mark.jvm.B':'Hello Topic_[1,1,1]'
  7. AllConsumer Received mark.redis.B':'Hello Topic_[1,2,1]'
  8. AllConsumer Received james.kafka.B':'Hello Topic_[2,0,1]'
  9. AllConsumer Received james.jvm.B':'Hello Topic_[2,1,1]'
  10. AllConsumer Received james.redis.B':'Hello Topic_[2,2,1]'

消费者 -RoutingKey写全了

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. /**
  5. * 6、关注king老师kafka的A章节,怎么办?
  6. *类说明:
  7. */
  8. public class King_kafka_AConsumer {
  9. public static void main(String[] argv) throws IOException,
  10. InterruptedException, TimeoutException {
  11. ConnectionFactory factory = new ConnectionFactory();
  12. //连接rabbitMq的地址
  13. factory.setHost("zjj101");
  14. /*设置登陆用户名和密码*/
  15. factory.setUsername("guest");
  16. factory.setPassword("guest");
  17. /*设置虚拟主机地址,不能设置错了,不然会报错误*/
  18. factory.setVirtualHost("/");
  19. // 打开连接和创建频道,与发送端一样
  20. Connection connection = factory.newConnection();
  21. final Channel channel = connection.createChannel();
  22. channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME,
  23. BuiltinExchangeType.TOPIC);
  24. // 声明一个随机队列
  25. String queueName = channel.queueDeclare().getQueue();
  26. channel.queueBind(queueName,TopicProducer.EXCHANGE_NAME, "king.kafka.A");
  27. System.out.println(" [*] Waiting for messages:");
  28. // 创建队列消费者
  29. final Consumer consumerA = new DefaultConsumer(channel) {
  30. @Override
  31. public void handleDelivery(String consumerTag,
  32. Envelope envelope,
  33. AMQP.BasicProperties properties,
  34. byte[] body)
  35. throws IOException {
  36. String message = new String(body, "UTF-8");
  37. System.out.println(" AllConsumer Received "
  38. + envelope.getRoutingKey()
  39. + "':'" + message + "'");
  40. }
  41. };
  42. channel.basicConsume(queueName, true, consumerA);
  43. }
  44. }

启动完这一个消费者再启动生产者后输出结果:

只有routingKey为king.kafka.A的 routingKey都被消费了

  1. [*] Waiting for messages:
  2. AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'