生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Producer for the topic-exchange example.
 *
 * <p>Messages are published to the exchange {@code topic_course}. The teachers are king, mark
 * and james; the technical topics are kafka, jvm and redis; the course chapters are A, B and C.
 * A routing key has the form {@code teacher.topic.chapter}, for example {@code king.kafka.A}.
 * One message is published for every combination, i.e. 3 * 3 * 3 = 27 messages.
 */
public class TopicProducer {
    /** Name of the topic exchange; the consumers refer to this constant as well. */
    public static final String EXCHANGE_NAME = "topic_course";

    /**
     * Connects to the broker, declares the topic exchange and publishes one message per
     * teacher/topic/chapter combination, printing a line for each message sent.
     *
     * @param args unused
     * @throws IOException if connecting, declaring, publishing or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args)
            throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // Address of the RabbitMQ broker.
        factory.setHost("zjj101");
        // Login user name and password.
        factory.setUsername("guest");
        factory.setPassword("guest");
        // Virtual host; it must be correct, otherwise an error is raised.
        factory.setVirtualHost("/");

        // Routing keys end up looking like: king.kafka.A, king.kafka.B, ...
        String[] teachers = {"king", "mark", "james"};
        String[] modules = {"kafka", "jvm", "redis"};
        String[] chapters = {"A", "B", "C"};

        // try-with-resources closes the channel first and then the connection, even when
        // declaring the exchange or publishing throws.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            for (int i = 0; i < teachers.length; i++) {
                for (int j = 0; j < modules.length; j++) {
                    for (int k = 0; k < chapters.length; k++) {
                        // Message body; it carries the three loop indices.
                        String message = "Hello Topic_[" + i + "," + j + "," + k + "]";
                        String routeKey = teachers[i] + "." + modules[j]
                                + "." + chapters[k];
                        // Encode explicitly as UTF-8: the consumers decode the body as UTF-8,
                        // so the platform default charset must not be used here.
                        channel.basicPublish(EXCHANGE_NAME, routeKey,
                                null, message.getBytes("UTF-8"));
                        System.out.println(" [x] Sent '" + routeKey + ":'"
                                + message + "'");
                    }
                }
            }
        }
    }
}
输出:
[x] Sent 'king.kafka.A:'Hello Topic_[0,0,0]'
[x] Sent 'king.kafka.B:'Hello Topic_[0,0,1]'
[x] Sent 'king.kafka.C:'Hello Topic_[0,0,2]'
[x] Sent 'king.jvm.A:'Hello Topic_[0,1,0]'
[x] Sent 'king.jvm.B:'Hello Topic_[0,1,1]'
[x] Sent 'king.jvm.C:'Hello Topic_[0,1,2]'
[x] Sent 'king.redis.A:'Hello Topic_[0,2,0]'
[x] Sent 'king.redis.B:'Hello Topic_[0,2,1]'
[x] Sent 'king.redis.C:'Hello Topic_[0,2,2]'
[x] Sent 'mark.kafka.A:'Hello Topic_[1,0,0]'
[x] Sent 'mark.kafka.B:'Hello Topic_[1,0,1]'
[x] Sent 'mark.kafka.C:'Hello Topic_[1,0,2]'
[x] Sent 'mark.jvm.A:'Hello Topic_[1,1,0]'
[x] Sent 'mark.jvm.B:'Hello Topic_[1,1,1]'
[x] Sent 'mark.jvm.C:'Hello Topic_[1,1,2]'
[x] Sent 'mark.redis.A:'Hello Topic_[1,2,0]'
[x] Sent 'mark.redis.B:'Hello Topic_[1,2,1]'
[x] Sent 'mark.redis.C:'Hello Topic_[1,2,2]'
[x] Sent 'james.kafka.A:'Hello Topic_[2,0,0]'
[x] Sent 'james.kafka.B:'Hello Topic_[2,0,1]'
[x] Sent 'james.kafka.C:'Hello Topic_[2,0,2]'
[x] Sent 'james.jvm.A:'Hello Topic_[2,1,0]'
[x] Sent 'james.jvm.B:'Hello Topic_[2,1,1]'
[x] Sent 'james.jvm.C:'Hello Topic_[2,1,2]'
[x] Sent 'james.redis.A:'Hello Topic_[2,2,0]'
[x] Sent 'james.redis.B:'Hello Topic_[2,2,1]'
[x] Sent 'james.redis.C:'Hello Topic_[2,2,2]'
消费者- 消费所有的消息
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Scenario 1: how to follow every course.
 *
 * <p>The queue is bound with the key {@code #}, which can match multiple words, so this
 * consumer receives all course content.
 */
public class AllConsumer {
    /**
     * Binds a broker-named queue to the topic exchange with the key {@code #} and prints every
     * delivery it receives.
     *
     * @param argv unused
     */
    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Address of the RabbitMQ broker.
        connectionFactory.setHost("zjj101");
        // Login user name and password.
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // Virtual host; it must be correct, otherwise an error is raised.
        connectionFactory.setVirtualHost("/");

