Handler && Looper

looper

  • 每个线程有且最多只能有一个Looper对象,它是一个ThreadLocal
  • Looper内部有一个消息队列,loop()方法调用后线程开始不断从队列中取出消息执行
    • 消息列表采用单链表结构
  • Looper使一个线程变成Looper线程。
  1. class HandlerActivity : AppCompatActivity() {
  2. override fun onCreate(savedInstanceState: Bundle?) {
  3. super.onCreate(savedInstanceState)
  4. setContentView(R.layout.activity_handler)
  5. Thread(SampleTask(MyHandler())).start()
  6. }
  7. fun updateTextView(value: String) {
  8. textView3.text = value
  9. }
  10. inner class MyHandler : Handler() {
  11. override fun handleMessage(msg: Message) {
  12. val result = msg.data.getString("message")
  13. updateTextView(result)
  14. }
  15. }
  16. }
  • prepareMainLooper方法给主线程创建looper,也提供了getMainLooper方法来获取主线程的Looper
  • Looper提供了两种方法退出
    • quit—会直接退出Looper
    • quitSafely—只是设定一个退出标志,把消息队列里面的已有消息处理完毕后才安全退出
    • 在子线程如果手动创建了Looper,事情完成之后应该调用quit方法终止消息循环
      • 不退出,该子线程会一直处于等待状态
      • 退出后线程会立即终止
  • Looper最重要的方法是loop方法,只有调用了loop方法之后消息循环系统才会真正起作用
    • loop方法会调用MessageQueue的next方法来获取新消息,而next是一个阻塞操作,当没有消息时,next方法会一直阻塞在那里,这也导致loop方法一直阻塞在那里
    • 如果MessageQueue的next方法返回了新消息,Looper就会处理这条消息
  1. public static void loop() {
  2. //...
  3. final Looper me = myLooper();
  4. final MessageQueue queue = me.mQueue;
  5. //..
  6. for (;;) {
  7. Message msg = queue.next(); // might block
  8. msg.target.dispatchMessage(msg); //msg.target是发送这条消息的Handler对象
  9. }
  10. }
  11. /**
  12. * Handler.java
  13. * Handle system messages here.
  14. */
  15. public void dispatchMessage(Message msg) {
  16. if (msg.callback != null) {
  17. handleCallback(msg);
  18. } else {
  19. if (mCallback != null) {
  20. if (mCallback.handleMessage(msg)) {
  21. return;
  22. }
  23. }
  24. handleMessage(msg);
  25. }
  26. }

通过handler引用

  1. class SampleTask(val handler: Handler) : Runnable {
  2. override fun run() {
  3. try {
  4. Thread.sleep(5000)
  5. val msg = prepareMessage("task completed!")
  6. handler.sendMessage(msg)
  7. } catch (e: InterruptedException) {
  8. Log.d(TAG, "interrupted!")
  9. }
  10. }
  11. private fun prepareMessage(string: String): Message {
  12. val result = handler.obtainMessage()
  13. val data = Bundle()
  14. data.putString("message", string)
  15. result.data = data
  16. return result
  17. }
  18. companion object {
  19. const val TAG = "SampleTask"
  20. }
  21. }

handler

  • handler扮演了往MQ上添加消息和处理消息的角色(只处理由自己发出的消息)
  • 通知MQ它要执行一个任务(sendMessage),并在loop到自己的时候执行该任务(handleMessage),整个过程是异步的
  • 一个线程可以有多个handler, 但是只能有一个looper
  1. class LooperThread : Thread() {
  2. private lateinit var handler1: Handler
  3. private lateinit var handler2: Handler
  4. override fun run() {
  5. // 将当前线程初始化为Looper线程
  6. Looper.prepare()
  7. handler1 = Handler()
  8. handler2 = Handler()
  9. // 开始循环处理消息队列
  10. Looper.loop()
  11. }
  12. }

handler可以在任意线程发送消息,这些消息会被添加到关联的MQ上。

handler1.png

handler是在它关联的looper线程中处理消息的。

handler2.png

在activity中创建handler并将其引用传递给worker thread,worker thread执行完任务后使用handler发送消息通知activity更新UI

handler3.png

epoll

  • select、poll、epoll都是IO多路复用

    • select-select支持的文件描述符数量太小了,默认是1024, 轮询
    • poll- 没有最大连接数,基于链表,轮询
    • epoll- callback
  • epoll

    • epoll_create(): 创建一个epoll实例并返回相应的文件描述符
    • epoll_ctl(): 注册相关的文件描述符使用
    • epoll_wait(): 可以用于等待IO事件。如果当前没有可用的事件,这个函数会阻塞调用线程

