1.发送消息
https://help.aliyun.com/document_detail/29547.html?spm=a2c4g.11186623.0.0.43df9029etJ14J#section-q63-74s-m5s
2.订阅消息
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class ConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制台创建的Group ID。
properties.put(PropertyKeyConst.GROUP_ID, "XXX");
// AccessKey ID,阿里云身份验证。获取方式,请参见本文前提条件中的获取AccessKey。
properties.put(PropertyKeyConst.AccessKey, "XXX");
// AccessKey Secret,阿里云身份验证。获取方式,请参见本文前提条件中的获取AccessKey。
properties.put(PropertyKeyConst.SecretKey, "XXX");
// 设置TCP协议接入点,进入控制台的实例详情页面的TCP协议客户端接入点区域查看。
properties.put(PropertyKeyConst.NAMESRV_ADDR,
"XXX");
// 集群订阅方式(默认)。
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
// 广播订阅方式。
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() { //订阅多个Tag
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
//订阅另外一个Topic,如需取消订阅该Topic,请删除该部分的订阅代码,重新启动消费端即可。
consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部Tag。
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();
System.out.println("Consumer Started");
}
}