在gradle中添加库:
// NOTE(review): 4.4.1 does not look like a published support-v4 version (which is presumably
// why the lint suppression below was needed) — verify the coordinate. On an AndroidX project the
// equivalent AndroidX legacy-support artifact is probably what is wanted instead. TODO confirm.
//noinspection GradleCompatible
implementation 'com.android.support:support-v4:4.4.1'
// Eclipse Paho MQTT v3 core client.
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4'
// Eclipse Paho Android service wrapper; the manifest below registers its MqttService.
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
在清单文件中添加权限,并在application中引用服务:
<!-- Network access, needed to reach the MQTT broker over TCP. -->
<uses-permission android:name="android.permission.INTERNET" />
<!-- Presumably required by the Paho Android service to keep the CPU awake while handling MQTT traffic; verify against the library docs. -->
<uses-permission android:name="android.permission.WAKE_LOCK" />
<!-- Presumably required by the Paho Android service to observe connectivity changes; verify against the library docs. -->
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
....
<application
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:roundIcon="@mipmap/ic_launcher_round"
android:supportsRtl="true"
android:theme="@style/Theme.MQTTProject">
<activity android:name=".MainActivity">
<intent-filter>
<action android:name="android.intent.action.MAIN" />
<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>
</activity>
<!-- Service shipped inside the Paho Android library; it must be declared here so the library can start it. -->
<service android:name="org.eclipse.paho.android.service.MqttService" />
<!-- The application's own service (MyService) that owns the MQTT connection. -->
<service android:name=".MyService" android:enabled="true"/>
</application>
编写MQTT服务代码:
/**
 * Started [Service] that owns the application's MQTT connection.
 *
 * On creation it builds an [MqttAndroidClient] for [HOST], connects with the credentials in the
 * companion object and retries after [RECONNECT_DELAY_MS] whenever a connection attempt fails.
 * The client is stored in the companion object, so [publish], [subscribe] and [response] act on
 * the single shared client regardless of which `MyService` instance they are called on.
 * If the client has not been created yet (the service has not run [onCreate]) those calls are
 * logged and ignored instead of crashing.
 */
class MyService : Service() {

    private lateinit var mMQttConnectOptions: MqttConnectOptions

    /** Schedules delayed reconnect attempts on the main thread; cleared in [onDestroy]. */
    private val reconnectHandler by lazy { Handler(android.os.Looper.getMainLooper()) }

    /** Set once [onDestroy] has run so that no further reconnect attempts are started. */
    private var isShuttingDown = false

    private val mqttCallback = object : MqttCallbackExtended {
        override fun connectionLost(cause: Throwable?) {
            Log.e(TAG, "连接断开 ")
            // Try to re-establish the connection (ignored while the service is shutting down).
            connectionMQTTServer()
        }

        /** Logs every incoming message and acknowledges it on [RESPONSE_TOPIC]. */
        override fun messageArrived(topic: String?, message: MqttMessage?) {
            Log.e(TAG, "接受的消息来自主题${topic}消息为:${message.toString()}")
            response("GetInfo")
        }

        override fun deliveryComplete(token: IMqttDeliveryToken?) = Unit

        /**
         * Called when the connection to the server has been established successfully.
         *
         * @param reconnect true if the connection is the result of an automatic reconnect.
         * @param serverURI URI of the server the connection was made to.
         */
        override fun connectComplete(reconnect: Boolean, serverURI: String?) = Unit
    }

    override fun onCreate() {
        super.onCreate()
        initMQTTConnect()
    }

