当我们需要同时执行多个任务、并且不会因为其中一个任务的执行时间漫长而陷入阻塞,就需要用到并发,也即多线程。Java中的并发实现主要在java.util.concurrent包中,Future这个概念其实Java里也有。Scala是在Java的基础上实现的函数式编程语言,所以Scala也可以调用Java API。今天会介绍Scala并发中应用比较多的两个概念——Future、ExecutionContext。

Future概念

Future表示未来,某线程执行一项操作,这个操作有延迟的话,Future会提供一系列方法来处理这个线程过程,可取消,可操作完成后执行其他操作等等。其实就相当于Java中的一个thread,只不过Future提供了回调函数,可以对线程中执行操作返回的数据进行处理,这样可以避免阻塞操作。
Future拥有两种状态:1.未完成:线程操作还未结束;2.已完成:操作操作完成,并且有返回值或者有异常。 当一个Future完成的时候,它就变成了一个不可变对象,永远不会被重写。

创建Future

val sumFuture = Future[Int] { var sum = 0 for(i <- Range(1,100000)) sum = sum + i sum }
如上图所示,使用Future这个object提供的apply方法,就可以创建一个Future。这里需要注意的是使用Future的apply时,需要一个隐式的ExecutionContext(下面再作介绍)

Future回调函数

Future提供了onComplete,onFailure,onSuccess三种回调。

  1. calFuture.onComplete {
  2. case Success(result) => println(result)
  3. case Failure(e) => println("error: " + e.getMessage)
  4. }
  5. successFuture.onSuccess {
  6. case num => println(num)
  7. }
  8. errorFuture.onFailure {
  9. case e => println(e.getMessage)
  10. }

ExecutionContext

ExecutionContext与Java中的线程池类似,都是用来管理线程的。Java中可以利用类Executors来定义线程池ExecutorService:
newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool、newScheduledThreadPool分别可以定义不同的线程池。而ExecutionContext提供了方法fromExecutorService,该方法的传参就是Java中的ExecutorService。

应用:Future、ExecutionContext、lock

并发时如果多个线程之间需要共享变量,则需要用到lock。并且用的时候为了避免死锁,需要及时释放lock

  1. private var lock= new ReentrantLock()
  2. implicit lazy val ec: ExecutionContextExecutorService =
  3. ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(conf.getDSHealthCheckThreadPoolSize))
  4. if (lock.tryLock()) {
  5. logInfo(s"lock is available")
  6. try {
  7. val future = Future[(Boolean, String)] {
  8. /* compiled code */
  9. }
  10. future.onSuccess {
  11. case (value1, value2) =>
  12. /* compiled code */
  13. }
  14. Await.result(future, Duration.Inf)
  15. } finally {
  16. lock.unlock()
  17. }
  18. }