生产者
import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 类说明:Topic类型的生产者* 假设有交换器 topic_course,* 讲课老师有king,mark,james,* 技术专题有kafka,jvm,redis,* 课程章节有 A、B、C,* 路由键的规则为 讲课老师+“.”+技术专题+“.”+课程章节,如:king.kafka.A。* 生产者--生产全部的消息3*3*3=27条消息*/public class TopicProducer {public final static String EXCHANGE_NAME = "topic_course";public static void main(String[] args)throws IOException, TimeoutException {/*** 创建连接连接到RabbitMQ*/ConnectionFactory factory = new ConnectionFactory();//连接rabbitMq的地址factory.setHost("zjj101");/*设置登陆用户名和密码*/factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 创建一个连接Connection connection = factory.newConnection();// 创建一个信道Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);/*我们的课程,路由键最终格式类似于:king.kafka.A king.kafka.B*/String[] techers = {"king", "mark", "james"};for (int i = 0; i < 3; i++) {String[] modules = {"kafka", "jvm", "redis"};for (int j = 0; j < 3; j++) {String[] servers = {"A", "B", "C"};for (int k = 0; k < 3; k++) {// 发送的消息String message = "Hello Topic_[" + i + "," + j + "," + k + "]";String routeKey = techers[i % 3] + "." + modules[j % 3]+ "." + servers[k % 3];channel.basicPublish(EXCHANGE_NAME, routeKey,null, message.getBytes());System.out.println(" [x] Sent '" + routeKey + ":'"+ message + "'");}}}// 关闭频道和连接channel.close();connection.close();}}
输出:
[x] Sent 'king.kafka.A:'Hello Topic_[0,0,0]'[x] Sent 'king.kafka.B:'Hello Topic_[0,0,1]'[x] Sent 'king.kafka.C:'Hello Topic_[0,0,2]'[x] Sent 'king.jvm.A:'Hello Topic_[0,1,0]'[x] Sent 'king.jvm.B:'Hello Topic_[0,1,1]'[x] Sent 'king.jvm.C:'Hello Topic_[0,1,2]'[x] Sent 'king.redis.A:'Hello Topic_[0,2,0]'[x] Sent 'king.redis.B:'Hello Topic_[0,2,1]'[x] Sent 'king.redis.C:'Hello Topic_[0,2,2]'[x] Sent 'mark.kafka.A:'Hello Topic_[1,0,0]'[x] Sent 'mark.kafka.B:'Hello Topic_[1,0,1]'[x] Sent 'mark.kafka.C:'Hello Topic_[1,0,2]'[x] Sent 'mark.jvm.A:'Hello Topic_[1,1,0]'[x] Sent 'mark.jvm.B:'Hello Topic_[1,1,1]'[x] Sent 'mark.jvm.C:'Hello Topic_[1,1,2]'[x] Sent 'mark.redis.A:'Hello Topic_[1,2,0]'[x] Sent 'mark.redis.B:'Hello Topic_[1,2,1]'[x] Sent 'mark.redis.C:'Hello Topic_[1,2,2]'[x] Sent 'james.kafka.A:'Hello Topic_[2,0,0]'[x] Sent 'james.kafka.B:'Hello Topic_[2,0,1]'[x] Sent 'james.kafka.C:'Hello Topic_[2,0,2]'[x] Sent 'james.jvm.A:'Hello Topic_[2,1,0]'[x] Sent 'james.jvm.B:'Hello Topic_[2,1,1]'[x] Sent 'james.jvm.C:'Hello Topic_[2,1,2]'[x] Sent 'james.redis.A:'Hello Topic_[2,2,0]'[x] Sent 'james.redis.B:'Hello Topic_[2,2,1]'[x] Sent 'james.redis.C:'Hello Topic_[2,2,2]'
消费者- 消费所有的消息
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 1、要关注所有的课程,怎么做? #* # 可以匹配多个* 类说明:所有的课程内容*/public class AllConsumer {public static void main(String[] argv) throws IOException,InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();//连接rabbitMq的地址factory.setHost("zjj101");/*设置登陆用户名和密码*/factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME,BuiltinExchangeType.TOPIC);// 声明一个随机队列String queueName = channel.queueDeclare().getQueue();//!!!!!!!!!消费了所有的消息 #channel.queueBind(queueName, TopicProducer.EXCHANGE_NAME, "#");System.out.println(" [*] Waiting for messages:");// 创建队列消费者final Consumer consumerA = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" AllConsumer Received "+ envelope.getRoutingKey()+ "':'" + message + "'");}};channel.basicConsume(queueName, true, consumerA);}}
启动完了消费者后,启动生产者, 输出日志结果:
所有的消息都被消费了.
[*] Waiting for messages:AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'AllConsumer Received king.kafka.B':'Hello Topic_[0,0,1]'AllConsumer Received king.kafka.C':'Hello Topic_[0,0,2]'AllConsumer Received king.jvm.A':'Hello Topic_[0,1,0]'AllConsumer Received king.jvm.B':'Hello Topic_[0,1,1]'AllConsumer Received king.jvm.C':'Hello Topic_[0,1,2]'AllConsumer Received king.redis.A':'Hello Topic_[0,2,0]'AllConsumer Received king.redis.B':'Hello Topic_[0,2,1]'AllConsumer Received king.redis.C':'Hello Topic_[0,2,2]'AllConsumer Received mark.kafka.A':'Hello Topic_[1,0,0]'AllConsumer Received mark.kafka.B':'Hello Topic_[1,0,1]'AllConsumer Received mark.kafka.C':'Hello Topic_[1,0,2]'AllConsumer Received mark.jvm.A':'Hello Topic_[1,1,0]'AllConsumer Received mark.jvm.B':'Hello Topic_[1,1,1]'AllConsumer Received mark.jvm.C':'Hello Topic_[1,1,2]'AllConsumer Received mark.redis.A':'Hello Topic_[1,2,0]'AllConsumer Received mark.redis.B':'Hello Topic_[1,2,1]'AllConsumer Received mark.redis.C':'Hello Topic_[1,2,2]'AllConsumer Received james.kafka.A':'Hello Topic_[2,0,0]'AllConsumer Received james.kafka.B':'Hello Topic_[2,0,1]'AllConsumer Received james.kafka.C':'Hello Topic_[2,0,2]'AllConsumer Received james.jvm.A':'Hello Topic_[2,1,0]'AllConsumer Received james.jvm.B':'Hello Topic_[2,1,1]'AllConsumer Received james.jvm.C':'Hello Topic_[2,1,2]'AllConsumer Received james.redis.A':'Hello Topic_[2,2,0]'AllConsumer Received james.redis.B':'Hello Topic_[2,2,1]'AllConsumer Received james.redis.C':'Hello Topic_[2,2,2]'
消费者-指定前缀打头
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/**2、关注king老师的所有课程,怎么办?*类说明:*/public class King_AllConsumer {public static void main(String[] argv) throws IOException,InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();//连接rabbitMq的地址factory.setHost("zjj101");/*设置登陆用户名和密码*/factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME,BuiltinExchangeType.TOPIC);// 声明一个随机队列String queueName = channel.queueDeclare().getQueue();//TODOchannel.queueBind(queueName,TopicProducer.EXCHANGE_NAME, "king.#");System.out.println(" [*] Waiting for messages:");// 创建队列消费者final Consumer consumerA = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" AllConsumer Received "+ envelope.getRoutingKey()+ "':'" + message + "'");}};channel.basicConsume(queueName, true, consumerA);}}
启动完这一个消费者再启动生产者后输出结果:
king打头的routingKey都被消费了
[*] Waiting for messages:AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'AllConsumer Received king.kafka.B':'Hello Topic_[0,0,1]'AllConsumer Received king.kafka.C':'Hello Topic_[0,0,2]'AllConsumer Received king.jvm.A':'Hello Topic_[0,1,0]'AllConsumer Received king.jvm.B':'Hello Topic_[0,1,1]'AllConsumer Received king.jvm.C':'Hello Topic_[0,1,2]'AllConsumer Received king.redis.A':'Hello Topic_[0,2,0]'AllConsumer Received king.redis.B':'Hello Topic_[0,2,1]'AllConsumer Received king.redis.C':'Hello Topic_[0,2,2]'
消费者-指定前缀和结尾
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/**3、关注king老师所有的A章节,怎么办?*类说明:*/public class King_All_AConsumer {public static void main(String[] argv) throws IOException,InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();//连接rabbitMq的地址factory.setHost("zjj101");/*设置登陆用户名和密码*/factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME,BuiltinExchangeType.TOPIC);// 声明一个随机队列String queueName = channel.queueDeclare().getQueue();//!!!!!!!!!!!!配置匹配channel.queueBind(queueName,TopicProducer.EXCHANGE_NAME, "king.*.A");System.out.println(" [*] Waiting for messages:");// 创建队列消费者final Consumer consumerA = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" AllConsumer Received "+ envelope.getRoutingKey()+ "':'" + message + "'");}};channel.basicConsume(queueName, true, consumerA);}}
启动完这一个消费者再启动生产者后输出结果:
king打头的并且 A结尾的 routingKey都被消费了
[*] Waiting for messages:AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'AllConsumer Received king.jvm.A':'Hello Topic_[0,1,0]'AllConsumer Received king.redis.A':'Hello Topic_[0,2,0]'
消费者-指定中间内容
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/**4、关注kafka所有的课程,怎么办?*类说明:*/public class Kafka_AllConsumer {public static void main(String[] argv) throws IOException,InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();//连接rabbitMq的地址factory.setHost("zjj101");/*设置登陆用户名和密码*/factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME,BuiltinExchangeType.TOPIC);// 声明一个随机队列String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,TopicProducer.EXCHANGE_NAME,"#.kafka.#");System.out.println(" [*] Waiting for messages:");// 创建队列消费者final Consumer consumerA = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" AllConsumer Received "+ envelope.getRoutingKey()+ "':'" + message + "'");}};channel.basicConsume(queueName, true, consumerA);}}
启动完这一个消费者再启动生产者后输出结果:
中间为Kafka的的 routingKey都被消费了
[*] Waiting for messages:AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'AllConsumer Received king.kafka.B':'Hello Topic_[0,0,1]'AllConsumer Received king.kafka.C':'Hello Topic_[0,0,2]'AllConsumer Received mark.kafka.A':'Hello Topic_[1,0,0]'AllConsumer Received mark.kafka.B':'Hello Topic_[1,0,1]'AllConsumer Received mark.kafka.C':'Hello Topic_[1,0,2]'AllConsumer Received james.kafka.A':'Hello Topic_[2,0,0]'AllConsumer Received james.kafka.B':'Hello Topic_[2,0,1]'AllConsumer Received james.kafka.C':'Hello Topic_[2,0,2]'
消费者-指定结尾
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 5、关注所有的B章节,怎么办?* 类说明:*/public class All_BConsumer {public static void main(String[] argv) throws IOException,InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();//连接rabbitMq的地址factory.setHost("zjj101");/*设置登陆用户名和密码*/factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME,BuiltinExchangeType.TOPIC);// 声明一个随机队列String queueName = channel.queueDeclare().getQueue();//前面怎么样的无所谓,最后一个是B章节就可以了,//#匹配无数个channel.queueBind(queueName, TopicProducer.EXCHANGE_NAME, "#.B");System.out.println(" [*] Waiting for messages:");// 创建队列消费者final Consumer consumerA = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" AllConsumer Received "+ envelope.getRoutingKey()+ "':'" + message + "'");}};channel.basicConsume(queueName, true, consumerA);}}
启动完这一个消费者再启动生产者后输出结果:
所有结尾为b的 routingKey都被消费了
[*] Waiting for messages:AllConsumer Received king.kafka.B':'Hello Topic_[0,0,1]'AllConsumer Received king.jvm.B':'Hello Topic_[0,1,1]'AllConsumer Received king.redis.B':'Hello Topic_[0,2,1]'AllConsumer Received mark.kafka.B':'Hello Topic_[1,0,1]'AllConsumer Received mark.jvm.B':'Hello Topic_[1,1,1]'AllConsumer Received mark.redis.B':'Hello Topic_[1,2,1]'AllConsumer Received james.kafka.B':'Hello Topic_[2,0,1]'AllConsumer Received james.jvm.B':'Hello Topic_[2,1,1]'AllConsumer Received james.redis.B':'Hello Topic_[2,2,1]'
消费者 -RoutingKey写全了
import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 6、关注king老师kafka的A章节,怎么办?*类说明:*/public class King_kafka_AConsumer {public static void main(String[] argv) throws IOException,InterruptedException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();//连接rabbitMq的地址factory.setHost("zjj101");/*设置登陆用户名和密码*/factory.setUsername("guest");factory.setPassword("guest");/*设置虚拟主机地址,不能设置错了,不然会报错误*/factory.setVirtualHost("/");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.exchangeDeclare(TopicProducer.EXCHANGE_NAME,BuiltinExchangeType.TOPIC);// 声明一个随机队列String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName,TopicProducer.EXCHANGE_NAME, "king.kafka.A");System.out.println(" [*] Waiting for messages:");// 创建队列消费者final Consumer consumerA = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println(" AllConsumer Received "+ envelope.getRoutingKey()+ "':'" + message + "'");}};channel.basicConsume(queueName, true, consumerA);}}
启动完这一个消费者再启动生产者后输出结果:
只有routingKey为king.kafka.A的 routingKey都被消费了
[*] Waiting for messages:AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'
