15.5. Jersey 客户端使用 SSE

对于客户端, Jersey API 提供支持 接收和处理 SSE 事件两种编程模式:

  • Pull 模式 - 从 EventInput 里面 pull 事件, 或者
  • Push 模式 - 监听 EventSource 异步通知

15.5.1. 从 EventInput 中读 SSE 事件

客户端可以从 EventInput 里面读事件,代码如下:

  1. Client client = ClientBuilder.newBuilder()
  2. .register(SseFeature.class).build();
  3. WebTarget target = client.target("http://localhost:9998/events");
  4. EventInput eventInput = target.request().get(EventInput.class);
  5. while (!eventInput.isClosed()) {
  6. final InboundEvent inboundEvent = eventInput.read();
  7. if (inboundEvent == null) {
  8. // connection has been closed
  9. break;
  10. }
  11. System.out.println(inboundEvent.getName() + "; "
  12. + inboundEvent.readData(String.class));
  13. }

在这个例子中,客户端连接到服务器。首先,创建一个新的 JAX-RS/Jersey 客户端实例并注册 SseFeature。然后 WebTarget 实例从客户端和用于检索调用 HTTP 请求。返回的响应实体直接读取作为 EventInput Java 类型,这是一个 Jersey ChunkedInput 的扩展,提供通用的支持用于分块消息消费的有效负载。示例中的代码开启循环处理入站 SSE 事件从 eventInput 读取响应流。每个块读取输入的是 InboundEvent。方法 InboundEvent.readData(Class) 为客户提供了一种方法来表示 Java 类型应该用于事件数据反序列化。在我们的示例中,单独事件被反序列化为字符串 Java 类型的实例。这种方法在内部找到并执行适当的 MessageBodyReader 这是用来做实际的反序列化。这类似于通过 readEntity(Class) 来读取Response中的实体。方法 readData 也可以抛出一个ProcessingException

在 inboundEvent 中检查 null 是必要的,以确保该数据块被正确读取和连接不会被服务器关闭。一旦连接关闭,循环终止,项目完成执行。客户端代码将生成以下控制台输出:

  1. message-to-client; Hello world 0!
  2. message-to-client; Hello world 1!
  3. message-to-client; Hello world 2!
  4. message-to-client; Hello world 3!
  5. message-to-client; Hello world 4!
  6. message-to-client; Hello world 5!
  7. message-to-client; Hello world 6!
  8. message-to-client; Hello world 7!
  9. message-to-client; Hello world 8!
  10. message-to-client; Hello world 9!

15.5.2. 通过 EventSource 处理异步 SSE

Jersey SSE client API 中用于异步的读取 SSE 的是 EventSource 。下面是 EventSource 的用法:

Example 15.4. Registering EventListener with EventSource

  1. Client client = ClientBuilder.newBuilder()
  2. .register(SseFeature.class).build();
  3. WebTarget target = client.target("http://example.com/events");
  4. EventSource eventSource = EventSource.target(target).build();
  5. EventListener listener = new EventListener() {
  6. @Override
  7. public void onEvent(InboundEvent inboundEvent) {
  8. System.out.println(inboundEvent.getName() + "; "
  9. + inboundEvent.readData(String.class));
  10. }
  11. };
  12. eventSource.register(listener, "message-to-client");
  13. eventSource.open();
  14. ...
  15. eventSource.close();

在这个例子中,客户端代码连接到服务器。在这种情况下请求到 web target 不是直接在代码中,相反, web target 实例是用来初始化一个新的 EventSource.Builder 实例用于创建新的 EventSource。 build() 方法的选择会告诉 ,用于建立一个新的EventSource。建立()方法的选择是很重要的,因为它告诉 EventSource.Builder 来创建一个新的 EventSource ,这不会自动连接到 target。在建立连接之后才通过手动调用 eventSource.open() 方法。使用自定义 EventListener 实现听和处理传入的 SSE 事件。方法 readData(Class) 定义了,事件数据应该从收到 InboundEvent 实例反序列化为一个字符串的 Java 类型。这个方法调用内部执行MessageBodyReader 反序列化的事件数据。这类似于通过 readEntity(Class) 从 Response 读取一个实体)。readData 可以抛出一个ProcessingException的方法。

自定义事件源注册侦听器是通过 EventSource.register(EventListener, String) 方法在事件源中来注册。下一个方法参数定义了可以被接收和省略的事件名称。如果名称定义了,与指定名字关联的侦听器将只会调用定义了一组事件的名称的事件。它不会调用其他名字或者没有名字的事件。

