Scala与Golang的并发实现思路

Scala语言并发设计采用Actor模型,借鉴了Erlang的Actor实现,并且在Scala 2.10之后,Scala采用的是Akka Actor模型库。

Actor模型主要特征如下:

  • “一切皆是参与者”,且各个actor间是独立的;
  • 发送者与已发送消息间解耦,这是Actor模型显著特点,据此实现异步通信;
  • actor是封装状态和行为的对象,通过消息交换进行相互通信,交换的消息存放在接收方的邮箱中;
  • actor可以有父子关系,父actor可以监管子actor,子actor唯一的监管者就是父actor;
  • 一个actor就是一个容器,它包含了状态、行为、一个邮箱(邮箱用来接受消息)、子actor和一个监管策略;


    Go语言也能够实现传统的共享内存的通信方式,但Go更提倡“以通信来共享内存,而非以共享内存来通信”。Go的并发通信方式借鉴CSP(Communicating Sequential Process)模型,其主要特征如下:

  • goroutine(协程,Go的轻量级线程)是Go的轻量级线程管理机制,用“go”启动一个goroutine, 如果当前线程阻塞则分配一个空闲线程,如果没有空闲线程,则新建一个线程;

  • 通过管道(channel)来存放消息,channel在goroutine之间传递消息;比如通过读取channel里的消息(通俗点说好比一个个“值”),你能够明白某个goroutine里的任务完成以否;
  • Go给channel做了增强,可带缓存。

Scala与Go在并发通信模型实现上的主要差异如下:

  • actor是异步的,因为发送者与已发送消息间实现了解耦;而channel则是某种意义上的同步,比如channel的读写是有关系的,期间会依赖对方来决定是否阻塞自己;
  • actor是一个容器,使用actorOf来创建Actor实例时,也就意味着需指定具体Actor实例,即指定哪个actor在执行任务,该actor必然要有“身份”标识,否则怎么指定呢?!而channel通常是匿名的, 任务放进channel之后你不用关心是哪个channel在执行任务

实例说明

我们来看一个例子:对一组连续序列(1-10000)的整数值进行累加,分别观察Scala与Go环境下单线程与多线程效率,一方面了解并发效率的提升;一方面也能够对比Scala与Go并发实现的差异 ── 这才是本文的重点。具体要求如下:
对1 - 10000的整数进行累加,在并发条件下,我们将1 - 10000平均划分为四部分,启动四个线程进行并发计算,之后将四个线程的运行结果相加得出最终的累加统计值。为了更明显地观察到时间上的差异性,在每部分的每次计算过程中,我们添加一个3000000次的空循环:)

Scala实现

以下先列出Scala Akka Actor并发实现的完整示例代码:

  1. // Akka并发计算实例
  2. import akka.actor.Actor
  3. import akka.actor.Props
  4. import akka.actor.ActorSystem
  5. import akka.routing.RoundRobinPool
  6. // 定义一个case类
  7. sealed trait SumTrait
  8. case class Result(value: Int) extends SumTrait
  9. // 计算用的Actor
  10. class SumActor extends Actor {
  11. val RANGE = 10000
  12. def calculate(start: Int, end: Int, flag : String): Int = {
  13. var cal = 0
  14. for (i <- (start to end)) {
  15. for (j <- 1 to 3000000) {}
  16. cal += i
  17. }
  18. println("flag : " + flag + ".")
  19. return cal
  20. }
  21. def receive = {
  22. case value: Int =>
  23. sender ! Result(calculate((RANGE / 4) * (value - 1) + 1, (RANGE / 4) * value, value.toString))
  24. case _ => println("未知 in SumActor...")
  25. }
  26. }
  27. // 打印结果用的Actor
  28. class PrintActor extends Actor {
  29. def receive = {
  30. case (sum: Int, startTime: Long) =>
  31. println("总数为:" + sum + ";所花时间为:"
  32. + (System.nanoTime() - startTime)/1000000000.0 + "秒。")
  33. case _ => println("未知 in PrintActor...")
  34. }
  35. }
  36. // 主actor,发送计算指令给SumActor,发送打印指令给PrintActor
  37. class MasterActor extends Actor {
  38. var sum = 0
  39. var count = 0
  40. var startTime: Long = 0
  41. // 声明Actor实例,nrOfInstances是pool里所启routee(SumActor)的数量,
  42. // 这里用4个SumActor来同时计算,很Powerful。
  43. val sumActor = context.actorOf(Props[SumActor]
  44. .withRouter(RoundRobinPool(nrOfInstances = 4)), name = "sumActor")
  45. val printActor = context.actorOf(Props[PrintActor], name = "printActor")
  46. def receive = {
  47. case "calculate..." =>
  48. startTime = System.nanoTime()
  49. for (i <- 1 to 4) sumActor ! i
  50. case Result(value) =>
  51. sum += value
  52. count += 1
  53. if (count == 4) {
  54. printActor ! (sum, startTime)
  55. context.stop(self)
  56. }
  57. case _ => println("未知 in MasterActor...")
  58. }
  59. }
  60. object Sum {
  61. def main(args: Array[String]): Unit = {
  62. var sum = 0
  63. val system = ActorSystem("MasterActorSystem")
  64. val masterActor = system.actorOf(Props[MasterActor], name = "masterActor")
  65. masterActor ! "calculate..."
  66. Thread.sleep(5000)
  67. system.shutdown()
  68. }
  69. }

