title: OpenMessaging Compatibility Example
date: 2017/12/21
categories: Document Translation

OpenMessaging Example

OpenMessaging aims to establish industry guidelines and messaging and streaming specifications that provide a common framework for finance, e-commerce, IoT, and big-data applications. Its design principles are cloud orientation, simplicity, flexibility, and language independence in distributed heterogeneous environments. Conformance to these specifications makes it possible to develop heterogeneous messaging applications across all major platforms and operating systems.

RocketMQ provides a partial implementation of OpenMessaging 0.1.0-alpha. The following examples demonstrate how to access RocketMQ through the OpenMessaging API.

OMSProducer

The following example shows how to send messages to a RocketMQ broker in synchronous, asynchronous, and one-way modes.

    import io.openmessaging.Message;
    import io.openmessaging.MessagingAccessPoint;
    import io.openmessaging.MessagingAccessPointFactory;
    import io.openmessaging.Producer;
    import io.openmessaging.Promise;
    import io.openmessaging.PromiseListener;
    import io.openmessaging.SendResult;
    import java.nio.charset.StandardCharsets;

    public class OMSProducer {
        public static void main(String[] args) {
            // IP1 and IP2 are placeholders for the name server addresses.
            final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
                .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
            final Producer producer = messagingAccessPoint.createProducer();

            messagingAccessPoint.startup();
            System.out.printf("MessagingAccessPoint startup OK%n");
            producer.startup();
            System.out.printf("Producer startup OK%n");

            // Synchronous send: returns once the send result is available.
            {
                Message message = producer.createBytesMessageToTopic(
                    "OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(StandardCharsets.UTF_8));
                SendResult sendResult = producer.send(message);
                System.out.printf("Send sync message OK, msgId: %s%n", sendResult.messageId());
            }

            // Asynchronous send: the listener is notified of the outcome.
            // Note that the shutdown calls below do not wait for this callback.
            {
                final Promise<SendResult> result = producer.sendAsync(producer.createBytesMessageToTopic(
                    "OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(StandardCharsets.UTF_8)));
                result.addListener(new PromiseListener<SendResult>() {
                    @Override
                    public void operationCompleted(Promise<SendResult> promise) {
                        System.out.printf("Send async message OK, msgId: %s%n", promise.get().messageId());
                    }

                    @Override
                    public void operationFailed(Promise<SendResult> promise) {
                        System.out.printf("Send async message Failed, error: %s%n",
                            promise.getThrowable().getMessage());
                    }
                });
            }

            // One-way send: no send result is returned to the caller.
            {
                producer.sendOneway(producer.createBytesMessageToTopic(
                    "OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(StandardCharsets.UTF_8)));
                System.out.printf("Send oneway message OK%n");
            }

            producer.shutdown();
            messagingAccessPoint.shutdown();
        }
    }
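In the producer example, `producer.shutdown()` is called immediately after `sendAsync`, so the program may reach shutdown before the asynchronous callback has run. One common way to handle this is to block on a latch until the callback fires. The sketch below illustrates only that waiting pattern. It is an assumption-laden stand-in: `java.util.concurrent.CompletableFuture` replaces the OMS `Promise`, and `AwaitAsyncSend`, `sendAsync`, and `sendAndAwait` are hypothetical names, so it runs without a broker and is not RocketMQ's own API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: waits for an async send callback before shutting down.
final class AwaitAsyncSend {

    // Stand-in for producer.sendAsync(...): completes on another thread
    // with a fake message id derived from the body length.
    static CompletableFuture<String> sendAsync(String body) {
        return CompletableFuture.supplyAsync(() -> "msg-" + body.length());
    }

    // Sends asynchronously and blocks until the callback has run or the
    // timeout expires. Returns true if the callback ran in time.
    static boolean sendAndAwait(String body, long timeoutMillis) {
        final CountDownLatch done = new CountDownLatch(1);
        sendAsync(body).whenComplete((msgId, error) -> {
            if (error == null) {
                System.out.printf("Send async message OK, msgId: %s%n", msgId);
            } else {
                System.out.printf("Send async message Failed, error: %s%n", error.getMessage());
            }
            done.countDown(); // release the waiting thread in both cases
        });
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        boolean completed = sendAndAwait("OMS_HELLO_BODY", 3000);
        System.out.println("callback ran before shutdown: " + completed);
        // In the real example, producer.shutdown() and
        // messagingAccessPoint.shutdown() would be called only after this point.
    }
}
```

With the real API, the same latch could be counted down inside both `operationCompleted` and `operationFailed` of the `PromiseListener`.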

OMSPullConsumer

Use the OMS PullConsumer to poll messages from a specified queue.

    import io.openmessaging.Message;
    import io.openmessaging.MessageHeader;
    import io.openmessaging.MessagingAccessPoint;
    import io.openmessaging.MessagingAccessPointFactory;
    import io.openmessaging.OMS;
    import io.openmessaging.PullConsumer;
    import io.openmessaging.rocketmq.domain.NonStandardKeys;

    public class OMSPullConsumer {
        public static void main(String[] args) {
            // IP1 and IP2 are placeholders for the name server addresses.
            final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
                .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

            // The consumer group is passed through a RocketMQ-specific (non-standard) key.
            final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC",
                OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

            messagingAccessPoint.startup();
            System.out.printf("MessagingAccessPoint startup OK%n");
            consumer.startup();
            System.out.printf("Consumer startup OK%n");

            // Poll a single message and acknowledge it by message id.
            Message message = consumer.poll();
            if (message != null) {
                String msgId = message.headers().getString(MessageHeader.MESSAGE_ID);
                System.out.printf("Received one message: %s%n", msgId);
                consumer.ack(msgId);
            }

            consumer.shutdown();
            messagingAccessPoint.shutdown();
        }
    }
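The pull example polls exactly once and then shuts down. A long-running consumer would more typically poll in a loop and acknowledge each message after handling it. The sketch below shows that poll-then-ack loop under stated assumptions: `FakePullConsumer` is a hypothetical in-memory stand-in built on a `BlockingQueue`, not the OMS `PullConsumer`, and how the real `poll()` blocks or times out is not modelled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a poll-then-ack loop; runs without a broker.
final class PollLoopSketch {

    // In-memory stand-in for a pull consumer. Messages are plain id strings.
    static final class FakePullConsumer {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final List<String> acked = new ArrayList<>();

        void offer(String msgId) {
            queue.add(msgId);
        }

        // Returns the next message id, or null if none arrives within the timeout.
        String poll(long timeoutMillis) {
            try {
                return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }

        void ack(String msgId) {
            acked.add(msgId);
        }
    }

    // Polls until a poll comes back empty; acknowledges every message
    // after it has been handled. Returns the number of messages consumed.
    static int drain(FakePullConsumer consumer, long timeoutMillis) {
        int consumed = 0;
        String msgId;
        while ((msgId = consumer.poll(timeoutMillis)) != null) {
            System.out.printf("Received one message: %s%n", msgId);
            consumer.ack(msgId);
            consumed++;
        }
        return consumed;
    }

    public static void main(String[] args) {
        FakePullConsumer consumer = new FakePullConsumer();
        consumer.offer("id-1");
        consumer.offer("id-2");
        System.out.println("consumed: " + drain(consumer, 100));
    }
}
```

Acknowledging only after the message has been handled means a failure during handling leaves the message unacknowledged, which is usually the safer default.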

OMSPushConsumer

Attach the OMS PushConsumer to a specified queue and consume messages through a MessageListener.

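    import io.openmessaging.Message;
    import io.openmessaging.MessageHeader;
    import io.openmessaging.MessageListener;
    import io.openmessaging.MessagingAccessPoint;
    import io.openmessaging.MessagingAccessPointFactory;
    import io.openmessaging.OMS;
    import io.openmessaging.PushConsumer;
    import io.openmessaging.ReceivedMessageContext;
    import io.openmessaging.rocketmq.domain.NonStandardKeys;

    public class OMSPushConsumer {
        public static void main(String[] args) {
            // IP1 and IP2 are placeholders for the name server addresses.
            final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory
                .getMessagingAccessPoint("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");

            final PushConsumer consumer = messagingAccessPoint
                .createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER"));

            messagingAccessPoint.startup();
            System.out.printf("MessagingAccessPoint startup OK%n");

            // Release resources when the JVM exits.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    consumer.shutdown();
                    messagingAccessPoint.shutdown();
                }
            }));

            // Register the listener; each message is acknowledged after it is handled.
            consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
                @Override
                public void onMessage(final Message message, final ReceivedMessageContext context) {
                    System.out.printf("Received one message: %s%n",
                        message.headers().getString(MessageHeader.MESSAGE_ID));
                    context.ack();
                }
            });

            // Start the consumer once the listener is attached (missing from the original listing).
            consumer.startup();
            System.out.printf("Consumer startup OK%n");
        }
    }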
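All three examples obtain their `MessagingAccessPoint` from the same URL string, `openmessaging:rocketmq://IP1:9876,IP2:9876/namespace`. To make its parts explicit, the sketch below splits such a string into a driver name, a list of endpoints, and a namespace. The layout `openmessaging:<driver>://<endpoint>[,<endpoint>...]/<namespace>` is an assumption inferred from the example string, and `AccessPointUrl` is a hypothetical helper that is not part of OpenMessaging or RocketMQ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper that decomposes an access point URL string.
final class AccessPointUrl {
    final String driver;          // e.g. "rocketmq"
    final List<String> endpoints; // e.g. ["IP1:9876", "IP2:9876"]
    final String namespace;       // e.g. "namespace"; empty if absent

    private AccessPointUrl(String driver, List<String> endpoints, String namespace) {
        this.driver = driver;
        this.endpoints = endpoints;
        this.namespace = namespace;
    }

    // Assumed layout: openmessaging:<driver>://<endpoint>[,<endpoint>...]/<namespace>
    static AccessPointUrl parse(String url) {
        final String prefix = "openmessaging:";
        if (!url.startsWith(prefix)) {
            throw new IllegalArgumentException("Missing 'openmessaging:' prefix: " + url);
        }
        String rest = url.substring(prefix.length());
        int sep = rest.indexOf("://");
        if (sep <= 0) {
            throw new IllegalArgumentException("Missing driver name: " + url);
        }
        String driver = rest.substring(0, sep);
        String tail = rest.substring(sep + 3);

        // Everything before the first '/' is the endpoint list.
        int slash = tail.indexOf('/');
        String hosts = slash < 0 ? tail : tail.substring(0, slash);
        String namespace = slash < 0 ? "" : tail.substring(slash + 1);

        List<String> endpoints = new ArrayList<>();
        for (String host : hosts.split(",")) {
            if (!host.isEmpty()) {
                endpoints.add(host);
            }
        }
        if (endpoints.isEmpty()) {
            throw new IllegalArgumentException("No endpoints in: " + url);
        }
        return new AccessPointUrl(driver, Collections.unmodifiableList(endpoints), namespace);
    }

    public static void main(String[] args) {
        AccessPointUrl u = parse("openmessaging:rocketmq://IP1:9876,IP2:9876/namespace");
        // prints: driver=rocketmq endpoints=[IP1:9876, IP2:9876] namespace=namespace
        System.out.printf("driver=%s endpoints=%s namespace=%s%n", u.driver, u.endpoints, u.namespace);
    }
}
```

In the examples, `IP1:9876` and `IP2:9876` are placeholders to be replaced with real addresses before running.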
