• 在gradle中添加库:

      1. //noinspection GradleCompatible
      2. implementation 'com.android.support:support-v4:4.4.1'
      3. implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4'
      4. implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
    • 在清单文件中添加权限,并在application中引用服务:

      1. <uses-permission android:name="android.permission.INTERNET" />
      2. <uses-permission android:name="android.permission.WAKE_LOCK" />
      3. <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
      4. ....
      5. <application
      6. android:allowBackup="true"
      7. android:icon="@mipmap/ic_launcher"
      8. android:label="@string/app_name"
      9. android:roundIcon="@mipmap/ic_launcher_round"
      10. android:supportsRtl="true"
      11. android:theme="@style/Theme.MQTTProject">
      12. <activity android:name=".MainActivity">
      13. <intent-filter>
      14. <action android:name="android.intent.action.MAIN" />
      15. <category android:name="android.intent.category.LAUNCHER" />
      16. </intent-filter>
      17. </activity>
      18. <!--此处为添加库中的服务-->
      19. <service android:name="org.eclipse.paho.android.service.MqttService" />
      20. <!--此处为自行写入的服务-->
      21. <service android:name=".MyService" android:enabled="true"/>
      22. </application>
    • 编写MQTT服务代码

      class MyService : Service() {
        private lateinit var mMQttConnectOptions: MqttConnectOptions
      
        companion object {
            private lateinit var mQttClient: MqttAndroidClient
      
            //        客户端唯一标识
            const val CLIENT_ID = "kotlin"
      
            //        发布者主题
            const val PUBLISH_TOPIC = "topic_test"
      
            //        响应主题
            const val RESPONSE_TOPIC = "message_arrived"
      
            //        服务器地址
            const val HOST = "tcp://broker.emqx.io:1883"
      
            //        用户名
            const val USERNAME = "admin"
      
            //        密码
            const val PASSWORD = "public"
            const val TAG = "MyService"
        }
      
        override fun onCreate() {
            super.onCreate()
            initMQTTConnect()
        }
        @SuppressLint("LongLogTag")
        fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {
            try {
                val message = MqttMessage()
                message.payload = msg.toByteArray()
      //            传输质量
                message.qos = qos
      //            是否保留
                message.isRetained = retained
                mQttClient.publish(topic, message, null, object : IMqttActionListener {
                    override fun onSuccess(asyncActionToken: IMqttToken?) {
                        Log.e(com.wsd.mqttproject.Service.TAG, "$msg 推送至 $topic 成功")
                    }
      
                    override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                        Log.e(com.wsd.mqttproject.Service.TAG, "$msg 推送至 $topic 失败")
                    }
      
                })
            } catch (e: MqttException) {
                Log.e(com.wsd.mqttproject.Service.TAG, "publish: $e")
            }
        }
        //    初始化MQttAndroidClient
        fun initMQTTConnect() {
            Log.e(TAG, "initMQTTConnect: 初始化中")
            mQttClient = MqttAndroidClient(this, HOST, CLIENT_ID)
            mMQttConnectOptions = MqttConnectOptions()
      //        在重新启动和重新连接时记住状态
            mMQttConnectOptions.isCleanSession = true
            mMQttConnectOptions.userName = USERNAME
            mMQttConnectOptions.password = PASSWORD.toCharArray()
      //        设置超时时间,单位:秒
            mMQttConnectOptions.connectionTimeout = 10
      //        心跳包发送间隔,单位:秒
            mMQttConnectOptions.keepAliveInterval = 20
      //        设置服务质量
            val message = MqttMessage("payLoad".toByteArray())
            message.qos = 1
            mQttClient.setCallback(mqttCallback) // 回调
            connectionMQTTServer()
        }
        //    进行链接操作
        private fun connectionMQTTServer() {
            Log.e(TAG, "connectionMQTTServer: 正在进行链接操作")
            Thread {
                try {
                    mQttClient.connect(mMQttConnectOptions, null, object : IMqttActionListener {
                        override fun onSuccess(asyncActionToken: IMqttToken?) {
                            Log.e(TAG, "链接成功")
                            try {
      //                            mQttClient.subscribe(PUBLISH_TOPIC, 1) //订阅主题,参数:主题、服务质量
      //                            subscribe(PUBLISH_TOPIC,1)
      //                            publish(PUBLISH_TOPIC, "你好MQtt", 1, false)
                            } catch (e: MqttException) {
                                Log.e(TAG, "onSuccess: $e")
                            }
                        }
      
                        @Suppress("DEPRECATION")
                        override fun onFailure(
                            asyncActionToken: IMqttToken?,
                            exception: Throwable?
                        ) {
                            Log.e(TAG, "连接失败,正在重新链接")
                            Log.e(TAG, "onFailure: $exception")
                            Handler().postDelayed({ connectionMQTTServer() }, 500) //延时5秒重新连接MQTT服务器
                        }
      
                    })
                } catch (e: MqttException) {
                    Log.e(TAG, "run: $e")
                }
            }.run()
        }
        //    创建MQtt订阅  订阅Topic //订阅主题,参数:主题、服务质量
        @Suppress("SameParameterValue")
        fun subscribe(topic:String, qos:Int = 1){
            try {
                mQttClient.subscribe(topic, qos, null, object : IMqttActionListener {
                    override fun onSuccess(asyncActionToken: IMqttToken?) {
                        Log.e(TAG, "订阅到 $topic")
                    }
      
                    override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                        Log.e(TAG, "订阅到 $topic 失败")
                    }
      
                })
            }catch (e: MqttException){
                Log.e(TAG, "subscribe: $e")
            }
        }
        //    取消订阅
        @SuppressLint("LongLogTag")
        private fun unsubscribe(topic: String){
            try {
                mQttClient.unsubscribe(topic, null, object : IMqttActionListener {
                    override fun onSuccess(asyncActionToken: IMqttToken?) {
                        Log.e(TAG, "取消订阅$topic 成功")
                    }
      
                    override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                        Log.e(TAG, "取消订阅$topic 失败")
                    }
      
                })
            }catch (e: MqttException){
                Log.e(TAG, "unsubscribe: $e")
            }
        }
        private val mqttCallback = object : MqttCallbackExtended {
            override fun connectionLost(cause: Throwable?) {
                Log.e(TAG, "连接断开 ")
                connectionMQTTServer() // 重连
            }
      
            //        接受的消息
            override fun messageArrived(topic: String?, message: MqttMessage?) {
                Log.e(TAG, "接受的消息来自主题${topic}消息为:${message.toString()}")
                response("GetInfo")
            }
      
            override fun deliveryComplete(token: IMqttDeliveryToken?) {
      
            }
      
            override fun connectComplete(reconnect: Boolean, serverURI: String?) {
                /**
                 *与服务器的连接成功完成时调用。
                 * @param reconnect如果为true,则连接是自动重新连接的结果。
                 * @param serverURI建立连接的服务器URI。
                 **/
            }
      
        }
      
        fun response(message: String) {
            val topic = RESPONSE_TOPIC
            val qos = 1
            val retained = false
            try {
                //参数分别为:主题、消息的字节数组、服务质量、是否在服务器保留断开连接后的最后一条消息
                mQttClient.publish(topic, message.toByteArray(), qos, retained)
            } catch (e: MqttException) {
                Log.e(TAG, "response: $e")
            }
        }
      
        override fun onBind(intent: Intent?): IBinder? {
            return null
        }
      
        override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
            Log.e(TAG, "onStartCommand: 服务已经启动")
            return super.onStartCommand(intent, flags, startId)
        }
      
        override fun onDestroy() {
            super.onDestroy()
            try {
                mQttClient.disconnect()
            } catch (e: MqttException) {
                Log.e(TAG, "onDestroy: $e")
            }
        }
      }
      
    • MainActivity中使用:

      class MainActivity : AppCompatActivity() {
        private val myService = MyService()
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
      //        链接服务器
      //        创建订阅
            subscribe_btn.setOnClickListener {
                myService.subscribe(PUBLISH_TOPIC, 1)
            }
      //        发布消息
            publish_btn.setOnClickListener {
                myService.publish(PUBLISH_TOPIC,"你好你好你好",1,false)
            }
        }
      
        override fun onStart() {
            super.onStart()
            val intent = Intent(this,MyService::class.java)
            startService(intent)
        }
      }