    /**
     * Publishes [msg] to [topic].
     *
     * @param topic topic to publish to.
     * @param msg text payload; sent as its default [String.toByteArray] encoding.
     * @param qos MQTT quality-of-service level for this message.
     * @param retained whether the broker should keep the message as the topic's retained message.
     */
    fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {
        val client = clientOrNull("publish") ?: return
        val message = MqttMessage(msg.toByteArray()).also {
            it.qos = qos
            it.isRetained = retained
        }
        try {
            client.publish(topic, message, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.e(TAG, "$msg 推送至 $topic 成功")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.e(TAG, "$msg 推送至 $topic 失败")
                }
            })
        } catch (e: MqttException) {
            Log.e(TAG, "publish: $e")
        }
    }

    /** Creates the shared [MqttAndroidClient], builds the connect options and starts connecting. */
    fun initMQTTConnect() {
        Log.e(TAG, "initMQTTConnect: 初始化中")
        mQttClient = MqttAndroidClient(this, HOST, CLIENT_ID).also { it.setCallback(mqttCallback) }
        mMQttConnectOptions = MqttConnectOptions().apply {
            // Clean session: session state is NOT remembered across restarts/reconnects.
            isCleanSession = true
            userName = USERNAME
            password = PASSWORD.toCharArray()
            // Connection timeout, in seconds.
            connectionTimeout = 10
            // Keep-alive (heartbeat) interval, in seconds.
            keepAliveInterval = 20
        }
        connectionMQTTServer()
    }

    /**
     * Starts one connection attempt.
     *
     * The call is asynchronous (the outcome is reported through the listener), so no extra thread
     * is needed. On failure another attempt is scheduled after [RECONNECT_DELAY_MS].
     */
    private fun connectionMQTTServer() {
        if (isShuttingDown) return
        val client = clientOrNull("connectionMQTTServer") ?: return
        Log.e(TAG, "connectionMQTTServer: 正在进行链接操作")
        try {
            client.connect(mMQttConnectOptions, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.e(TAG, "链接成功")
                    // Hook for work that needs a live connection, e.g.
                    // subscribe(PUBLISH_TOPIC, 1) or publish(PUBLISH_TOPIC, "...", 1, false).
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.e(TAG, "连接失败,正在重新链接")
                    Log.e(TAG, "onFailure: $exception")
                    reconnectHandler.postDelayed({ connectionMQTTServer() }, RECONNECT_DELAY_MS)
                }
            })
        } catch (e: MqttException) {
            Log.e(TAG, "run: $e")
        }
    }

    /**
     * Subscribes to [topic].
     *
     * @param topic topic filter to subscribe to.
     * @param qos maximum quality-of-service level requested for the subscription.
     */
    @Suppress("SameParameterValue")
    fun subscribe(topic: String, qos: Int = 1) {
        val client = clientOrNull("subscribe") ?: return
        try {
            client.subscribe(topic, qos, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.e(TAG, "订阅到 $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.e(TAG, "订阅到 $topic 失败")
                }
            })
        } catch (e: MqttException) {
            Log.e(TAG, "subscribe: $e")
        }
    }

    /** Cancels the subscription to [topic]. */
    private fun unsubscribe(topic: String) {
        val client = clientOrNull("unsubscribe") ?: return
        try {
            client.unsubscribe(topic, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.e(TAG, "取消订阅$topic 成功")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.e(TAG, "取消订阅$topic 失败")
                }
            })
        } catch (e: MqttException) {
            Log.e(TAG, "unsubscribe: $e")
        }
    }

    /** Publishes [message] to [RESPONSE_TOPIC] with QoS 1, not retained. */
    fun response(message: String) {
        val client = clientOrNull("response") ?: return
        val qos = 1
        val retained = false
        try {
            // Arguments: topic, payload bytes, quality of service, retained flag.
            client.publish(RESPONSE_TOPIC, message.toByteArray(), qos, retained)
        } catch (e: MqttException) {
            Log.e(TAG, "response: $e")
        }
    }

    /** Binding is not supported; the service is used as a started service only. */
    override fun onBind(intent: Intent?): IBinder? = null

    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
        Log.e(TAG, "onStartCommand: 服务已经启动")
        return super.onStartCommand(intent, flags, startId)
    }

    override fun onDestroy() {
        super.onDestroy()
        // Stop reconnecting before disconnecting, otherwise a pending or triggered retry would
        // bring the connection back up after the service is gone.
        isShuttingDown = true
        reconnectHandler.removeCallbacksAndMessages(null)
        try {
            mQttClient?.disconnect()
        } catch (e: MqttException) {
            Log.e(TAG, "onDestroy: $e")
        }
        // Drop the static reference so the destroyed service is not kept alive through the client.
        mQttClient = null
    }

    /**
     * Returns the shared client, or logs and returns null when it has not been created yet.
     *
     * @param operation name of the calling operation, used only in the log message.
     */
    private fun clientOrNull(operation: String): MqttAndroidClient? {
        val client = mQttClient
        if (client == null) {
            Log.e(TAG, "$operation: MQTT client is not initialized; is the service running?")
        }
        return client
    }

    companion object {
        /** Shared client; null until [initMQTTConnect] has run and again after [onDestroy]. */
        private var mQttClient: MqttAndroidClient? = null

        /** Delay between a failed connection attempt and the next one, in milliseconds. */
        private const val RECONNECT_DELAY_MS = 5_000L

        // Client identifier sent to the broker.
        // NOTE(review): a fixed id on a shared public broker can collide with other clients — confirm.
        const val CLIENT_ID = "kotlin"

        // Topic used for publishing.
        const val PUBLISH_TOPIC = "topic_test"

        // Topic used for acknowledging received messages.
        const val RESPONSE_TOPIC = "message_arrived"

        // Broker address.
        const val HOST = "tcp://broker.emqx.io:1883"

        // Broker user name.
        const val USERNAME = "admin"

        // Broker password.
        const val PASSWORD = "public"

        const val TAG = "MyService"
    }
}
MainActivity中使用:
/**
 * Demo screen with two buttons: one subscribes to `PUBLISH_TOPIC`, the other publishes a fixed
 * greeting to it. [MyService] is started every time the activity becomes visible.
 */
class MainActivity : AppCompatActivity() {

    // NOTE(review): this instance is constructed directly rather than obtained from the system,
    // so it is not the instance launched by startService() below. The calls in onCreate() rely on
    // MyService keeping its MQTT client in the companion object — confirm this is intended, or
    // switch to a bound service.
    private val myService = MyService()

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // Subscribe to the topic when the subscribe button is tapped.
        subscribe_btn.setOnClickListener {
            myService.subscribe(topic = PUBLISH_TOPIC, qos = 1)
        }

        // Publish a message when the publish button is tapped.
        publish_btn.setOnClickListener {
            myService.publish(
                topic = PUBLISH_TOPIC,
                msg = "你好你好你好",
                qos = 1,
                retained = false,
            )
        }
    }

    override fun onStart() {
        super.onStart()
        // Start the service that creates the MQTT client and connects to the broker.
        startService(Intent(this, MyService::class.java))
    }
}