重要

这是一个常见的错误认为,匿名注册事件将被注册用于处理从一组特定名称集合中的事件的侦听器处理。事实并非如此!匿名事件只是由没有绑定名字的侦听器处理。同样的限制适用于现代浏览器支持的 HTML5 Javascript SSE Client API

在连接到服务器后,通过调用事件源中 open() 方法来打开连接。当事件命名为“message-to-client”,侦听器将被事件源所执行。如果任何其他事件(名字不同于“message-to-client”)来到,注册侦听器不会调用。一旦客户端完成了处理并且不希望接收事件了,它将通过在事件源上调用close() 方法来关闭连接。

侦听器从上面的示例将打印以下输出:

  1. message-to-client; Hello world 0!
  2. message-to-client; Hello world 1!
  3. message-to-client; Hello world 2!
  4. message-to-client; Hello world 3!
  5. message-to-client; Hello world 4!
  6. message-to-client; Hello world 5!
  7. message-to-client; Hello world 6!
  8. message-to-client; Hello world 7!
  9. message-to-client; Hello world 8!
  10. message-to-client; Hello world 9!

浏览 Jersey SSE API 文档时,您可能已经注意到,EventSource 实现EventListener 并提供一个空实现 onEvent(InboundEvent inboundEvent) 侦听器方法。这增加了 Jersey 客户端 SSE API更多的灵活性。而不是定义和注册一个单独的事件侦听器,在简单的情况下你也可以选择直接从 EventSource和覆盖空侦听器方法来处理传入的事件。这种编程模型所示下面的例子:

Example 15.5. Overriding EventSource.onEvent(InboundEvent) method

  1. Client client = ClientBuilder.newBuilder()
  2. .register(SseFeature.class).build();
  3. WebTarget target = client.target("http://example.com/events");
  4. EventSource eventSource = new EventSource(target) {
  5. @Override
  6. public void onEvent(InboundEvent inboundEvent) {
  7. if ("message-to-client".equals(inboundEvent.getName())) {
  8. System.out.println(inboundEvent.getName() + "; "
  9. + inboundEvent.readData(String.class));
  10. }
  11. }
  12. };
  13. ...
  14. eventSource.close();

这种方式,连接到 SSE 端点在事件源创建默认情况下自动打开。EventListener 的实现被移入覆盖 EventSource.onEvent(…) 方法。然而,这一次,侦听器方法将执行所有事件的——无名以及任何带名称的。因此代码检查名称是否与“message-to-client”这个名字是一个事件,我们要处理。注意,您仍然可以注册额外的 EventListener 。事件源上的覆盖方法允许您处理消息即使没有额外的侦听器注册。

 15.5.2.1. EventSource 重连支持

从 Jersey 2.3开始,EventSource 实现支持自动恢复遗失的连接,包括交付谈判的任何错过了 基于最后收到了 SSE 事件 id 字段值的事件,提供这个服务器设置的字段和服务器支持的谈判设施。在连接丢失的情况下,最后收到了 SSE 事件 id 字段值是 Last-Event-ID 发送在 HTTP 请求报头作为新连接的一部分请求发送到 SSE 端点。在收到这样的连接请求,SSE 端点支持这种转让工具将所有错过的事件重演。

注意

注意,SSE丢失事件谈判设施是尽力机制,不提供任何担保所有事件将交付而没有损失。你不应该依赖于接收的每一个事件,而相应地设计您的客户机应用程序代码。

默认情况下,当一个连接 SSE 端点丢失,事件源将使用默认的延迟在试图重新连接 SSE 端点之前 。然而 SSE 端点可以控制客户端重试延迟包括一个特殊的 retry 字段值在任何情况下发送到客户机。Jersey EventSource 实现了自动跟踪任何收到 SSE 事件 retry 字段值设定的端点,并相应地调整连接延迟,使用最后收到 retry 字段值作为新的连接延迟。

除了处理标准连接损失,Jersey EventSource 自动处理任何HTTP 503 Service Unavailable 响应收到 SSE 端点,包括 Retry-After HTTP头一个有效值。HTTP 503 + Retry-After 技术是常用的 HTTP 端点的连接和流量节流。以防一个 HTTP 503 + Retry-After 收到响应从SSE 端点返回一个连接请求, Jersey EventSource 将自动安排一次重连接尝试并使用收到 Retry-After HTTP 报头值作为一次性覆盖的重连接的延迟。