在这里我们设计了3个Actor实例,在这里,我们一共定义了 三个Actor实例(actor),MasterActor、SumActor和PrintActor,其中,前者是后两者的父亲actor,如前文Scala的Actor模型特征里提到的:“actor可以有父子关系,父actor可以监管子actor,子actor唯一的监管者就是父actor”。我们的主程序通过向MasterActor发送“calculate…”指令,启动整个计算过程,嗯哼,好戏开始登场了:)

注意以下代码:

  1. val sumActor = context.actorOf(Props[SumActor]
  2. .withRouter(RoundRobinPool(nrOfInstances = 4)), name = "sumActor")

这里的设置将会在线程池里初始化称为“routee”的子actor(这里是SumActor),数量为4,也就是我们需要4个SumActor实例参与并发计算。这一步很关键。

然后,在接受消息的模式匹配中,通过以下代码启动计算actor:

  1. for (i <- 1 to 4) sumActor ! i

在SumActor中,每个计算线程都会调用calculate方法,该方法将处理分段的整数累加,并返回分段累加值给父actor MasterActor,我们特地通过case类实现MasterActor接受消息中的一个模式匹配功能(case Result(value) =>…),可以发现,模式匹配在Scala并发功能实现中的地位非常重要,并大大提升了开发人员的开发效率。在这里,我们获取了4个并发过程返回的分段累加值,MasterActor会计算最终的累加值。如果4个并发过程全部完成,就调用PrintActor实例打印结果和所花时间。
在整个运算过程中,我们很容易理解发送者与已发送消息间的解耦特征,发送者和接受者各种关心自己要处理的任务即可,比如状态和行为处理、发送的时机与内容、接收消息的时机与内容等。当然,actor确实是一个“容器”,且“五脏俱全”:我们用类来封装,里面也封装了必须的逻辑方法。

Scala Akka的并发实现,给我的感觉是设计才是关键,将各个actor的功能及关联关系表述清楚,剩余的代码实现就非常容易,这正是Scala、Akka的魅力体现,在底层帮我们做了大量工作!

在这里的PrintActor实际上并无太大存在意义,因为它并不实现并发功能。实现它主要是为了演示actor间的消息传递与控制。

再来看看单线程的计算运行模式:

  1. ...
  2. val RANGE = 10000
  3. var cal = 0
  4. val startTime = System.nanoTime()
  5. for (i <- (1 to RANGE)) {
  6. for (j <- 1 to 3000000) {}
  7. cal += i
  8. }
  9. val endTime = System.nanoTime()
  10. ...

Go语言实现