handler创建epoll

  • 调用关系
    • MessageQueue初始化->nativeInit()->NativeMessageQueue()->Looper(false)->epoll_create->epoll_ctl
  1. Looper::Looper(bool allowNonCallbacks) :
  2. mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
  3. mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
  4. int wakeFds[2];
  5. int result = pipe(wakeFds);
  6. LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
  7. mWakeReadPipeFd = wakeFds[0];
  8. mWakeWritePipeFd = wakeFds[1];
  9. ...
  10. mIdling = false;
  11. // Allocate the epoll instance and register the wake pipe.
  12. mEpollFd = epoll_create(EPOLL_SIZE_HINT);
  13. LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
  14. struct epoll_event eventItem;
  15. memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
  16. eventItem.events = EPOLLIN;//监听管道的read()操作
  17. eventItem.data.fd = mWakeReadPipeFd;//记录管道读端的fd
  18. result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
  19. ...
  20. }

Handler中looper通过轮询获取message

  • Looper.loop中for循环获取消息
    • Message msg = queue.next();->nativePollOnce->nativeMessageQueue->pollOnce->pollInner()
  1. int Looper::pollInner(int timeoutMillis) {
  2. ...
  3. // Poll.
  4. int result = POLL_WAKE;
  5. mResponses.clear();
  6. mResponseIndex = 0;
  7. // We are about to idle.
  8. mIdling = true;
  9. struct epoll_event eventItems[EPOLL_MAX_EVENTS];
  10. //等待事件发生或者超时,在nativeWake()方法,向管道写端写入字符,则该方法会返回;
  11. int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
  12. // No longer idling.
  13. mIdling = false;
  14. // Acquire lock.
  15. mLock.lock();
  16. ...
  17. for (int i = 0; i < eventCount; i++) {
  18. int fd = eventItems[i].data.fd;
  19. uint32_t epollEvents = eventItems[i].events;
  20. if (fd == mWakeReadPipeFd) {
  21. if (epollEvents & EPOLLIN) {
  22. awoken();//
  23. } else {
  24. ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
  25. }
  26. } else {
  27. ...
  28. }
  29. }
  30. Done: ;
  31. ...
  32. return result;
  33. }
  • 最终调用awaken
  1. void Looper::awoken() {
  2. char buffer[16];
  3. ssize_t nRead;
  4. do {
  5. //不断读取管道数据,目的就是为了清空管道内容
  6. nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
  7. } while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
  8. }
  • 总结
    • Looper::loop中for循环不断调用next()从自己的链表中获取message,但是获取前先nativePollOnce进入阻塞状态,线程会释放资源进入休眠状态,直到下个消息到达或者有事务发生,通过往pipe管道写端写入数据来唤醒主线程工作

handler通过sendmessage最终给管道写数据

将消息push到MessageQueue中时候,即MessageQueue.enqueueMessages(…)方法中

  • handler.sendMessage(msg)
    • sendMessageDelayed(msg, 0);
      • sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
        • enqueueMessage(queue, msg, uptimeMillis)
          • queue.enqueueMessage(msg, uptimeMillis);先入列后调用wake
            • nativeWake(mPtr);—->
              • Looper->wake
                • write()
  1. void Looper::wake() {
  2. #if DEBUG_POLL_AND_WAKE
  3. ALOGD("%p ~ wake", this);
  4. #endif
  5. ssize_t nWrite;
  6. do {
  7. nWrite = write(mWakeWritePipeFd, "W", 1);//进行了写操作
  8. } while (nWrite == -1 && errno == EINTR);
  9. if (nWrite != 1) {
  10. if (errno != EAGAIN) {
  11. ALOGW("Could not write wake signal, errno=%d", errno);
  12. }
  13. }
  14. }
  • handler.postDelayed(runnable, time)
    • sendMessageDelayed(getPostMessage(r), delayMillis);
      • 同上
  • handler.postDelay并不是先等待一定的时间再放入到MessageQueue中,而是直接进入MessageQueue,以MessageQueue的时间顺序排列和唤醒的方式结合实现的

    • MessageQueue的next()方法,会根据其delay时间和链表头的比较,如果更短则,放入链表头,并且看时间是否有delay,
      • 如果有,则block,等待时间到来唤醒执行,
        • Looper::pollInner(int timeoutMillis)
          • epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
      • 否则将唤醒立即执行。
    • epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis)
      • timeoutMillis == -1, causes epoll_wait() to block indefinitely,
      • timeoutMillis == 0, cause epoll_wait() to return immediately
  • 发送消息, MessageQueue.enqueueMessage(Message msg, long when)会根据时间把message插入到合适位置

    • sendMessageAtFrontOfQueue //插入到队列首部
      • enqueueMessage(queue, msg, 0); //最后的参数uptimeMillis是0,保证了在队首
    • postDelayed
      • sendMessageDelayed(getPostMessage(r), delayMillis)
        • sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis); //boot 2 now+delay
          • enqueueMessage(queue, msg, uptimeMillis);
    • sendMessage(Message msg)
      • sendMessageDelayed(msg, 0)
        • sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
          • //同上

参考:
handler&&epoll
https://blog.csdn.net/qingtiantianqing/article/details/72783952
https://www.jianshu.com/p/44b322dfc040