Handler && Looper
looper
- 每个线程有且最多只能有一个Looper对象,它是一个ThreadLocal
- Looper内部有一个消息队列,loop()方法调用后线程开始不断从队列中取出消息执行
- 消息列表采用单链表结构
- Looper使一个线程变成Looper线程。
class HandlerActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_handler)
Thread(SampleTask(MyHandler())).start()
}
fun updateTextView(value: String) {
textView3.text = value
}
inner class MyHandler : Handler() {
override fun handleMessage(msg: Message) {
val result = msg.data.getString("message")
updateTextView(result)
}
}
}
- prepareMainLooper方法给主线程创建looper,也提供了getMainLooper方法来获取主线程的Looper
- Looper提供了两种方法退出
- quit—会直接退出Looper
- quitSafely—只是设定一个退出标志,把消息队列里面的已有消息处理完毕后才安全退出
- 在子线程如果手动创建了Looper,事情完成之后应该调用quit方法终止消息循环
- 不退出,该子线程会一直处于等待状态
- 退出后线程会立即终止
- Looper最重要的方法是loop方法,只有调用了loop方法之后消息循环系统才会真正起作用
- loop方法会调用MessageQueue的next方法来获取新消息,而next是一个阻塞操作,当没有消息时,next方法会一直阻塞在那里,这也导致loop方法一直阻塞在那里
- 如果MessageQueue的next方法返回了新消息,Looper就会处理这条消息
public static void loop() {
//...
final Looper me = myLooper();
final MessageQueue queue = me.mQueue;
//..
for (;;) {
Message msg = queue.next(); // might block
msg.target.dispatchMessage(msg); //msg.target是发送这条消息的Handler对象
}
}
/**
* Handler.java
* Handle system messages here.
*/
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
通过handler引用
class SampleTask(val handler: Handler) : Runnable {
override fun run() {
try {
Thread.sleep(5000)
val msg = prepareMessage("task completed!")
handler.sendMessage(msg)
} catch (e: InterruptedException) {
Log.d(TAG, "interrupted!")
}
}
private fun prepareMessage(string: String): Message {
val result = handler.obtainMessage()
val data = Bundle()
data.putString("message", string)
result.data = data
return result
}
companion object {
const val TAG = "SampleTask"
}
}
handler
- handler扮演了往MQ上添加消息和处理消息的角色(只处理由自己发出的消息)
- 通知MQ它要执行一个任务(sendMessage),并在loop到自己的时候执行该任务(handleMessage),整个过程是异步的
- 一个线程可以有多个handler, 但是只能有一个looper
class LooperThread : Thread() {
private lateinit var handler1: Handler
private lateinit var handler2: Handler
override fun run() {
// 将当前线程初始化为Looper线程
Looper.prepare()
handler1 = Handler()
handler2 = Handler()
// 开始循环处理消息队列
Looper.loop()
}
}
handler可以在任意线程发送消息,这些消息会被添加到关联的MQ上。
handler是在它关联的looper线程中处理消息的。
在activity中创建handler并将其引用传递给worker thread,worker thread执行完任务后使用handler发送消息通知activity更新UI
epoll
select、poll、epoll都是IO多路复用
- select-select支持的文件描述符数量太小了,默认是1024, 轮询
- poll- 没有最大连接数,基于链表,轮询
- epoll- callback
epoll
- epoll_create(): 创建一个epoll实例并返回相应的文件描述符
- epoll_ctl(): 注册相关的文件描述符使用
- epoll_wait(): 可以用于等待IO事件。如果当前没有可用的事件,这个函数会阻塞调用线程
handler创建epoll
- 调用关系
- MessageQueue初始化->nativeInit()->NativeMessageQueue()->Looper(false)->epoll_create->epoll_ctl
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not create wake pipe. errno=%d", errno);
mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
...
mIdling = false;
// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance. errno=%d", errno);
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;//监听管道的read()操作
eventItem.data.fd = mWakeReadPipeFd;//记录管道读端的fd
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
...
}
Handler中looper通过轮询获取message
- Looper.loop中for循环获取消息
- Message msg = queue.next();->nativePollOnce->nativeMessageQueue->pollOnce->pollInner()
int Looper::pollInner(int timeoutMillis) {
...
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mIdling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
//等待事件发生或者超时,在nativeWake()方法,向管道写端写入字符,则该方法会返回;
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mIdling = false;
// Acquire lock.
mLock.lock();
...
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
awoken();//
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
...
}
}
Done: ;
...
return result;
}
- 最终调用awaken
void Looper::awoken() {
char buffer[16];
ssize_t nRead;
do {
//不断读取管道数据,目的就是为了清空管道内容
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
- 总结
- Looper::loop中for循环不断调用next()从自己的链表中获取message,但是获取前先nativePollOnce进入阻塞状态,线程会释放资源进入休眠状态,直到下个消息到达或者有事务发生,通过往pipe管道写端写入数据来唤醒主线程工作
handler通过sendmessage最终给管道写数据
将消息push到MessageQueue中时候,即MessageQueue.enqueueMessages(…)方法中
- handler.sendMessage(msg)
- sendMessageDelayed(msg, 0);
- sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
- enqueueMessage(queue, msg, uptimeMillis)
- queue.enqueueMessage(msg, uptimeMillis);先入列后调用wake
- nativeWake(mPtr);—->
- Looper->wake
- write()
- Looper->wake
- nativeWake(mPtr);—->
- queue.enqueueMessage(msg, uptimeMillis);先入列后调用wake
- enqueueMessage(queue, msg, uptimeMillis)
- sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
- sendMessageDelayed(msg, 0);
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);//进行了写操作
} while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
- handler.postDelayed(runnable, time)
- sendMessageDelayed(getPostMessage(r), delayMillis);
- 同上
- sendMessageDelayed(getPostMessage(r), delayMillis);
handler.postDelay并不是先等待一定的时间再放入到MessageQueue中,而是直接进入MessageQueue,以MessageQueue的时间顺序排列和唤醒的方式结合实现的
- MessageQueue的next()方法,会根据其delay时间和链表头的比较,如果更短则,放入链表头,并且看时间是否有delay,
- 如果有,则block,等待时间到来唤醒执行,
- Looper::pollInner(int timeoutMillis)
- epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
- Looper::pollInner(int timeoutMillis)
- 否则将唤醒立即执行。
- 如果有,则block,等待时间到来唤醒执行,
- epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis)
- timeoutMillis == -1, causes epoll_wait() to block indefinitely,
- timeoutMillis == 0, cause epoll_wait() to return immediately
- MessageQueue的next()方法,会根据其delay时间和链表头的比较,如果更短则,放入链表头,并且看时间是否有delay,
发送消息, MessageQueue.enqueueMessage(Message msg, long when)会根据时间把message插入到合适位置
- sendMessageAtFrontOfQueue //插入到队列首部
- enqueueMessage(queue, msg, 0); //最后的参数uptimeMillis是0,保证了在队首
- postDelayed
- sendMessageDelayed(getPostMessage(r), delayMillis)
- sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis); //boot 2 now+delay
- enqueueMessage(queue, msg, uptimeMillis);
- sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis); //boot 2 now+delay
- sendMessageDelayed(getPostMessage(r), delayMillis)
- sendMessage(Message msg)
- sendMessageDelayed(msg, 0)
- sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
- //同上
- sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
- sendMessageDelayed(msg, 0)
- sendMessageAtFrontOfQueue //插入到队列首部
参考:
handler&&epoll
https://blog.csdn.net/qingtiantianqing/article/details/72783952
https://www.jianshu.com/p/44b322dfc040