仍然先列出Go语言实现的并发功能完整代码:

  1. // Go并发计算实例
  2. package main
  3. import (
  4. "fmt"
  5. "runtime"
  6. "strconv"
  7. "time"
  8. )
  9. type Sum []int
  10. func (s Sum) Calculate(count, start, end int, flag string, ch chan int) {
  11. cal := 0
  12. for i := start; i <= end; i++ {
  13. for j := 1; j <= 3000000; j++ {
  14. }
  15. cal += i
  16. }
  17. s[count] = cal
  18. fmt.Println("flag :", flag, ".")
  19. ch <- count
  20. }
  21. func (s Sum) LetsGo() {
  22. // runtime.NumCPU()可以获取CPU核数,我的环境为4核,所以这里就简单起见直接设为4了
  23. const NCPU = 4
  24. const RANGE = 10000
  25. var ch = make(chan int)
  26. runtime.GOMAXPROCS(NCPU)
  27. for i := 0; i < NCPU; i++ {
  28. go s.Calculate(i, (RANGE/NCPU)*i+1, (RANGE/NCPU)*(i+1), strconv.Itoa(i+1), ch)
  29. }
  30. for i := 0; i < NCPU; i++ {
  31. <-ch
  32. }
  33. }
  34. func main() {
  35. var s Sum = make([]int, 4, 4)
  36. var sum int = 0
  37. var startTime = time.Now()
  38. s.LetsGo()
  39. for _, v := range s {
  40. sum += v
  41. }
  42. fmt.Println("总数为:", sum, ";所花时间为:",
  43. (time.Now().Sub(startTime)), "秒。")
  44. }

Go语言的实现与之前的Scala实现风格完全不一样,其通过“go”关键字实现的goroutine协程工作方式,结合channel,实现并发功能。goroutine和channel是Go语言非常强大的两个招式,简约而不简单。由上可知,Go语言的并发魔力来源于goroutine和channel。我们定义了一个Sum类型(插一句:Go语言的类型系统设计得也非常特别,这是别的主题了,:)),它有两个方法:LetsGo()和Calculate,LetsGo()首先创建一个计数用的channel,随后发起4个并发计算的协程。每个计算协程调用Calculate()进行分段计算(并会传入channel),Calculate()方法的最后,在分段计算完成时,都会往channel里塞一个计数标志:

  1. ch <- count

总有某个协程抢先运行到此处,那么该协程对应的计数标志就塞进了channel,在channel里的计数标志未被读取之前,其他协程在处理完分段计算的业务逻辑之后,其他协程的计数标志是无法塞进channel里的,其他协程只能等待,因为channel在之前被塞进一个计数标志之后,标志一直未被读取出来,程序阻塞了。再看看以下代码:

  1. for i := 0; i < NCPU; i++ {
  2. <-ch
  3. }
  1. 协程在判断channel为空之后,会将它的计数标志塞进channel。如此循环,直至channel里的计数标志全被取出,则所有的协程都处理完毕了。另外,如果读取的channel里没东西了还继续读取它会怎样?那么,程序也会阻塞,直至有东西可读。

对于channel的写入、等待和读取,这里为了演示方便,且本例中的协程和业务逻辑也不至于会造成协程僵死或locked,因此未考虑协程永久等待的处理,如果要处理超时,可以这么考虑:

  1. for {
  2. select {
  3. case <-ch: ...
  4. case <-time.After(3 * time.Second): ...
  5. }
  6. }

在所有的分段计算结束后,就可以计算总的累加值了:

  1. for _, v := range s {
  2. sum += v
  3. }

这段代码从Sum类型实例中获取分段累加值,最后计算出总的累加统计值。

Go中的channel是可以带缓存的,在缓存未被填满之前,都可以写入。本例中未使用带缓存的channel,虽然这样做在理论上可以节省写入channel时的等待时间,但在这里可以忽略,大型应用中就要慎重对待了。

来看看单线程的计算运行模式:

  1. ...
  2. cal := 0
  3. for i := start; i <= end; i++ {
  4. for j := 1; j <= 3000000; j++ {
  5. }
  6. cal += i
  7. }
  8. ...

image.jpeg