        // Open a connection and a channel, same as on the sending side.
        Connection conn = connectionFactory.newConnection();
        final Channel ch = conn.createChannel();
        ch.exchangeDeclare(TopicProducer.EXCHANGE_NAME,
                BuiltinExchangeType.TOPIC);

        // Declare a queue whose name is generated by the broker.
        String tempQueue = ch.queueDeclare().getQueue();
        // "#" on its own: all messages are consumed.
        String bindingKey = "#";
        ch.queueBind(tempQueue, TopicProducer.EXCHANGE_NAME, bindingKey);
        System.out.println(" [*] Waiting for messages:");

        // Consumer that prints the routing key and the UTF-8 decoded body of each delivery.
        Consumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String text = new String(body, "UTF-8");
                String routingKey = envelope.getRoutingKey();
                System.out.println(" AllConsumer Received "
                        + routingKey + "':'" + text + "'");
            }
        };
        boolean autoAck = true;
        ch.basicConsume(tempQueue, autoAck, printer);
    }
}
启动完了消费者后,启动生产者, 输出日志结果:
所有的消息都被消费了.
[*] Waiting for messages:
AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'
AllConsumer Received king.kafka.B':'Hello Topic_[0,0,1]'
AllConsumer Received king.kafka.C':'Hello Topic_[0,0,2]'
AllConsumer Received king.jvm.A':'Hello Topic_[0,1,0]'
AllConsumer Received king.jvm.B':'Hello Topic_[0,1,1]'
AllConsumer Received king.jvm.C':'Hello Topic_[0,1,2]'
AllConsumer Received king.redis.A':'Hello Topic_[0,2,0]'
AllConsumer Received king.redis.B':'Hello Topic_[0,2,1]'
AllConsumer Received king.redis.C':'Hello Topic_[0,2,2]'
AllConsumer Received mark.kafka.A':'Hello Topic_[1,0,0]'
AllConsumer Received mark.kafka.B':'Hello Topic_[1,0,1]'
AllConsumer Received mark.kafka.C':'Hello Topic_[1,0,2]'
AllConsumer Received mark.jvm.A':'Hello Topic_[1,1,0]'
AllConsumer Received mark.jvm.B':'Hello Topic_[1,1,1]'
AllConsumer Received mark.jvm.C':'Hello Topic_[1,1,2]'
AllConsumer Received mark.redis.A':'Hello Topic_[1,2,0]'
AllConsumer Received mark.redis.B':'Hello Topic_[1,2,1]'
AllConsumer Received mark.redis.C':'Hello Topic_[1,2,2]'
AllConsumer Received james.kafka.A':'Hello Topic_[2,0,0]'
AllConsumer Received james.kafka.B':'Hello Topic_[2,0,1]'
AllConsumer Received james.kafka.C':'Hello Topic_[2,0,2]'
AllConsumer Received james.jvm.A':'Hello Topic_[2,1,0]'
AllConsumer Received james.jvm.B':'Hello Topic_[2,1,1]'
AllConsumer Received james.jvm.C':'Hello Topic_[2,1,2]'
AllConsumer Received james.redis.A':'Hello Topic_[2,2,0]'
AllConsumer Received james.redis.B':'Hello Topic_[2,2,1]'
AllConsumer Received james.redis.C':'Hello Topic_[2,2,2]'
消费者-指定前缀打头
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Scenario 2: how to follow all courses given by teacher king.
 *
 * <p>The queue is bound with the key {@code king.#}, i.e. routing keys that start with
 * {@code king}.
 */
public class King_AllConsumer {
    /**
     * Binds a broker-named queue to the topic exchange with the key {@code king.#} and prints
     * every delivery it receives.
     *
     * @param argv unused
     */
    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Address of the RabbitMQ broker.
        connectionFactory.setHost("zjj101");
        // Login user name and password.
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // Virtual host; it must be correct, otherwise an error is raised.
        connectionFactory.setVirtualHost("/");

        // Open a connection and a channel, same as on the sending side.
        Connection conn = connectionFactory.newConnection();
        final Channel ch = conn.createChannel();
        ch.exchangeDeclare(TopicProducer.EXCHANGE_NAME,
                BuiltinExchangeType.TOPIC);

        // Declare a queue whose name is generated by the broker.
        String tempQueue = ch.queueDeclare().getQueue();
        // Fixed first word "king", anything after it.
        String bindingKey = "king.#";
        ch.queueBind(tempQueue, TopicProducer.EXCHANGE_NAME, bindingKey);
        System.out.println(" [*] Waiting for messages:");

        // Consumer that prints the routing key and the UTF-8 decoded body of each delivery.
        // Note: the printed prefix reads "AllConsumer" in this class too.
        Consumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String text = new String(body, "UTF-8");
                String routingKey = envelope.getRoutingKey();
                System.out.println(" AllConsumer Received "
                        + routingKey + "':'" + text + "'");
            }
        };
        boolean autoAck = true;
        ch.basicConsume(tempQueue, autoAck, printer);
    }
}
启动完这一个消费者再启动生产者后输出结果:
king打头的routingKey都被消费了
[*] Waiting for messages:
AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'
AllConsumer Received king.kafka.B':'Hello Topic_[0,0,1]'
AllConsumer Received king.kafka.C':'Hello Topic_[0,0,2]'
AllConsumer Received king.jvm.A':'Hello Topic_[0,1,0]'
AllConsumer Received king.jvm.B':'Hello Topic_[0,1,1]'
AllConsumer Received king.jvm.C':'Hello Topic_[0,1,2]'
AllConsumer Received king.redis.A':'Hello Topic_[0,2,0]'
AllConsumer Received king.redis.B':'Hello Topic_[0,2,1]'
AllConsumer Received king.redis.C':'Hello Topic_[0,2,2]'
消费者-指定前缀和结尾
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Scenario 3: how to follow every chapter A given by teacher king.
 *
 * <p>The queue is bound with the key {@code king.*.A}: first word {@code king}, any single
 * word in the middle, last word {@code A}.
 */
