熟悉ES6的同学可能知道,promise是JS在ES6中引入的新特性,其主要目的是将回调转变成链式调动。
当我们不确定 future 何时会完成时,可以会借助 Promise 许下一个 “承诺” ,它表示:在某个未来的时间点,一定能够得到值。
当然scala的promise和ES6的promise还是不一样的
基本使用
- 定义Promise对象
- 添加回调函数(成功的回调或者失败的都行)
- 为Promise对象赋值(成功值或者失败值)
注意:方法一赋值会抛出异常,方法二不会
import scala.concurrent.ExecutionContext.Implicits.global
val promise = Promise[String]
promise.future.foreach(str => print("我的祖国是: " + str))
promise.future.failed.foreach(e => println(e))
/*
赋值方法一:
使用success和failure。complete方式相当于success和failure的抽象
这种赋值方法有个问题就是我们的promise只允许赋值一次,重复赋值会被抛出异常
*/
promise.success("china")
promise.failure(new Exception("not china"))
promise.complete(Success[String]("china"))
promise.complete(Failure[String](new Exception("not china")))
/*
赋值方法二:
使用try方法赋值,这种赋值和方法一类似,但是这种方法赋值不会抛出异常
*/
promise.trySuccess("china")
promise.tryFailure(new Exception("not china"))
promise.tryComplete(Success("china"))
promise.tryComplete(Failure(new Exception("not china")))
Thread.sleep(3000)
Promise和future结合使用
PromisedInt 在这里充当着代理的作用。它承诺提供的值具体要由哪个 future 来计算并提供,程序的调用者可能并不关心:它也许是 intFuture ,也许是 IntFuture2 。因此,我们仅需要为代理( PromisedInt.future )设置回调函数,而不是其它的 future
import scala.concurrent.ExecutionContext.Implicits.global
// 当我们不确定 future 何时会完成时,可以会借助 Promise 许下一个 “承诺” ,它表示:在某个未来的时间点,一定能够得到值。
val promisedInt: Promise[Int] = Promise[Int]
// 然而,这个 Int 值的计算实际上委托给了其它的 future 来完成。受托的 Future 在计算完结果之后会调用该 promise 的 success 方法来 “兑现” 这个承诺
// promisedInt 承诺的值由 intFuture 真正实现
val intFuture = Future {
println("正在计算…")
println("执行此计算任务的线程是:" + Thread.currentThread().getName)
Thread.sleep(1000)
//一旦这样做,这个 promise 将和当前的 future 绑定。
promisedInt.success(300)
}
//考虑到异常情况,除了 success 方法, Promise 还提供了 failure , Complete 等方法。无论调用哪种方法,一个 Promise 都只能被使用一次。
// promisedInt.success(300)
// promisedInt.failure(new Exception(“可能的错误”))
// promisedInt.complete(Success(1))
//onComplete 回调函数中 “兑现” 它的返回值
promisedInt.future onComplete {
case Success(value) => println(value)
case _ => println("出现了意外的错误")
}
Thread.sleep(3000)
输出
正在计算…
执行此计算任务的线程是:scala-execution-context-global-11
300
Promise 的使用模式
基于回调函数的API转换
示例:自定义timeout
import scala.concurrent.ExecutionContext.Implicits.global
val timer = new Timer(true)
def timeout(t: Int) = {
val promisedUnit = Promise[Unit]
timer.schedule(new TimerTask {
override def run(): Unit = {
promisedUnit.success()
timer.cancel()
}
}, t)
promisedUnit.future
}
println("延时之前")
timeout(1000).foreach(_ => println("延时一秒"))
println("延时之后")
Thread.sleep(2000)
//输出
//延时之前
//延时之后
//延时一秒
扩展Future的方法
import scala.concurrent.ExecutionContext.Implicits.global
/**
* 接收一个future,生成一个FutureOps,FutureOps具有or方法
* 当future调用or方法的时候就会寻找该隐式类生成一个FutureOps
*
* @param self
* @tparam T
*/
implicit class FutureOps[T](val self: Future[T]) {
def or(that: Future[T]): Future[T] = {
val promisedT = Promise[T]
// 下面两个赋值,只能成功一个,哪个Future先完善,哪个成功
self.onComplete(x => promisedT tryComplete (x))
that.onComplete(y => promisedT tryComplete (y))
promisedT.future
}
}
val future1 = Future {
Thread.sleep(1000)
1
}
val future2 = Future(2)
future1.or(future2).foreach(println)
Thread.sleep(2000)
和Future通信,结束Future
type Cancellable[T] = (Promise[Unit], Future[T])
/**
* 这个方法接收首一个方法参数b,b接收一个future,返回一个T
* 我们在主线程里完善Future就可以在返回T的代码块里感知到
* 然后通过判断我们的Future是否完善来抛出异常来结束我们的异步程序
*
* @param b
* @tparam T
* @return
*/
def cancellable[T](b: Future[Unit] => T): Cancellable[T] = {
import scala.concurrent.ExecutionContext.Implicits.global
val cancel = Promise[Unit]
val f = Future {
val r = b(cancel.future)
// 在这里向cancel对象执行赋值操作,如果我们的cancel已经被赋值
// 如果我们正常结束,这里就不会被赋值
// 如果从外部调用cancel结束,那这里就会抛出异常
if (!cancel.tryFailure(new Exception)) {
throw new CancellationException
}
r
}
(cancel, f)
}
使用await进行同步
基本用法
import scala.concurrent.ExecutionContext.Implicits.global
val future = Future {
Thread.sleep(1000)
1
}
// 使用await.ready,返回完善返回值的Future对象
Await.ready(future,Duration.apply(3,TimeUnit.SECONDS)).foreach(println)
// 使用await.result,返回Future里面的完善值
println(Await.result(future, Duration(3, TimeUnit.SECONDS)))
使用blocking语句,增加异步回调线程提高速度
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.blocking
//当前时间,单位是纳秒
val startTime = System.nanoTime()
// 如果异步程序数大于默认线程数,异步程序会分批次执行
val futures = for (- <- 0 until 10) yield Future {
Thread.sleep(1000)
}
// 如果异步程序里,使用block结构,会创建多余的线程执行程序,防止阻塞
// val futures = for (- <- 0 until 10) yield Future {
// blocking {
// Thread.sleep(1000)
// }
// }
for (f <- futures) Await.ready(f, Duration.Inf)
val endTime = System.nanoTime()
println((endTime - startTime) / 1000000)
例子
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
println("Step 1: Define a method which returns a Future")
import scala.concurrent.ExecutionContext.Implicits.global
def donutStock(donut: String): Int = {
if(donut == "vanilla donut") 10
else throw new IllegalStateException("Out of stock")
}
println(s"\nStep 2: Define a Promise of type Int")
val donutStockPromise = Promise[Int]()
println("\nStep 3: Define a future from Promise")
val donutStockFuture = donutStockPromise.future
donutStockFuture.onComplete {
case Success(stock) => println(s"Stock for vanilla donut = $stock")
case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e")
}
println("\nStep 4: Use Promise.success or Promise.failure to control execution of your future")
val donut = "vanilla donut"
if(donut == "vanilla donut") {
donutStockPromise.success(donutStock(donut))
} else {
donutStockPromise.failure(Try(donutStock(donut)).failed.get)
}
println("\nStep 5: Completing Promise using Promise.complete() method")
val donutStockPromise2 = Promise[Int]()
val donutStockFuture2 = donutStockPromise2.future
donutStockFuture2.onComplete {
case Success(stock) => println(s"Stock for vanilla donut = $stock")
case Failure(e) => println(s"Failed to find vanilla donut stock, exception = $e")
}
donutStockPromise2.complete(Try(donutStock("unknown donut")))
上面例子中我们使用了 Promise.success, Promise.failure, Promise.complete() 来控制程序的运行。