19讲如何⽤协程来优化多线程业务
你好,我是刘超。
近⼀两年,国内很多互联⽹公司开始使⽤或转型Go语⾔,其中⼀个很重要的原因就是Go语⾔优越的性能表现,⽽这个优势与
Go实现的轻量级线程Goroutines(协程Coroutine)不⽆关系。那么Go协程的实现与Java线程的实现有什么区别呢?
线程实现模型
了解协程和线程的区别之前,我们不妨先来了解下底层实现线程⼏种⽅式,为后⾯的学习打个基础。
实现线程主要有三种⽅式:轻量级进程和内核线程⼀对⼀相互映射实现的1:1线程模型、⽤户线程和内核线程实现的N:1线程模型以及⽤户线程和轻量级进程混合实现的N:M线程模型。
1:1线程模型
以上我提到的内核线程(Kernel-Level Thread, KLT)是由操作系统内核⽀持的线程,内核通过调度器对线程进⾏调度,并负责完成线程的切换。
我们知道在Linux操作系统编程中,往往都是通过fork()函数创建⼀个⼦进程来代表⼀个内核中的线程。⼀个进程调⽤fork()函数后,系统会先给新的进程分配资源,例如,存储数据和代码的空间。然后把原来进程的所有值都复制到新的进程中,只有少数值与原来进程的值(⽐如PID)不同,这相当于复制了⼀个主进程。
采⽤fork()创建⼦进程的⽅式来实现并⾏运⾏,会产⽣⼤量冗余数据,即占⽤⼤量内存空间,⼜消耗⼤量CPU时间⽤来初始化内存空间以及复制数据。
如果是⼀份⼀样的数据,为什么不共享主进程的这⼀份数据呢?这时候轻量级进程(Light Weight Process,即LWP)出现了。
相对于fork()系统调⽤创建的线程来说,LWP使⽤clone()系统调⽤创建线程,该函数是将部分⽗进程的资源的数据结构进⾏复制,复制内容可选,且没有被复制的资源可以通过指针共享给⼦进程。因此,轻量级进程的运⾏单元更⼩,运⾏速度更快。
LWP是跟内核线程⼀对⼀映射的,每个LWP都是由⼀个内核线程⽀持。
N:1线程模型
1:1线程模型由于跟内核是⼀对⼀映射,所以在线程创建、切换上都存在⽤户态和内核态的切换,性能开销⽐较⼤。除此之外,它还存在局限性,主要就是指系统的资源有限,不能⽀持创建⼤量的LWP。
N:1线程模型就可以很好地解决1:1线程模型的这两个问题。
该线程模型是在⽤户空间完成了线程的创建、同步、销毁和调度,已经不需要内核的帮助了,也就是说在线程创建、同步、销毁的过程中不会产⽣⽤户态和内核态的空间切换,因此线程的操作⾮常快速且低消耗。
N:M线程模型
N:1线程模型的缺点在于操作系统不能感知⽤户态的线程,因此容易造成某⼀个线程进⾏系统调⽤内核线程时被阻塞,从⽽导致整个进程被阻塞。
N:M线程模型是基于上述两种线程模型实现的⼀种混合线程管理模型,即⽀持⽤户态线程通过LWP与内核线程连接,⽤户态的线程数量和内核态的LWP数量是N:M的映射关系。
了解完这三个线程模型,你就可以清楚地了解到Go协程的实现与Java线程的实现有什么区别了。
JDK 1.8 Thread.java 中 Thread#start ⽅法的实现,实际上是通过Native调⽤start0⽅法实现的;在Linux下, JVM Thread的实现是基于pthread_create实现的,⽽pthread_create实际上是调⽤了clone()完成系统调⽤创建线程的。
所以,⽬前Java在Linux操作系统下采⽤的是⽤户线程加轻量级线程,⼀个⽤户线程映射到⼀个内核线程,即1:1线程模型。由于线程是通过内核调度,从⼀个线程切换到另⼀个线程就涉及到了上下⽂切换。
⽽Go语⾔是使⽤了N:M线程模型实现了⾃⼰的调度器,它在N个内核线程上多路复⽤(或调度)M个协程,协程的上下⽂切换是在⽤户态由协程调度器完成的,因此不需要陷⼊内核,相⽐之下,这个代价就很⼩了。
协程的实现原理
协程不只在Go语⾔中实现了,其实⽬前⼤部分语⾔都实现了⾃⼰的⼀套协程,包括C#、erlang、python、lua、javascript、
ruby等。
相对于协程,你可能对进程和线程更为熟悉。进程⼀般代表⼀个应⽤服务,在⼀个应⽤服务中可以创建多个线程,⽽协程与进程、线程的概念不⼀样,我们可以将协程看作是⼀个类函数或者⼀块函数中的代码,我们可以在⼀个主线程⾥⾯轻松创建多个协程。
程序调⽤协程与调⽤函数不⼀样的是,协程可以通过暂停或者阻塞的⽅式将协程的执⾏挂起,⽽其它协程可以继续执⾏。这⾥的挂起只是在程序中(⽤户态)的挂起,同时将代码执⾏权转让给其它协程使⽤,待获取执⾏权的协程执⾏完成之后,将从挂起点唤醒挂起的协程。 协程的挂起和唤醒是通过⼀个调度器来完成的。
结合下图,你可以更清楚地了解到基于N:M线程模型实现的协程是如何⼯作的。
假设程序中默认创建两个线程为协程使⽤,在主线程中创建协程ABCD…,分别存储在就绪队列中,调度器⾸先会分配⼀个⼯作线程A执⾏协程A,另外⼀个⼯作线程B执⾏协程B,其它创建的协程将会放在队列中进⾏排队等待。
当协程A调⽤暂停⽅法或被阻塞时,协程A会进⼊到挂起队列,调度器会调⽤等待队列中的其它协程抢占线程A执⾏。当协程A
被唤醒时,它需要重新进⼊到就绪队列中,通过调度器抢占线程,如果抢占成功,就继续执⾏协程A,失败则继续等待抢占线程。
相⽐线程,协程少了由于同步资源竞争带来的CPU上下⽂切换,I/O密集型的应⽤⽐较适合使⽤,特别是在⽹络请求中,有较
多的时间在等待后端响应,协程可以保证线程不会阻塞在等待⽹络响应中,充分利⽤了多核多线程的能⼒。⽽对于CPU密集型的应⽤,由于在多数情况下CPU都⽐较繁忙,协程的优势就不是特别明显了。
Kilim协程框架
虽然这么多的语⾔都实现了协程,但⽬前Java原⽣语⾔暂时还不⽀持协程。不过你也不⽤泄⽓,我们可以通过协程框架在
Java中使⽤协程。
⽬前Kilim协程框架在Java中应⽤得⽐较多,通过这个框架,开发⼈员就可以低成本地在Java中使⽤协程了。
在Java中引⼊ Kilim ,和我们平时引⼊第三⽅组件不太⼀样,除了引⼊jar包之外,还需要通过Kilim提供的织⼊(Weaver)⼯具对Java代码编译⽣成的字节码进⾏增强处理,⽐如,识别哪些⽅式是可暂停的,对相关的⽅法添加上下⽂处理。通常有以 下四种⽅式可以实现这种织⼊操作:
在编译时使⽤maven插件;
在运⾏时调⽤kilim.tools.Weaver⼯具;
在运⾏时使⽤kilim.tools.Kilim invoking调⽤Kilim的类⽂件;
在main函数添加 if (kilim.tools.Kilim.trampoline(false,args)) return。
Kilim框架包含了四个核⼼组件,分别为:任务载体(Task)、任务上下⽂(Fiber)、任务调度器(Scheduler)以及通信载体
(Mailbox)。
Task对象主要⽤来执⾏业务逻辑,我们可以把这个⽐作多线程的Thread,与Thread类似,Task中也有⼀个run⽅法,不过在
Task中⽅法名为execute,我们可以将协程⾥⾯要做的业务逻辑操作写在execute⽅法中。
与Thread实现的线程⼀样,Task实现的协程也有状态,包括:Ready、Running、Pausing、Paused以及Done总共五种。
Task对象被创建后,处于Ready状态,在调⽤execute()⽅法后,协程处于Running状态,在运⾏期间,协程可以被暂停,暂停中的状态为Pausing,暂停后的状态为Paused,暂停后的协程可以被再次唤醒。协程正常结束后的状态为Done。
Fiber对象与Java的线程栈类似,主要⽤来维护Task的执⾏堆栈,Fiber是实现N:M线程映射的关键。
Scheduler是Kilim实现协程的核⼼调度器,Scheduler负责分派Task给指定的⼯作者线程WorkerThread执⾏,⼯作者线程
WorkerThread默认初始化个数为机器的CPU个数。
Mailbox对象类似⼀个邮箱,协程之间可以依靠邮箱来进⾏通信和数据共享。协程与线程最⼤的不同就是,线程是通过共享内
存来实现数据共享,⽽协程是使⽤了通信的⽅式来实现了数据共享,主要就是为了避免内存共享数据⽽带来的线程安全问题。
协程与线程的性能⽐较
接下来,我们通过⼀个简单的⽣产者和消费者的案例,来对⽐下协程和线程的性能。可通过 Github 下载本地运⾏代码。
Java多线程实现源码:
public class MyThread { private static Integer count = 0;// private static final Integer FULL = 10; //最⼤⽣产数量private static String LOCK = “lock”; //资源锁 public static void main(String[] args) { MyThread test1 = new MyThread(); long start = System.currentTimeMillis(); List for (int i = 0; i < 1000; i++) {//创建五个⽣产者线程Thread thread = new Thread(test1.new Producer()); thread.start(); list.add(thread); } for (int i = 0; i < 1000; i++) {//创建五个消费者线程Thread thread = new Thread(test1.new Consumer()); thread.start(); list.add(thread); } try { for (Thread thread : list) { thread.join();//等待所有线程执⾏完 } } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println(“⼦线程执⾏时⻓:” + (end - start)); } //⽣产者 class Producer implements Runnable { |
---|
public void run() { for (int i = 0; i < 10; i++) { synchronized (LOCK) { while (count == FULL) {//当数量满了时 try { LOCK.wait(); } catch (Exception e) { e.printStackTrace(); } } count++; System.out.println(Thread.currentThread().getName() + “⽣产者⽣产,⽬前总共有” + count); LOCK.notifyAll(); } } } } //消费者 class Consumer implements Runnable { public void run() { for (int i = 0; i < 10; i++) { synchronized (LOCK) { while (count == 0) {//当数量为零时 try { LOCK.wait(); } catch (Exception e) { } } count—; System.out.println(Thread.currentThread().getName() + “消费者消费,⽬前总共有” + count); LOCK.notifyAll(); } } } } } |
---|
Kilim协程框架实现源码:
public class Coroutine {
static Map
if (kilim.tools.Kilim.trampoline(false,args)) return; Properties propes = new Properties();
propes.setProperty(“kilim.Scheduler.numThreads”, “1”);//设置⼀个线程
System.setProperties(propes);
long startTime = System.currentTimeMillis(); for (int i = 0; i < 1000; i++) {//创建⼀千⽣产者
Mailbox
mailMap.put(i, mb);
}
for (int i = 0; i < 1000; i++) {//创建⼀千个消费者new Consumer(mailMap.get(i)).start();
}
Task.idledown();//开始运⾏
long endTime = System.currentTimeMillis();
System.out.println( Thread.currentThread().getName() + “总计花费时⻓:” + (endTime- startTime));
}
}
;
//⽣产者
public class Producer extends Task