public class King_All_AConsumer {
    /**
     * Binds a broker-named queue to the topic exchange with the key {@code king.*.A} and prints
     * every delivery it receives.
     *
     * @param argv unused
     */
    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Address of the RabbitMQ broker.
        connectionFactory.setHost("zjj101");
        // Login user name and password.
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // Virtual host; it must be correct, otherwise an error is raised.
        connectionFactory.setVirtualHost("/");

        // Open a connection and a channel, same as on the sending side.
        Connection conn = connectionFactory.newConnection();
        final Channel ch = conn.createChannel();
        ch.exchangeDeclare(TopicProducer.EXCHANGE_NAME,
                BuiltinExchangeType.TOPIC);

        // Declare a queue whose name is generated by the broker.
        String tempQueue = ch.queueDeclare().getQueue();
        // The important part: the pattern this consumer matches on.
        String bindingKey = "king.*.A";
        ch.queueBind(tempQueue, TopicProducer.EXCHANGE_NAME, bindingKey);
        System.out.println(" [*] Waiting for messages:");

        // Consumer that prints the routing key and the UTF-8 decoded body of each delivery.
        // Note: the printed prefix reads "AllConsumer" in this class too.
        Consumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String text = new String(body, "UTF-8");
                String routingKey = envelope.getRoutingKey();
                System.out.println(" AllConsumer Received "
                        + routingKey + "':'" + text + "'");
            }
        };
        boolean autoAck = true;
        ch.basicConsume(tempQueue, autoAck, printer);
    }
}
启动完这一个消费者再启动生产者后输出结果:
king打头的并且 A结尾的 routingKey都被消费了
[*] Waiting for messages:
AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'
AllConsumer Received king.jvm.A':'Hello Topic_[0,1,0]'
AllConsumer Received king.redis.A':'Hello Topic_[0,2,0]'
消费者-指定中间内容
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Scenario 4: how to follow all kafka courses.
 *
 * <p>The queue is bound with the key {@code #.kafka.#}, i.e. routing keys that contain the
 * word {@code kafka}, whatever comes before or after it.
 */
public class Kafka_AllConsumer {
    /**
     * Binds a broker-named queue to the topic exchange with the key {@code #.kafka.#} and
     * prints every delivery it receives.
     *
     * @param argv unused
     */
    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Address of the RabbitMQ broker.
        connectionFactory.setHost("zjj101");
        // Login user name and password.
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // Virtual host; it must be correct, otherwise an error is raised.
        connectionFactory.setVirtualHost("/");

        // Open a connection and a channel, same as on the sending side.
        Connection conn = connectionFactory.newConnection();
        final Channel ch = conn.createChannel();
        ch.exchangeDeclare(TopicProducer.EXCHANGE_NAME,
                BuiltinExchangeType.TOPIC);

        // Declare a queue whose name is generated by the broker.
        String tempQueue = ch.queueDeclare().getQueue();
        // "kafka" surrounded by "#" on both sides.
        String bindingKey = "#.kafka.#";
        ch.queueBind(tempQueue, TopicProducer.EXCHANGE_NAME,
                bindingKey);
        System.out.println(" [*] Waiting for messages:");

        // Consumer that prints the routing key and the UTF-8 decoded body of each delivery.
        // Note: the printed prefix reads "AllConsumer" in this class too.
        Consumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String text = new String(body, "UTF-8");
                String routingKey = envelope.getRoutingKey();
                System.out.println(" AllConsumer Received "
                        + routingKey + "':'" + text + "'");
            }
        };
        boolean autoAck = true;
        ch.basicConsume(tempQueue, autoAck, printer);
    }
}
启动完这一个消费者再启动生产者后输出结果:
routingKey 中间一段为 kafka 的消息都被消费了
[*] Waiting for messages:
AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'
AllConsumer Received king.kafka.B':'Hello Topic_[0,0,1]'
AllConsumer Received king.kafka.C':'Hello Topic_[0,0,2]'
AllConsumer Received mark.kafka.A':'Hello Topic_[1,0,0]'
AllConsumer Received mark.kafka.B':'Hello Topic_[1,0,1]'
AllConsumer Received mark.kafka.C':'Hello Topic_[1,0,2]'
AllConsumer Received james.kafka.A':'Hello Topic_[2,0,0]'
AllConsumer Received james.kafka.B':'Hello Topic_[2,0,1]'
AllConsumer Received james.kafka.C':'Hello Topic_[2,0,2]'
消费者-指定结尾
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Scenario 5: how to follow every chapter B.
 *
 * <p>The queue is bound with the key {@code #.B}: whatever comes first does not matter as long
 * as the last word is {@code B}.
 */
public class All_BConsumer {
    /**
     * Binds a broker-named queue to the topic exchange with the key {@code #.B} and prints
     * every delivery it receives.
     *
     * @param argv unused
     */
    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Address of the RabbitMQ broker.
        connectionFactory.setHost("zjj101");
        // Login user name and password.
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // Virtual host; it must be correct, otherwise an error is raised.
        connectionFactory.setVirtualHost("/");

        // Open a connection and a channel, same as on the sending side.
        Connection conn = connectionFactory.newConnection();
        final Channel ch = conn.createChannel();
        ch.exchangeDeclare(TopicProducer.EXCHANGE_NAME,
                BuiltinExchangeType.TOPIC);

        // Declare a queue whose name is generated by the broker.
        String tempQueue = ch.queueDeclare().getQueue();
        // The leading part is irrelevant; only the final word must be chapter B.
        // "#" matches any number of words.
        String bindingKey = "#.B";
        ch.queueBind(tempQueue, TopicProducer.EXCHANGE_NAME, bindingKey);
        System.out.println(" [*] Waiting for messages:");

        // Consumer that prints the routing key and the UTF-8 decoded body of each delivery.
        // Note: the printed prefix reads "AllConsumer" in this class too.
        Consumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String text = new String(body, "UTF-8");
                String routingKey = envelope.getRoutingKey();
                System.out.println(" AllConsumer Received "
                        + routingKey + "':'" + text + "'");
            }
        };
        boolean autoAck = true;
        ch.basicConsume(tempQueue, autoAck, printer);
    }
}
启动完这一个消费者再启动生产者后输出结果:
所有 routingKey 以 B 结尾的消息都被消费了
[*] Waiting for messages:
AllConsumer Received king.kafka.B':'Hello Topic_[0,0,1]'
AllConsumer Received king.jvm.B':'Hello Topic_[0,1,1]'
AllConsumer Received king.redis.B':'Hello Topic_[0,2,1]'
AllConsumer Received mark.kafka.B':'Hello Topic_[1,0,1]'
AllConsumer Received mark.jvm.B':'Hello Topic_[1,1,1]'
AllConsumer Received mark.redis.B':'Hello Topic_[1,2,1]'
AllConsumer Received james.kafka.B':'Hello Topic_[2,0,1]'
AllConsumer Received james.jvm.B':'Hello Topic_[2,1,1]'
AllConsumer Received james.redis.B':'Hello Topic_[2,2,1]'
消费者 -RoutingKey写全了
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * Scenario 6: how to follow chapter A of teacher king's kafka course.
 *
 * <p>The queue is bound with the fully spelled-out key {@code king.kafka.A}; no wildcard is
 * used.
 */
