1、源码地址:

https://github.com/nats-io/nats.java.git

2、如何使用?

在该项目的readme文档中,已经很好的进行了说明,参照例子可直接简单使用java去使用nats消息中间件.
在使用之前,你可能需要知道如何去搭建nats服务.可参照下面:
https://github.com/nats-io/nats-server

3、nats.java项目的整体介绍

[1]examples
这个目录中是常用的功能例子,你想实现的功能在这里应该都能找到.
[2]main
这个目录中是源码,包含接口和实现.想要更好的理解nats.java,深入阅读源码,理解原理.
[3]test
这个目录中是针对main中源码的测试代码,重要性与examples在同一等级.

Java Nats Client.png

4、使用示例:

[1]连接到nats服务

[1.1]连接本地
Connection nc = Nats.connect();

[1.2]指定IP+Port
Connection nc = Nats.connect("nats://myhost:4222");

[1.3]指定多nats服务
Options o = new Options.Builder().server("nats://serverone:4222").server("nats://servertwo:4222").maxReconnects(-1).build();
Connection nc = Nats.connect(o);

[1.4]异步连接
Options options = new Options.Builder().server(Options.DEFAULT_URL).connectionListener(handler).build();
Nats.connectAsynchronously(options, true);

[2]发布消息到nats

[2.1]发送消息:设定主题
nc.publish("subject", "hello world".getBytes(StandardCharsets.UTF_8));

[2.2]发送消息:设定发布主题和回复主题(用于在消息被收件人消费后回复使用)
nc.publish("subject", "replyto", "hello world".getBytes(StandardCharsets.UTF_8));

[2.3]这种方式消息的需要等待回复.由nats内部管理一个发布和订阅.
Future incoming = nc.request("subject", "hello world".getBytes(StandardCharsets.UTF_8));
Message msg = incoming.get(500, TimeUnit.MILLISECONDS);
String response = new String(msg.getData(), StandardCharsets.UTF_8);

[3]订阅nats消息

[3.1]订阅消息:按照主题,可设置订阅超时时间
Subscription sub = nc.subscribe("subject");
Message msg = sub.nextMessage(Duration.ofMillis(500));
String response = new String(msg.getData(), StandardCharsets.UTF_8);

[3.2]订阅消息:使用调度器,持续订阅消息
Dispatcher d = nc.createDispatcher((msg) -> {
String response = new String(msg.getData(), StandardCharsets.UTF_8);
...
});
d.subscribe("subject");

[3.3]订阅消息:使用调度器,可设置业务场景,订阅消息到达一定量,取消订阅.
Dispatcher d = nc.createDispatcher((msg) -> {});
Subscription s = d.subscribe("some.subject", (msg) -> {
String response = new String(msg.getData(), StandardCharsets.UTF_8);
System.out.println("Message received (up to 100 times): " + response);
});
d.unsubscribe(s, 100);