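Overview
ZeroMQ (also written ØMQ or ZMQ) is an open-source project hosted on GitHub. It is a high-performance asynchronous messaging library designed for scalable distributed or concurrent applications. It provides a message queue, but unlike message-oriented middleware, ZeroMQ can run without a dedicated message broker. The library's API is modeled on the familiar socket API.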
Messaging patterns
Request-Reply pattern
Connects a set of clients to a set of servers. This is a remote procedure call and task distribution pattern.
import org.zeromq.SocketType;
import org.zeromq.ZMQ;

public class ReqZeroMQ {
    public static void main(String[] args) {
        ZMQ.Context context = ZMQ.context(1);
        // Socket to talk to the server
        System.out.println("Connecting to hello world server…");
        ZMQ.Socket requester = context.socket(SocketType.REQ);
        requester.connect("tcp://localhost:5555");
        for (int requestNbr = 0; requestNbr != 100; requestNbr++) {
            String request = "Hello";
            System.out.println("Sending Hello " + requestNbr);
            requester.send(request.getBytes(ZMQ.CHARSET), 0);
            // A REQ socket must receive the reply before it can send the next request
            byte[] reply = requester.recv(0);
            System.out.println("Received " + new String(reply, ZMQ.CHARSET) + " " + requestNbr);
        }
        requester.close();
        context.term();
    }
}
import org.zeromq.SocketType;
import org.zeromq.ZMQ;

public class RepZeroMQ {
    public static void main(String[] args) {
        new Thread(() -> {
            ZMQ.Context context = ZMQ.context(1);
            // Socket to talk to clients
            ZMQ.Socket responder = context.socket(SocketType.REP);
            responder.bind("tcp://*:5555");
            while (!Thread.currentThread().isInterrupted()) {
                // Wait for the next request from the client
                byte[] request = responder.recv(0);
                System.out.println("Received " + new String(request, ZMQ.CHARSET));
                // Do some 'work'
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop serving
                    Thread.currentThread().interrupt();
                    break;
                }
                // Send the reply back to the client
                String reply = "World";
                responder.send(reply.getBytes(ZMQ.CHARSET), 0);
            }
            responder.close();
            context.term();
        }).start();
    }
}
Publish-Subscribe pattern
Connects a set of publishers to a set of subscribers. This is a data distribution pattern.
import java.util.Random;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;

public class PubZeroMQ {
    public static void main(String[] args) {
        // Prepare our context and publisher
        ZMQ.Context context = ZMQ.context(1);
        ZMQ.Socket publisher = context.socket(SocketType.PUB);
        publisher.bind("tcp://*:5556");
        // Initialize random number generator
        Random srandom = new Random(System.currentTimeMillis());
        while (!Thread.currentThread().isInterrupted()) {
            // Get values that will fool the boss
            int zipcode, temperature, relhumidity;
            zipcode = 10000 + srandom.nextInt(10000);
            temperature = srandom.nextInt(215) - 80 + 1;
            relhumidity = srandom.nextInt(50) + 10 + 1;
            // Send the message to all subscribers as "zipcode temperature humidity"
            String update = String.format("%05d %d %d", zipcode, temperature, relhumidity);
            publisher.send(update, 0);
        }
        publisher.close();
        context.term();
    }
}
import java.io.IOException;
import java.util.StringTokenizer;

import org.zeromq.SocketType;
import org.zeromq.ZMQ;

public class SubZeroMQ {
    public static void main(String[] args) {
        // Two subscribers; the default zipcode filters are 10001 (NYC) and 10002
        new Thread(() -> collect((args.length > 0) ? args[0] : "10001 ")).start();
        new Thread(() -> collect((args.length > 0) ? args[0] : "10002 ")).start();
    }

    // Subscribes to one zipcode prefix and averages the temperature over 100 updates
    private static void collect(String filter) {
        ZMQ.Context context = ZMQ.context(1);
        // Socket to talk to the server
        System.out.println("Collecting updates from weather server");
        ZMQ.Socket subscriber = context.socket(SocketType.SUB);
        subscriber.connect("tcp://localhost:5556");
        subscriber.subscribe(filter.getBytes(ZMQ.CHARSET));
        // Process 100 updates
        int updateNbr;
        long totalTemp = 0;
        for (updateNbr = 0; updateNbr < 100; updateNbr++) {
            // Trim any trailing whitespace or NUL ('\0') terminator
            String string = subscriber.recvStr(0).trim();
            StringTokenizer sscanf = new StringTokenizer(string, " ");
            sscanf.nextToken(); // skip the zipcode; the temperature is the second field
            int temperature = Integer.parseInt(sscanf.nextToken());
            totalTemp += temperature;
        }
        System.out.println("Average temperature for zipcode '"
                + filter + "' was " + (int) (totalTemp / updateNbr));
        System.out.println("...");
        // Keep the socket open until a key is pressed
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
        subscriber.close();
        context.term();
    }
}
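A SUB socket filters incoming messages by byte prefix, which is why the subscriber above passes "10001 " with a trailing space. The sketch below is only an illustration of that rule in plain Java. It does not call the library, and the class and method names (PrefixFilterSketch, matches, temperatureOf) are hypothetical. It also shows one way to read the temperature, which is the second field of the "zipcode temperature humidity" update.

```java
import java.nio.charset.StandardCharsets;

final class PrefixFilterSketch {
    /** Returns true when the message bytes start with the subscription bytes. */
    static boolean matches(String subscription, String message) {
        byte[] prefix = subscription.getBytes(StandardCharsets.UTF_8);
        byte[] body = message.getBytes(StandardCharsets.UTF_8);
        if (prefix.length > body.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (prefix[i] != body[i]) {
                return false;
            }
        }
        return true;
    }

    /** Parses "zipcode temperature humidity" and returns the temperature field. */
    static int temperatureOf(String update) {
        String[] fields = update.trim().split(" ");
        return Integer.parseInt(fields[1]);
    }

    public static void main(String[] args) {
        String[] updates = {"10001 23 45", "10002 -5 30", "100010 7 20"};
        for (String update : updates) {
            System.out.println(update
                    + " | matches \"10001 \": " + matches("10001 ", update)
                    + " | matches \"10001\": " + matches("10001", update));
        }
        System.out.println("temperature: " + temperatureOf(updates[0]));
    }
}
```

Without the trailing space, the filter "10001" would also accept a longer key such as "100010". The publisher above always formats five-digit zipcodes, so the space is a precaution there rather than a requirement.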
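Pipeline pattern (Push-Pull)
Connects nodes in a push/pull (fan-out/fan-in) arrangement that can have multiple steps and loops. This is a parallel task distribution and collection pattern.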
import org.zeromq.SocketType;
import org.zeromq.ZMQ;

public class PushZeroMQ {
    public static void main(String[] args) {
        new Thread(() -> {
            ZMQ.Context context = ZMQ.context(1);
            ZMQ.Socket socket = context.socket(SocketType.PUSH);
            socket.connect("tcp://127.0.0.1:5555");
            // Give up on a send after 1000 ms if the message cannot be queued
            socket.setSendTimeOut(1000);
            int i = 0;
            while (true) {
                byte[] data = String.valueOf(i++).getBytes(ZMQ.CHARSET);
                // send returns false when the message could not be queued in time
                boolean send = socket.send(data);
                System.out.println(socket.getTCPKeepAlive() + " send data " + data.length + " result " + send);
            }
        }).start();
    }
}
import org.zeromq.SocketType;
import org.zeromq.ZMQ;

public class PullZeroMQ {
    public static void main(String[] args) {
        new Thread(() -> {
            ZMQ.Context context = ZMQ.context(1);
            ZMQ.Socket socket = context.socket(SocketType.PULL);
            socket.bind("tcp://127.0.0.1:5555");
            // Wait at most 1000 ms for each message
            socket.setReceiveTimeOut(1000);
            while (true) {
                // recvStr returns null when no message arrives within the timeout
                String recv = socket.recvStr();
                System.out.println(socket.getTCPKeepAlive() + " recv data " + "result " + recv);
            }
        }).start();
    }
}
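The two classes above form a single push/pull hop. To illustrate the "multiple steps" part of the pattern, here is a minimal sketch of a three-step pipeline (distribute, work, collect) that runs inside one process. It assumes JeroMQ is on the classpath and uses its ZContext helper with the inproc transport. The endpoint names, the squaring "work", and the 500 ms idle timeout that lets the workers stop are all assumptions made for this example.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

final class PipelineSketch {
    // Hypothetical inproc endpoints, visible only inside one context
    private static final String TASKS = "inproc://tasks";
    private static final String RESULTS = "inproc://results";

    /** Pushes the numbers 1..taskCount through squaring workers and sums the results. */
    static long sumOfSquares(int taskCount, int workerCount) {
        try (ZContext context = new ZContext()) {
            // Bind both ends before the workers connect
            ZMQ.Socket ventilator = context.createSocket(SocketType.PUSH);
            ventilator.bind(TASKS);
            ZMQ.Socket sink = context.createSocket(SocketType.PULL);
            sink.bind(RESULTS);

            Thread[] workers = new Thread[workerCount];
            for (int w = 0; w < workerCount; w++) {
                workers[w] = new Thread(() -> work(context));
                workers[w].start();
            }

            // Step 1: distribute the tasks among the connected workers
            for (int task = 1; task <= taskCount; task++) {
                ventilator.send(Integer.toString(task));
            }
            // Step 3: collect exactly one result per task
            long sum = 0;
            for (int i = 0; i < taskCount; i++) {
                sum += Long.parseLong(sink.recvStr());
            }
            // Wait for the workers to time out before the context is closed
            for (Thread worker : workers) {
                joinQuietly(worker);
            }
            return sum;
        }
    }

    /** Step 2: pull a task, square it, and push the result downstream. */
    private static void work(ZContext context) {
        ZMQ.Socket in = context.createSocket(SocketType.PULL);
        in.setReceiveTimeOut(500); // stop once no task arrives for 500 ms
        in.connect(TASKS);
        ZMQ.Socket out = context.createSocket(SocketType.PUSH);
        out.connect(RESULTS);
        String task;
        while ((task = in.recvStr()) != null) {
            long n = Long.parseLong(task);
            out.send(Long.toString(n * n));
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Sum of squares 1..10: " + sumOfSquares(10, 2));
    }
}
```

The order in which results reach the sink depends on how the tasks are spread across the workers, so the sketch aggregates with a sum, which does not depend on arrival order.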
Exclusive pair pattern
Connects two sockets in an exclusive pair. (This is an advanced low-level pattern for specific use cases.)
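As a rough illustration, the sketch below connects two threads of one process with a pair of PAIR sockets. It assumes JeroMQ is on the classpath and uses its ZContext helper with the inproc transport. The endpoint name, the class name PairSketch, and the ":ack" reply format are hypothetical choices for this example.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

final class PairSketch {
    // Hypothetical inproc endpoint, visible only inside one context
    private static final String ENDPOINT = "inproc://pair-sketch";

    /** Sends one message to a peer thread over a PAIR socket and returns the peer's reply. */
    static String exchange(String message) {
        try (ZContext context = new ZContext()) {
            ZMQ.Socket parent = context.createSocket(SocketType.PAIR);
            parent.bind(ENDPOINT); // bind before the peer connects

            Thread peer = new Thread(() -> {
                ZMQ.Socket child = context.createSocket(SocketType.PAIR);
                child.connect(ENDPOINT);
                String received = child.recvStr();
                child.send(received + ":ack");
            });
            peer.start();

            // Unlike REQ/REP, either side of a PAIR may send at any time
            parent.send(message);
            String reply = parent.recvStr();
            joinQuietly(peer);
            return reply;
        }
    }

    private static void joinQuietly(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(exchange("ping"));
    }
}
```

Each PAIR socket talks to exactly one peer, so this suits one-to-one signaling between two threads rather than fan-out to many peers.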