1、如何连接到nats服务器
- Nats类作为一个静态类,提供了三种连接方式connect,但最终都到NatsImpl中实现.
- NatsImpl类实现了连接方法,创建对象NatsConnection,调用connect方法,返回一个连接对象.然后我们就可以是使用这个连接对象conn,完成nats消息的发布和订阅.后面阅读NatsConnection和options我们再细说.
- 分析下conn.connect(reconnectOnConnect),此方法无返回值,位于类NatsConnection中.
主要看一下红框位置:
a.buildServerList连接时候需要用到的nats服务地址,可能是一个也可能是列表,这里会用到option这个类,这个主要用于连接时候的参数设置.
b.tryToConnect指定uri进行连接,这是连接的核心.
c.这一部分没有展开,这里主要完成的是,在不是已连接和已关闭的状态情况下,我们要重新连接,前提是设置reconnectOnConnect为true.
- tryToConnect我们重点看一下,我们分几部分来看
a.下图的代码:
调用stop确认reader和writer都是停止状态:防止消息的丢失,停止后,既不能发布也不能订阅.
清空pong queue,
创建dataport,调用connect连接到nats服务,后面有重新启动reader和writer.
补充:dataPort.connect()可以看到,连接的底层使用的socket,数据传输使用stream的方式.
b.创建了一个连接任务connectTask,然后交由connentExecutor去执行.这里面有三个方法调用,先知道大概意思.
readInitialInfo():read initial info message读取初始的信息消息
checkVersionRequirements():检查nats服务版本是否支持
upgradeToSecureIfNeeded():SSL connection是否需要,如果是,dataport进行更新dataPort.upgradeToSecure();
c.这里很简单明了,开启reader和writer,发送连接消息,最后进行初始化ping,确认服务存在.
开启reader和writer的start()
- 连接状态的更新updateStatus
- 状态变化的信号通知statusChanged.signalAll()
2、如何发布主题消息
调用NatsConnection的publish/request方法来完成消息的发布.
- NatsConnection实现了Connection接口
public void publish(String subject, byte[] body); public void publish(String subject, String replyTo, byte[] body); public CompletableFuture
request(String subject, byte[] data); public Message request(String subject, byte[] data, Duration timeout) throws InterruptedException;
以上的发布主题消息的实现,最终都是调用第二个publish方法实现.
发布步骤:
a.校验连接;b.验证参数;c.构建NatsMessage消息传输对象;d.调用queueOutgoing(msg)执行发送.
补充:request的发布消息的方式返回值CompletableFuture
a.使用request方式进行发布消息的初衷就是期待发送后,在订阅者收到消息后收到回复.
b.设置了replay就会使用,没有设置默认设置一个inbox.
- 分析queueOutgoing(msg)
NatsConnectionWriter实现runable接口,重写run().
下面是run方法的部分代码:
a.实际消息的大小与option连接默认的缓冲区大小,每次进行2倍扩容.
b.调用dataPort.write()消息传输
c.记录统计监控
MessageQueue采用LinkedBlockingQueue来存放消息
3、如何订阅主题消息
第一种方式:
通过NatsDispatcher的以下方法
public Dispatcher subscribe(String subject);
public Dispatcher subscribe(String subject, String queue);
public Subscription subscribe(String subject, MessageHandler handler);
public Subscription subscribe(String subject, String queue, MessageHandler handler);
第二种方式:
通过NatsSubscription的以下方法
public Message nextMessage(Duration timeout) throws InterruptedException, IllegalStateException;
- 第二种包含的最简单的订阅消息方式,直接调用pop从读取incoming消息队列中的消息。
- 第一种是采用分发器的方式进行消息获取,通过回调处理订阅获得的message。
a.首先创建分发器,步骤如下
传入handler并创建对象;生成nuid,记录分发器;调用start方法启动。
b.通过分发器进行主题队列的订阅
分析一下订阅方法,我们看到最终实现为subscribeImpl
所以我们来分析一下subscribeImpl
a.根据消息处理函数进行不同判断
handler不存在,只发生订阅对象的创建,然后返回。等待后续操作。
handler存在,将处理函数放入订阅对象的handlers中。我们知道NatsDispatcher实现了Runnable,并且在创建NatsDispatcher已经启动线程。
b.在NatsDispatcher的run方法中有两行重要代码如下
NatsMessage msg = this.incoming.pop(this.waitForMessage);获取消息
currentHandler.onMessage(msg);处理消息的handler
4、options和它的静态类builder
options:在进行nats连接时候,可选择的连接方式,通过参数的形式来确定.创建一个option则是通过builder完成.
- option常用参数
_public static final int _DEFAULT_PORT = 4222;
_public static final _String DEFAULT_URL = “nats://localhost:4222”;
_public static final int _DEFAULT_MAX_RECONNECT = 60;
_public static final _Duration DEFAULT_RECONNECT_WAIT = Duration.ofSeconds(2);
_public static final _Duration DEFAULT_PING_INTERVAL = Duration.ofMinutes(2);
_public static final int _DEFAULT_BUFFER_SIZE = 64 * 1024;
public static final String DEFAULT_THREAD_NAME_PREFIX = “nats”;
public static final String DEFAULT_INBOX_PREFIX = “_INBOX.”;
public static final int MAX_MESSAGES_IN_NETWORK_BUFFER = 1000;
public static final int MAX_MESSAGES_IN_OUTGOING_QUEUE = 5000;
_private final int _maxReconnect;
_private final boolean _noRandomize;
_private final boolean _utf8Support;
_private final _ErrorListener errorListener;
_private final _ConnectionListener connectionListener;
_private final _ExecutorService executor;
着重看一下两个listener
- option的方法:大部分是获取某个属性值的,等同于getter方法.不乏含有其他的工具行方法,处理uri,创建连接消息等
- builder类参数:与options类的成员变量基本一致
- builder类方法:用于设置参数和创建生成option对象,调用build()方法.
知识补充:
这里用到静态内部类:
1)首先,用内部类是因为内部类与所在外部类有一定的关系,往往只有该外部类调用此内部类。所以没有必要专门用一个Java文件存放这个类。
2)静态都是用来修饰类的内部成员的。比如静态方法,静态成员变量,静态常量。它唯一的作用就是随着类的加载(而不是随着对象的产生)而产生,以致可以用类名+静态成员名直接获得。
这样静态内部类就可以理解了,因为这个类没有必要单独存放一个文件,它一般来说只被所在外部类使用。并且它可以直接被用 外部类名+内部类名 获得。
顺带提一下:关于静态方法的线程安全问题
静态方法如果没有使用静态变量,则没有线程安全问题。
为什么呢?因为静态方法内声明的变量,每个线程调用时,都会新创建一份,而不会共用一个存储单元。比如这里的tmp,每个线程都会创建自己的一份,因此不会有线程安全问题。
注意:静态变量,由于是在类加载时占用一个存储区,每个线程都是共用这个存储区的,所以如果在静态方法里使用了静态变量,这就会有线程安全问题!
5、NatsMessage 消息传送所使用的最小单位
- 成员属性
private _String sid;
_private _String subject;
_private _String replyTo;
_private byte[] data;
_private _NatsSubscription subscription;
NatsMessage next; // 用于LinkedBlockingQueue
- 构造函数
- getter方法:包含获取主题,获取data等