ActiveMQ中文教程参考手册@www.java1234.com.pdf

  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import javax.jms.*;
  3. public class TestProducer {
  4. public static String TEST_BROKER_URL = "tcp://47.94.17.101:61616";
  5. public static String TEST_QUEUE_NAME = "TEST_QUEUE";
  6. public static String TEST_TOPIC_NAME = "TEST_TOPIC";
  7. public static void main(String[] args) throws JMSException {
  8. // Create a ConnectionFactory
  9. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(TEST_BROKER_URL);
  10. connectionFactory.setUserName(ActiveMQConnectionFactory.DEFAULT_USER);
  11. connectionFactory.setPassword(ActiveMQConnectionFactory.DEFAULT_PASSWORD);
  12. // Create a Connection and start it
  13. Connection connection = connectionFactory.createConnection();
  14. connection.start();
  15. // Create a Session with AUTO_ACKNOWLEDGE model
  16. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  17. // Create the destination (Topic or Queue) and specified the queue name.
  18. Destination destination = session.createQueue(TEST_QUEUE_NAME);
  19. try {
  20. // Create a MessageProducer from the Session to the Topic or Queue
  21. MessageProducer producer = session.createProducer(destination);
  22. producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  23. //create 10 TextMessage and send it
  24. for (int i = 0; i < 10; i++) {
  25. TextMessage message = session.createTextMessage("hello! I was "+i);
  26. producer.send(message);
  27. }
  28. }catch (Exception e) {
  29. System.out.println("Caught: " + e);
  30. e.printStackTrace();
  31. }finally {
  32. //close session and connection if not null
  33. if (session!=null){
  34. session.close();
  35. }
  36. if (connection == null) {
  37. connection.close();
  38. }
  39. }
  40. }
  41. }
  1. import org.apache.activemq.ActiveMQConnectionFactory;
  2. import javax.jms.*;
  3. /**
  4. * Hello world!
  5. */
  6. public class TestCustomer {
  7. public static String TEST_BROKER_URL = "tcp://47.94.17.101:61616";
  8. public static String TEST_QUEUE_NAME = "TEST_QUEUE";
  9. public static String TEST_TOPIC_NAME = "TEST_TOPIC";
  10. public static void main(String[] args) throws Exception {
  11. // Create a ConnectionFactory
  12. ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(TEST_BROKER_URL);
  13. Connection connection = null;
  14. Session session = null;
  15. MessageConsumer consumer = null;
  16. try {
  17. // Create a Connection
  18. connection = connectionFactory.createConnection();
  19. connection.start();
  20. connection.setExceptionListener(new ExceptionListener() {
  21. @Override
  22. public void onException(JMSException e) {
  23. System.out.println("JMS Exception occured. Shutting down client.");
  24. }
  25. });
  26. // Create a Session
  27. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  28. // Create the destination (Topic or Queue)
  29. Destination destination = session.createQueue(TEST_QUEUE_NAME);
  30. // Create a MessageConsumer from the Session to the Topic or Queue
  31. consumer = session.createConsumer(destination);
  32. while (true){
  33. // Wait for a message
  34. Message message = consumer.receive(1000);
  35. if (message!=null){
  36. if (message instanceof TextMessage) {
  37. TextMessage textMessage = (TextMessage) message;
  38. String text = textMessage.getText();
  39. System.out.println(Thread.currentThread().getName()+":" + text);
  40. } else {
  41. System.out.println(Thread.currentThread().getName()+":" + message);
  42. }
  43. }
  44. }
  45. } catch (Exception e) {
  46. System.out.println("Caught: " + e);
  47. e.printStackTrace();
  48. }finally {
  49. if (consumer!=null){
  50. try {
  51. consumer.close();
  52. } catch (JMSException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. if (session == null) {
  57. try {
  58. session.close();
  59. } catch (JMSException e) {
  60. e.printStackTrace();
  61. }
  62. }
  63. if (connection == null) {
  64. try {
  65. connection.close();
  66. } catch (JMSException e) {
  67. e.printStackTrace();
  68. }
  69. }
  70. }
  71. }
  72. }

1. Broker

ActiveMQ 5.0 的二进制发布包中bin目录中包含一个名为activemq的脚本,直接运行这个脚本就可以启动一个broker。 此外也可以通过Broker Configuration URI或Broker XBean URI对broker进行配置,以下是一些命令行参数的例子:

Example Description
activemq Runs a broker using the default ‘xbean:activemq.xml’ as the broker configuration file.
activemq xbean:myconfig.xml Runs a broker using the file myconfig.xml as the broker configuration file that is located in the classpath.
activemq xbean:file:./conf/broker1.xml Runs a broker using the file broker1.xml as the broker configuration file that is located in the relative file path ./conf/broker1.xml
activemq xbean:file:C:/ActiveMQ/conf/broker2.xml Runs a broker using the file broker2.xml as the broker configuration file that is located in the absolute file path C:/ActiveMQ/conf/broker2.xml
activemq broker:(tcp://localhost:61616, tcp://localhost:5000)?useJmx=true Runs a broker with two transport connectors and JMX enabled.
activemq broker:(tcp://localhost:61616, network:tcp://localhost:5000)?persistent=false Runs a broker with 1 transport connector and 1 network connector with persistence disabled.

2. ActiveMQ的存储方式

2.1 KahaDB存储

kahaDB是默认的持久化策略,所有消息顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化。

在data/kahadb这个目录下,会生成四个文件,来完成消息持久化

  1. db.data: 它是消息的索引文件,本质上是B-Tree(B树),使用B-Tree作为索引指向db-*.log里面存储的消息.
  2. db.redo: 用来进行消息恢复 *
  3. db-.log: 存储消息内容。新的数据以APPEND的方式追加到日志文件末尾。属于顺序写入,因此消息存储是比较 快的。默认是32M,达到阀值会自动递增
  4. lock: 文件锁,写入当前获得kahadb读写权限的broker,用于在集群环境下的竞争处理
  1. <persistenceAdapter>
  2. <!--directory:保存数据的目录 ;maxFileLength:保存消息的文件大小 -->
  3. <amqPersistenceAdapterdirectory="${activemq.data}/amq"maxFileLength="32mb"/>
  4. </persistenceAdapter>

特性:

  1. 日志形式存储消息;
  2. 消息索引以 B-Tree 结构存储,可以快速更新;
  3. 完全支持 JMS 事务;
  4. 支持多种恢复机制kahadb 可以限制每个数据文件的大小。不代表总计数据容量。

2.2 AMQ 方式存储

只适用于 5.3 版本之前。 AMQ 也是一个文件型数据库,消息信息最终是存储在文件中。内存中也会有缓存数据。

  1. <persistenceAdapter>
  2. <!--directory:保存数据的目录 ;maxFileLength:保存消息的文件大小 -->
  3. <amqPersistenceAdapterdirectory="${activemq.data}/amq"maxFileLength="32mb"/>
  4. </persistenceAdapter>

性能高于 JDBC,写入消息时,会将消息写入日志文件,由于是顺序追加写,性能很高。

为了提升性能,创建消息主键索引,并且提供缓存机制,进一步提升性能。

每个日志文件的 大小都是有限制的(默认 32m,可自行配置) 。

当超过这个大小,系统会重新建立一个文件。

当所有的消息都消费完成,系统会删除这 个文件或者归档。

主要的缺点是 AMQ Message 会为每一个 Destination 创建一个索引,如果使用了大量的 Queue,索引文件的大小会占用很多磁盘空间。

而且由于索引巨大,一旦 Broker(ActiveMQ 应用实例)崩溃,重建索引的速度会非常 慢。

虽然 AMQ 性能略高于 Kaha DB 方式,但是由于其重建索引时间过长,而且索引文件 占用磁盘空间过大,所以已经不推荐使用。

2.3 JDBC存储

使用JDBC持久化方式,数据库默认会创建3个表,每个表的作用如下:
activemq_msgs:queue和topic的消息都存在这个表中
activemq_acks:存储持久订阅的信息和最后一个持久订阅接收的消息ID
activemq_lock:跟kahadb的lock文件类似,确保数据库在某一时刻只有一个broker在访问

ActiveMQ 将数据持久化到数据库中。

不指定具体的数据库。 可以使用任意的数据库 中。

本环节中使用 MySQL 数据库。 下述文件为 activemq.xml 配置文件部分内容。

首先定义一个 mysql-ds 的 MySQL 数据源,然后在 persistenceAdapter 节点中配置 jdbcPersistenceAdapter 并且引用刚才定义的数据源。

dataSource 指定持久化数据库的 bean,createTablesOnStartup 是否在启动的时候创建数 据表,默认值是 true,这样每次启动都会去创建数据表了,一般是第一次启动的时候设置为 true,之后改成 false。

Beans中添加

  1. <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
  2. <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
  3. <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
  4. <property name="username" value="activemq"/>
  5. <property name="password" value="activemq"/>
  6. <property name="maxActive" value="200"/>
  7. <property name="poolPreparedStatements" value="true"/>
  8. </bean>

修改persistenceAdapter

  1. <persistenceAdapter>
  2. <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
  3. <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" />
  4. </persistenceAdapter>

依赖jar包
commons-dbcp commons-pool mysql-connector-java

表字段解释:
activemq_acks 表:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存。
主要的数据库字段如下:

container 消息的destination
sub_dest 如果是使用static集群,这个字段会有集群其他系统的信息
client_id 每个订阅者都必须有一个唯一的客户端id用以区分
sub_name 订阅者名称
selector 选择器,可以选择只消费满足条件的消息。条件可以用自定义属性实现,可支持多属性and和or操作
last_acked_id 记录消费过的消息的id。

2:activemq_lock表:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker。

3:activemq_msgs表:用于存储消息,Queue和Topic都存储在这个表中。
主要的数据库字段如下:

id 自增的数据库主键
container 消息的destination
msgid_prod 消息发送者客户端的主键
msg_seq 是发送消息的顺序,msgid_prod+msg_seq可以组成jms的messageid
expiration 消息的过期时间,存储的是从1970-01-01到现在的毫秒数
msg 消息本体的java序列化对象的二进制数据
priority 优先级,从0-9,数值越大优先级越高
xid 用于存储订阅关系。如果是持久化topic,订阅者和服务器的订阅关系在这个表保存。

2.4 LevelDB存储

LevelDB持久化性能高于KahaDB,虽然目前默认的持久化方式仍然是KahaDB。并且,在ActiveMQ 5.9版本提供 了基于LevelDB和Zookeeper的数据复制方式,用于Master-slave方式的首选数据复制方案。 但是在ActiveMQ官网对LevelDB的表述:LevelDB官方建议使用以及不再支持,推荐使用的是KahaDB

2.5 Memory 消息存储

顾名思义,基于内存的消息存储,就是消息存储在内存中。persistent=”false”,表示不设置持 久化存储,直接存储到内存中
在broker标签处设置。

2.6 JDBC Message store with ActiveMQ Journal

这种方式克服了JDBC Store的不足,JDBC存储每次消息过来,都需要去写库和读库。 ActiveMQ Journal,使用延迟存储数据到数据库,当消息来到时先缓存到文件中,延迟后才写到数据库中。

当消费者的消费速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。 举个例子,生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况 下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上的消息,那么这个时候只需要同步剩余的 10%的消息到DB。 如果消费者的消费速度很慢,这个时候journal文件可以使消息以批量方式写到DB。

3. ActiveMQ支持的协议

完整支持的协议

http://activemq.apache.org/configuring-version-5-transports.html

ActiveMQ支持的client-broker通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM。

3.1 TCP协议

  1. 这是默认的Broker配置,TCP的client监听端口是61616。
  2. 在网络传输数据前,必须要序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。默认情况下,ActiveMQ把wire protocol叫做**OpenWire**,它的目的是促使网络上的效率和数据快速交互。
  3. TCP连接的URI形式:**tcp://hostname:port**?key=value&key=value,红色加粗部分是必须的
  4. TCP传输的优点:
    1. TCP协议传输可靠性高,稳定性强
    2. 高效性:字节流方式传递,效率很高
    3. 有效性、可用性:应用广泛,支持任何平台
  1. <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

3.2 NIO协议

  1. NIO是基于TCP的,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务端有更多的负载。
  2. 适合使用NIO协议的场景:
    1. 可能有大量的client去链接到Broker上一般情况下,大量的Client去链接Broker是被操作系统的线程数所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议
    2. 可能对于Broker有一个很迟钝的网络传输NIO比TCP提供更好的性能
  3. NIO连接的URI形式:nio://hostname:port?key=value
  4. Transport Connector配置示例:
    1. <transportConnectors>
    2.   <transportConnector name="tcp" uri="tcp://localhost:61616?trace=true" />
    3.   <transportConnector name="nio" uri="nio://localhost:61618?trace=true" />
    4. </transportConnectors>

上面的配置,示范了一个TCP协议监听61616端口,一个NIO协议监听61618端口

3.3 UDP协议

  1. UDP和TCP的区别:
    1. TCP是一个原始流的传递协议,意味着数据包是有保证的,换句话说,数据包是不会被复制和丢失的。UDP,另一方面,它是不会保证数据包的传递的
    2. TCP也是一个稳定可靠的数据包传递协议,意味着数据在传递的过程中不会被丢失。这样确保了在发送和接收之间能够可靠的传递。相反,UDP仅仅是一个链接协议,所以它没有可靠性之说
  2. 从上面可以得出:TCP是被用在稳定可靠的场景中使用的;UDP通常用在快速数据传递和不怕数据丢失的场景中,还有ActiveMQ通过防火墙时,只能用UDP
  3. UDP连接的URI形式:udp://hostname:port?key=value
  4. Transport Connector配置示例:
  1. <transportConnectors>
  2. <transportConnector name="udp" uri="udp://localhost:61618?trace=true" />
  3. </transportConnectors>

3.4 SSL协议

1:连接的URI形式:ssl://hostname:port?key=value
2:Transport Connector配置示例:

  1. <transportConnectors>
  2. <transportConnector name="ssl" uri="ssl://localhost:61617?trace=true"/>
  3. </transportConnectors>

3.5 HTTP/HTTPS协议

1:像web和email等服务需要通过防火墙来访问的,Http可以使用这种场合
2:连接的URI形式:http://hostname:port?key=value或者https://hostname:port?key=value
3:Transport Connector配置示例:

  1. <transportConnectors>
  2. <transportConnector name="http" uri="http://localhost:8080?trace=true" />
  3. </transportConnectors>

3.6 VM协议

1、VM transport允许在VM内部通信,从而避免了网络传输的开销。这时候采用的连 接不是socket连接,而是直接的方法调用。

2、第一个创建VM连接的客户会启动一个embed VM broker,接下来所有使用相同的 broker name的VM连接都会使用这个broker。当这个broker上所有的连接都关闭 的时候,这个broker也会自动关闭。

3、连接的URI形式:vm://brokerName?key=value

4、Java中嵌入的方式: vm:broker:(tcp://localhost:6000)?brokerName=embeddedbroker&persistent=fal se , 定义了一个嵌入的broker名称为embededbroker以及配置了一个 tcptransprotconnector在监听端口6000上

5、使用一个加载一个配置文件来启动broker vm://localhost?brokerConfig=xbean:activemq.xml

3.7 auto

Starting with version 5.13.0, ActiveMQ supports wire format protocol detection. OpenWire, STOMP, AMQP, and MQTT can be automatically detected. This allows one transport to be shared for all 4 types of clients.

  1. <transportConnector name="auto" uri="auto://localhost:5671"/>
  2. <transportConnector name="auto+ssl" uri="auto+ssl://localhost:5671"/>
  3. <transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/>
  4. <transportConnector name="auto+nio+ssl" uri="auto+nio+ssl://localhost:5671"/>

3.8 OpenWrite

OpenWire 可用配置选项

Option Default Description
cacheEnabled true Should commonly repeated values be cached so that less marshaling occurs?
cacheSize 1024 When cacheEnabled=true
then this parameter is used to specify the number of values to be cached.
maxInactivityDuration 30000 The maximum inactivity
duration (before which the socket is considered dead) in milliseconds. On some platforms it can take a long time for a socket to die. Therefore allow the broker to kill connections when they have been inactive for the configured period of time. Used by some transports to enable a keep alive heart beat feature. Inactivity monitoring is disabled when set to a value <= 0
.
maxInactivityDurationInitalDelay 10000 The initial delay before starting inactivity
checks. Yes, the word 'Inital'
is supposed to be misspelled like that.
maxFrameSize MAX_LON Maximum allowed frame size. Can help help prevent OOM DOS attacks.
sizePrefixDisabled false Should the size of the packet be prefixed before each packet is marshaled?
stackTraceEnabled true Should the stack trace of exception that occur on the broker be sent to the client?
tcpNoDelayEnabled true Does not affect the wire format, but provides a hint to the peer that TCP_NODELAY
should be enabled on the communications Socket.
tightEncodingEnabled true Should wire size be optimized over CPU usage?

Transport 可用配置选项

Option Name Default Value Description
backlog 5000 Specifies the maximum number of connections waiting to be accepted by the transport server socket.
closeAsync true If **true**
the socket close call happens asynchronously. This parameter should be set to **false**
for protocols like STOMP, that are commonly used in situations where a new connection is created for each read or write. Doing so ensures the socket close call happens synchronously. A synchronous close prevents the broker from running out of available sockets owing to the rapid cycling of connections.
connectionTimeout 30000 If **>=1**
the value sets the connection timeout in milliseconds. A value of **0**
denotes no timeout. Negative values are ignored.
daemon false If **true**
the transport thread will run in daemon mode. Set this parameter to **true**
when embedding the broker in a Spring container or a web container to allow the container to shut down correctly.
dynamicManagement false If **true**
the **TransportLogger**
can be managed by JMX.
ioBufferSize 8 * 1024 Specifies the size of the buffer to be used between the TCP layer and the OpenWire layer where **wireFormat**
based marshaling occurs.
jmxPort 1099 (Client Only) Specifies the port that will be used by the JMX server to manage the **TransportLoggers**
. This should only be set, via URI, by either a client producer or consumer as the broker creates its own JMX server. Specifying an alternate JMX port is useful for developers that test a broker and client on the same machine and need to control both via JMX.
keepAlive false If **true**
, enables TCP KeepAlive
on the broker connection to prevent connections from timing out at the TCP level. This should not be confused with **KeepAliveInfo**
messages as used by the **InactivityMonitor**
.
logWriterName default Sets the name of the **org.apache.activemq.transport.LogWriter**
implementation to use. Names are mapped to classes in the **resources/META-INF/services/org/apache/activemq/transport/logwriters**
directory.
maximumConnections Integer.MAX_VALUE The maximum number of sockets allowed for this broker.
minmumWireFormatVersion 0 The minimum remote **wireFormat**
version that will be accepted (note the misspelling). Note: when the remote **wireFormat**
version is lower than the configured minimum acceptable version an exception will be thrown and the connection attempt will be refused. A value of **0**
denotes no checking of the remote **wireFormat**
version.
socketBufferSize 64 * 1024 Sets the size, in bytes, for the accepted socket’s read and write buffers.
soLinger Integer.MIN_VALUE Sets the socket’s option **soLinger**
when the value is **> -1**
. When set to **-1**
the **soLinger**
socket option is disabled.
soTimeout 0 Sets the socket’s read timeout in milliseconds. A value of **0**
denotes no timeout.
soWriteTimeout 0 Sets the socket’s write timeout in milliseconds. If the socket write operation does not complete before the specified timeout, the socket will be closed. A value of 0 denotes no timeout.
stackSize 0 Set the stack size of the transport’s background reading thread. Must be specified in multiples of **128K**
. A value of **0**
indicates that this parameter is ignored.
startLogging true If **true**
the **TransportLogger**
object of the Transport stack will initially write messages to the log. This parameter is ignored unless **trace=true**
.
tcpNoDelay false If **true**
the socket’s option **TCP_NODELAY**
is set. This disables Nagle’s algorithm for small packet transmission.
threadName N/A When this parameter is specified the name of the thread is modified during the invocation of a transport. The remote address is appended so that a call stuck in a transport method will have the destination information in the thread name. This is extremely useful when using thread dumps for degugging.
trace false Causes all commands that are sent over the transport to be logged. To view the logged output define the **Log4j**
logger: **log4j.logger.org.apache.activemq.transport.TransportLogger=DEBUG**
.
trafficClass 0 The Traffic Class to be set on the socket.
diffServ 0 (Client only) The preferred Differentiated Services traffic class to be set on outgoing packets, as described in RFC 2475. Valid integer values: **[0,64]**
. Valid string values: **EF**
, **AF[1-3][1-4]**
or **CS[0-7]**
. With JDK 6, only works when the JVM uses the IPv4 stack. To use the IPv4 stack set the system property **java.net.preferIPv4Stack=true**
. Note: it’s invalid to specify both ‘diffServ and typeOfService’ at the same time as they share the same position in the TCP/IP packet headers
typeOfService 0 (Client only) The preferred Type of Service value to be set on outgoing packets. Valid integer values: **[0,256]**
. With JDK 6, only works when the JVM is configured to use the IPv4 stack. To use the IPv4 stack set the system property **java.net.preferIPv4Stack=true**
. Note: it’s invalid to specify both ‘diffServ and typeOfService’ at the same time as they share the same position in the TCP/IP packet headers.
useInactivityMonitor true When **false**
the **InactivityMonitor**
is disabled and connections will never time out.
useKeepAlive true When **true**
**KeepAliveInfo**
messages are sent on an idle connection to prevent it from timing out. If this parameter is **false**
connections will still timeout if no data was received on the connection for the specified amount of time.
useLocalHost false When **true**
local connections will be made using the value **localhost**
instead of the actual local host name. On some operating systems, such as **OS X**
, it’s not possible to connect as the local host name so **localhos**
is better.
useQueueForAccept true When **true**
accepted sockets are placed onto a queue for asynchronous processing using a separate thread.
wireFormat default The name of the **wireFormat**
factory to use.
wireFormat.* N/A Properties with this prefix are used to configure the **wireFormat**
.

4. HelloWorld

下载

http://activemq.apache.org/

安装启动

解压后直接执行

bin/win64/activemq.bat

web控制台

http://localhost:8161/

通过8161端口访问

修改访问端口

修改 ActiveMQ 配置文件:/usr/local/activemq/conf/jetty.xml

jettyport节点

配置文件修改完毕,保存并重新启动 ActiveMQ 服务。

开发

maven坐标

  1. <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
  2. <dependency>
  3. <groupId>org.apache.activemq</groupId>
  4. <artifactId>activemq-all</artifactId>
  5. <version>5.15.11</version>
  6. </dependency>

Sender

  1. package com.mashibing.activemq01;
  2. import javax.jms.Connection;
  3. import javax.jms.Destination;
  4. import javax.jms.JMSException;
  5. import javax.jms.MessageProducer;
  6. import javax.jms.Queue;
  7. import javax.jms.Session;
  8. import javax.jms.TextMessage;
  9. import org.apache.activemq.ActiveMQConnectionFactory;
  10. public class Sender {
  11. public static void main(String[] args) throws Exception {
  12. // 1. 建立工厂对象,
  13. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
  14. ActiveMQConnectionFactory.DEFAULT_USER,
  15. ActiveMQConnectionFactory.DEFAULT_PASSWORD,
  16. "tcp://localhost:61616"
  17. );
  18. //2 从工厂里拿一个连接
  19. Connection connection = activeMQConnectionFactory.createConnection();
  20. connection.start();
  21. //3 从连接中获取Session(会话)
  22. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  23. // 从会话中获取目的地(Destination)消费者会从这个目的地取消息
  24. Queue queue = session.createQueue("f");
  25. //从会话中创建消息提供者
  26. MessageProducer producer = session.createProducer(queue);
  27. //从会话中创建文本消息(也可以创建其它类型的消息体)
  28. for (int i = 0; i < 100; i++) {
  29. TextMessage message = session.createTextMessage("msg: " + i);
  30. // 通过消息提供者发送消息到ActiveMQ
  31. Thread.sleep(1000);
  32. producer.send(message);
  33. }
  34. // 关闭连接
  35. connection.close();
  36. System.out.println("exit");
  37. }
  38. }

Receiver

  1. package com.mashibing.activemq01;
  2. import javax.jms.Connection;
  3. import javax.jms.Destination;
  4. import javax.jms.JMSException;
  5. import javax.jms.Message;
  6. import javax.jms.MessageConsumer;
  7. import javax.jms.MessageProducer;
  8. import javax.jms.Queue;
  9. import javax.jms.Session;
  10. import javax.jms.TextMessage;
  11. import org.apache.activemq.ActiveMQConnectionFactory;
  12. public class Receiver {
  13. public static void main(String[] args) throws Exception {
  14. // 1. 建立工厂对象,
  15. ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(
  16. ActiveMQConnectionFactory.DEFAULT_USER,
  17. ActiveMQConnectionFactory.DEFAULT_PASSWORD,
  18. "tcp://localhost:61616"
  19. );
  20. //2 从工厂里拿一个连接
  21. Connection connection = activeMQConnectionFactory.createConnection();
  22. connection.start();
  23. //3 从连接中获取Session(会话)
  24. Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  25. // 从会话中获取目的地(Destination)消费者会从这个目的地取消息
  26. Queue queue = session.createQueue("f");
  27. //从会话中创建消息提供者
  28. MessageConsumer consumer = session.createConsumer(queue);
  29. //从会话中创建文本消息(也可以创建其它类型的消息体)
  30. while (true) {
  31. TextMessage receive = (TextMessage)consumer.receive();
  32. System.out.println("TextMessage:" + receive.getText());
  33. }
  34. }
  35. }

5. ActiveMQ的安全机制

web控制台安全

  1. # username: password [,rolename ...]
  2. admin: admin, admin
  3. user: user, user
  4. yiming: 123, user

用户名:密码,角色

注意: 配置需重启ActiveMQ才会生效。

消息安全机制

修改 activemq.xml

在123行 节点中添加

  1. <plugins>
  2. <simpleAuthenticationPlugin>
  3. <users>
  4. <authenticationUser username="admin" password="admin" groups="admins,publishers,consumers"/>
  5. <authenticationUser username="publisher" password="publisher" groups="publishers,consumers"/>
  6. <authenticationUser username="consumer" password="consumer" groups="consumers"/>
  7. <authenticationUser username="guest" password="guest" groups="guests"/>
  8. </users>
  9. </simpleAuthenticationPlugin>
  10. </plugins>