Rxified API
Embedding Rxfified Vert.x Just use the Vertx.vertx methods:
Vertx vertx = io.vertx.rxjava.core.Vertx.vertx(); As a Verticle Extend the AbstractVerticle class, it will wrap it for you:
class MyVerticle extends io.vertx.rxjava.core.AbstractVerticle { public void start() { // Use Rxified Vertx here } } Deploying an RxJava verticle is still performed by the Java deployer and does not need a specified deployer.
Api examples Let’s study now a few examples of using Vert.x with RxJava.
EventBus message stream
The event bus MessageConsumer provides naturally an Observable
EventBus eb = vertx.eventBus();
MessageConsumer
// Unregisters the stream after 10 seconds vertx.setTimer(10000, id -> { sub.unsubscribe(); }); The MessageConsumer provides a stream of Message. The body gives access to a new stream of message bodies if needed:
EventBus eb = vertx.eventBus();
MessageConsumer
Observable
observable. buffer(1, TimeUnit.SECONDS). map(samples -> samples. stream(). collect(Collectors.averagingDouble(d -> d))). subscribe(heat -> { vertx.eventBus().send(“news-feed”, “Current heat is “ + heat); }); Timers Timer task can be created with timerStream:
vertx.timerStream(1000). toObservable(). subscribe( id -> { System.out.println(“Callback after 1 second”); } ); Periodic task can be created with periodicStream:
vertx.periodicStream(1000). toObservable(). subscribe( id -> { System.out.println(“Callback every second”); } ); The observable can be cancelled with an unsubscription:
vertx.periodicStream(1000).
toObservable().
subscribe(new Subscriber
HttpClient client = vertx.createHttpClient(new HttpClientOptions());
HttpClientRequest request = client.request(HttpMethod.GET, 8080, “localhost”, “/the_uri”);
request.toObservable().subscribe(
response -> {
// Process the response
},
error -> {
// Could not connect
}
);
request.end();
The response can be processed as an Observable
request.toObservable().
subscribe(
response -> {
Observable
request.toObservable().
flatMap(HttpClientResponse::toObservable).
forEach(
buffer -> {
// Process buffer
}
);
We can also unmarshall the Observable
request.toObservable(). flatMap(HttpClientResponse::toObservable). lift(io.vertx.rxjava.core.RxHelper.unmarshaller(MyPojo.class)). forEach( pojo -> { // Process pojo } ); Http server requests The requestStream provides a callback for each incoming request:
Observable
Observable
Observable
HttpClient client = vertx.createHttpClient(new HttpClientOptions());
WebSocketStream stream = client.websocketStream(8080, “localhost”, “/the_uri”);
stream.toObservable().subscribe(
ws -> {
// Use the websocket
},
error -> {
// Could not connect
}
);
The WebSocket can then be turned into an Observable
socketObservable.subscribe(
socket -> {
Observable
Observable
socketObservable.subscribe(
socket -> {
Observable