public class King_kafka_AConsumer {
    /**
     * Binds a broker-named queue to the topic exchange with the key {@code king.kafka.A} and
     * prints every delivery it receives.
     *
     * @param argv unused
     */
    public static void main(String[] argv) throws IOException,
            InterruptedException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // Address of the RabbitMQ broker.
        connectionFactory.setHost("zjj101");
        // Login user name and password.
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // Virtual host; it must be correct, otherwise an error is raised.
        connectionFactory.setVirtualHost("/");

        // Open a connection and a channel, same as on the sending side.
        Connection conn = connectionFactory.newConnection();
        final Channel ch = conn.createChannel();
        ch.exchangeDeclare(TopicProducer.EXCHANGE_NAME,
                BuiltinExchangeType.TOPIC);

        // Declare a queue whose name is generated by the broker.
        String tempQueue = ch.queueDeclare().getQueue();
        // Complete routing key, written out in full.
        String bindingKey = "king.kafka.A";
        ch.queueBind(tempQueue, TopicProducer.EXCHANGE_NAME, bindingKey);
        System.out.println(" [*] Waiting for messages:");

        // Consumer that prints the routing key and the UTF-8 decoded body of each delivery.
        // Note: the printed prefix reads "AllConsumer" in this class too.
        Consumer printer = new DefaultConsumer(ch) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body)
                    throws IOException {
                String text = new String(body, "UTF-8");
                String routingKey = envelope.getRoutingKey();
                System.out.println(" AllConsumer Received "
                        + routingKey + "':'" + text + "'");
            }
        };
        boolean autoAck = true;
        ch.basicConsume(tempQueue, autoAck, printer);
    }
}
启动完这一个消费者再启动生产者后输出结果:
只有 routingKey 为 king.kafka.A 的消息被消费了
[*] Waiting for messages:
AllConsumer Received king.kafka.A':'Hello Topic_[0,0,0]'