1、源码地址:
https://github.com/nats-io/nats.java.git
2、如何使用?
在该项目的readme文档中,已经很好的进行了说明,参照例子可直接简单使用java去使用nats消息中间件.
在使用之前,你可能需要知道如何去搭建nats服务.可参照下面:
https://github.com/nats-io/nats-server
3、nats.java项目的整体介绍
[1]examples
这个目录中是常用的功能例子,你想实现的功能在这里应该都能找到.
[2]main
这个目录中是源码,包含接口和实现.想要更好的理解nats.java,深入阅读源码,理解原理.
[3]test
这个目录中是针对main中源码的测试代码,重要性与examples在同一等级.
4、使用示例:
[1]连接到nats服务
[1.1]连接本地Connection nc = Nats.connect();
[1.2]指定IP+PortConnection nc = Nats.connect("nats://myhost:4222");
[1.3]指定多nats服务Options o = new Options.Builder().server("nats://serverone:4222").server("nats://servertwo:4222").maxReconnects(-1).build();
Connection nc = Nats.connect(o);
[1.4]异步连接Options options = new Options.Builder().server(Options.DEFAULT_URL).connectionListener(handler).build();
Nats.connectAsynchronously(options, true);
[2]发布消息到nats
[2.1]发送消息:设定主题nc.publish("subject", "hello world".getBytes(StandardCharsets.UTF_8));
[2.2]发送消息:设定发布主题和回复主题(用于在消息被收件人消费后回复使用)nc.publish("subject", "replyto", "hello world".getBytes(StandardCharsets.UTF_8));
[2.3]这种方式消息的需要等待回复.由nats内部管理一个发布和订阅.Future incoming = nc.request("subject", "hello world".getBytes(StandardCharsets.UTF_8));
Message msg = incoming.get(500, TimeUnit.MILLISECONDS);
String response = new String(msg.getData(), StandardCharsets.UTF_8);
[3]订阅nats消息
[3.1]订阅消息:按照主题,可设置订阅超时时间Subscription sub = nc.subscribe("subject");
Message msg = sub.nextMessage(Duration.ofMillis(500));
String response = new String(msg.getData(), StandardCharsets.UTF_8);
[3.2]订阅消息:使用调度器,持续订阅消息Dispatcher d = nc.createDispatcher((msg) -> {
String response = new String(msg.getData(), StandardCharsets.UTF_8);
...
});
d.subscribe("subject");
[3.3]订阅消息:使用调度器,可设置业务场景,订阅消息到达一定量,取消订阅.Dispatcher d = nc.createDispatcher((msg) -> {});
Subscription s = d.subscribe("some.subject", (msg) -> {
String response = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println("Message received (up to 100 times): " + response);
});
d.unsubscribe(s, 100);