Overview

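ZeroMQ (ØMQ), also written ZMQ, is an open-source project hosted on GitHub: a high-performance asynchronous messaging library designed for scalable distributed or concurrent applications. It provides a message queue, but unlike message-oriented middleware, ZeroMQ can run without a dedicated message broker. The library's API is modeled on the familiar socket style.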

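  • Advantages
    • High performance
    • Open source
    • Simple
    • Usable from many programming languages
    • Supports several message transports: TCP, PGM (reliable multicast), inter-process communication (IPC), and inter-thread communication (ITC)
  • Disadvantages
    • No message persistence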


Messaging patterns

Request-Reply pattern

Connects a set of clients to a set of servers. This is a remote procedure call and task distribution pattern.

    // ReqZeroMQ.java
    import org.zeromq.SocketType;
    import org.zeromq.ZMQ;

    // Client: sends 100 "Hello" requests and waits for the reply to each one.
    public class ReqZeroMQ {
        public static void main(String[] args) {
            ZMQ.Context context = ZMQ.context(1);
            // Socket to talk to server
            System.out.println("Connecting to hello world server…");
            ZMQ.Socket requester = context.socket(SocketType.REQ);
            requester.connect("tcp://localhost:5555");
            for (int requestNbr = 0; requestNbr != 100; requestNbr++) {
                String request = "Hello";
                System.out.println("Sending Hello " + requestNbr);
                requester.send(request.getBytes(ZMQ.CHARSET), 0);
                // A REQ socket has to receive the reply before it may send again
                byte[] reply = requester.recv(0);
                System.out.println("Received " + new String(reply, ZMQ.CHARSET) + " " + requestNbr);
            }
            requester.close();
            context.term();
        }
    }

    // RepZeroMQ.java
    import org.zeromq.SocketType;
    import org.zeromq.ZMQ;

    // Server: answers every request with "World".
    public class RepZeroMQ {
        public static void main(String[] args) {
            new Thread(() -> {
                ZMQ.Context context = ZMQ.context(1);
                // Socket to talk to clients
                ZMQ.Socket responder = context.socket(SocketType.REP);
                responder.bind("tcp://*:5555");
                while (!Thread.currentThread().isInterrupted()) {
                    // Wait for next request from the client
                    byte[] request = responder.recv(0);
                    if (request == null) {
                        break; // receive failed, stop serving
                    }
                    System.out.println("Received " + new String(request, ZMQ.CHARSET));
                    // Do some 'work'
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and leave the loop
                        Thread.currentThread().interrupt();
                        break;
                    }
                    // Send reply back to client
                    String reply = "World";
                    responder.send(reply.getBytes(ZMQ.CHARSET), 0);
                }
                responder.close();
                context.term();
            }).start();
        }
    }

Publish-Subscribe pattern

Connects a set of publishers to a set of subscribers. This is a data distribution pattern.

    // PubZeroMQ.java
    import java.util.Random;

    import org.zeromq.SocketType;
    import org.zeromq.ZMQ;

    // Publisher: broadcasts random weather updates as "zipcode temperature humidity".
    public class PubZeroMQ {
        public static void main(String[] args) {
            // Prepare our context and publisher
            ZMQ.Context context = ZMQ.context(1);
            ZMQ.Socket publisher = context.socket(SocketType.PUB);
            publisher.bind("tcp://*:5556");
            // Initialize random number generator
            Random srandom = new Random(System.currentTimeMillis());
            while (!Thread.currentThread().isInterrupted()) {
                // Get values that will fool the boss
                int zipcode = 10000 + srandom.nextInt(10000);
                int temperature = srandom.nextInt(215) - 80 + 1;
                int relhumidity = srandom.nextInt(50) + 10 + 1;
                // Send message to all subscribers
                String update = String.format("%05d %d %d", zipcode, temperature, relhumidity);
                publisher.send(update, 0);
            }
            publisher.close();
            context.term();
        }
    }

    // SubZeroMQ.java
    import java.util.StringTokenizer;

    import org.zeromq.SocketType;
    import org.zeromq.ZMQ;

    // Subscriber: two threads, each averaging the temperature for one zipcode.
    public class SubZeroMQ {
        public static void main(String[] args) {
            // Defaults are zipcodes 10001 and 10002; optional arguments override them
            String first = (args.length > 0) ? args[0] : "10001 ";
            String second = (args.length > 1) ? args[1] : "10002 ";
            new Thread(() -> collect(first)).start();
            new Thread(() -> collect(second)).start();
        }

        // Collects 100 updates that match the filter and prints the average temperature.
        private static void collect(String filter) {
            ZMQ.Context context = ZMQ.context(1);
            // Socket to talk to server
            System.out.println("Collecting updates from weather server");
            ZMQ.Socket subscriber = context.socket(SocketType.SUB);
            subscriber.connect("tcp://localhost:5556");
            // Subscribe to the zipcode; the filter is matched as a message prefix
            subscriber.subscribe(filter.getBytes(ZMQ.CHARSET));
            // Process 100 updates
            int updateNbr;
            long totalTemp = 0;
            for (updateNbr = 0; updateNbr < 100; updateNbr++) {
                // Use trim to remove a trailing NUL character, if any
                String update = subscriber.recvStr(0).trim();
                StringTokenizer tokens = new StringTokenizer(update, " ");
                tokens.nextToken(); // skip the zipcode, which is the first field
                int temperature = Integer.parseInt(tokens.nextToken());
                totalTemp += temperature;
            }
            System.out.println("Average temperature for zipcode '"
                    + filter + "' was " + (int) (totalTemp / updateNbr));
            subscriber.close();
            context.term();
        }
    }
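
A SUB socket's filter is matched against the leading bytes of each message, and an empty filter accepts everything. The following is a minimal sketch of that rule in plain Java, assuming nothing beyond the standard library; it is only a model of the matching behaviour, not ZeroMQ's own code, and `PrefixFilterSketch` and `matches` are hypothetical names introduced here.

```java
import java.nio.charset.StandardCharsets;

public class PrefixFilterSketch {
    // Returns true when the message begins with the subscription prefix, byte for byte.
    static boolean matches(byte[] message, byte[] prefix) {
        if (prefix.length > message.length) {
            return false;
        }
        for (int i = 0; i < prefix.length; i++) {
            if (message[i] != prefix[i]) {
                return false;
            }
        }
        return true;
    }

    // Convenience overload for text messages (UTF-8 is an assumption of this sketch).
    static boolean matches(String message, String prefix) {
        return matches(message.getBytes(StandardCharsets.UTF_8),
                prefix.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(matches("10001 25 40", "10001 ")); // true
        System.out.println(matches("10002 25 40", "10001 ")); // false
        System.out.println(matches("10001 25 40", "1000"));   // true: a shorter prefix matches more keys
        System.out.println(matches("10001 25 40", ""));       // true: an empty filter matches everything
    }
}
```

Ending the filter with a space, as in "10001 ", ties the match to the whole first field rather than to any key that merely starts with the same digits.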

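Pipeline pattern (Push-Pull)

Connects nodes in a Push/Pull arrangement that can have multiple stages and can contain loops. This is a parallel task distribution and collection pattern.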

    // PushZeroMQ.java
    import org.zeromq.SocketType;
    import org.zeromq.ZMQ;

    // Producer: pushes an increasing counter to the PULL side.
    public class PushZeroMQ {
        public static void main(String[] args) {
            new Thread(() -> {
                ZMQ.Context context = ZMQ.context(1);
                ZMQ.Socket socket = context.socket(SocketType.PUSH);
                socket.connect("tcp://127.0.0.1:5555");
                // Give up on a send after 1000 ms instead of blocking forever
                socket.setSendTimeOut(1000);
                int i = 0;
                while (true) {
                    byte[] data = String.valueOf(i++).getBytes(ZMQ.CHARSET);
                    // send returns false when the message could not be queued in time
                    boolean send = socket.send(data);
                    System.out.println(socket.getTCPKeepAlive() + " send data " + data.length + " result " + send);
                }
            }).start();
        }
    }

    // PullZeroMQ.java
    import org.zeromq.SocketType;
    import org.zeromq.ZMQ;

    // Consumer: binds the endpoint and prints whatever arrives.
    public class PullZeroMQ {
        public static void main(String[] args) {
            new Thread(() -> {
                ZMQ.Context context = ZMQ.context(1);
                ZMQ.Socket socket = context.socket(SocketType.PULL);
                socket.bind("tcp://127.0.0.1:5555");
                // Wait at most 1000 ms for each message
                socket.setReceiveTimeOut(1000);
                while (true) {
                    // recvStr returns null when nothing arrives within the timeout
                    String recv = socket.recvStr();
                    System.out.println(socket.getTCPKeepAlive() + " recv data " + "result " + recv);
                }
            }).start();
        }
    }
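
When more than one PULL worker is attached, a PUSH socket hands its messages out to the workers in round-robin order, which is what makes the pattern suitable for parallel task distribution. The sketch below models only that rotation in plain Java. It does not use ZeroMQ and ignores details such as queue limits and workers that join late; `RoundRobinSketch` and `distribute` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundRobinSketch {
    // Assigns task i to worker (i % workers), giving the rotation 0, 1, ..., workers-1, 0, ...
    static List<List<String>> distribute(List<String> tasks, int workers) {
        List<List<String>> queues = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            queues.add(new ArrayList<>());
        }
        for (int i = 0; i < tasks.size(); i++) {
            queues.get(i % workers).add(tasks.get(i));
        }
        return queues;
    }

    public static void main(String[] args) {
        List<String> tasks = Arrays.asList("0", "1", "2", "3", "4");
        System.out.println(distribute(tasks, 2)); // prints [[0, 2, 4], [1, 3]]
    }
}
```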

Exclusive pair pattern

Connects two sockets in an exclusive pair. (This is a low-level pattern intended for specific, advanced use cases.)
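
The other patterns each come with a listing, so here is a comparable sketch for the exclusive pair. It assumes JeroMQ is on the classpath (the same `org.zeromq` API as the listings above) and uses the `inproc` transport to let two threads in one process talk to each other, a typical use of PAIR sockets. The class name `PairZeroMQ`, the method `exchange`, and the endpoint `inproc://pair-demo` are hypothetical names introduced here.

```java
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;

public class PairZeroMQ {
    // Hypothetical endpoint name, chosen only for this sketch.
    private static final String ENDPOINT = "inproc://pair-demo";

    // Sends one message to a child thread over a PAIR socket and returns the child's answer.
    static String exchange() {
        try (ZContext context = new ZContext()) {
            // Bind before the child connects so the endpoint already exists.
            ZMQ.Socket parent = context.createSocket(SocketType.PAIR);
            parent.bind(ENDPOINT);

            Thread child = new Thread(() -> {
                // inproc endpoints are only visible to sockets of the same context.
                ZMQ.Socket peer = context.createSocket(SocketType.PAIR);
                peer.connect(ENDPOINT);
                String message = peer.recvStr();
                peer.send(message + " acknowledged");
            });
            child.start();

            parent.send("step 1");
            String reply = parent.recvStr();
            try {
                child.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Closing the context also closes both sockets.
            return reply;
        }
    }

    public static void main(String[] args) {
        System.out.println(exchange());
    }
}
```

Unlike REQ/REP, a PAIR socket imposes no send/receive order, and unlike PUB/SUB or PUSH/PULL it talks to exactly one peer, which is why it is usually reserved for coordinating two threads.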
