DAG
hyperf/dag 是一个轻量级有向无环图 (Directed Acyclic Graph) 任务编排库。
场景
假设我们有一系列任务需要执行。
- 如果他们之间存在依赖关系,则可以将他们顺序执行。
- 如果他们并不相互依赖,那么我们可以选择并发执行,以加快执行速度。
- 两者间还存在中间状态:一部分任务存在依赖关系,而另一些任务又可以并发执行。
我们可以将第三种复杂的场景抽象成 DAG 来解决。
安装
composer require hyperf/dag
示例
假设我们有一系列任务,拓扑结构如上图所示,顶点代表任务,边缘代表依赖关系。(A 完成后才能完成 B、C、D,B 完成后才能完成 H、E、F…)
通过 hyperf/dag 可以使用如下方式构建 DAG 并执行。
<?php$dag = new \Hyperf\Dag\Dag();$a = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "A\n";});$b = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "B\n";});$c = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "C\n";});$d = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "D\n";});$e = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "E\n";});$f = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "F\n";});$g = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "G\n";});$h = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "H\n";});$i = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "I\n";});$dag->addVertex($a)->addVertex($b)->addVertex($c)->addVertex($d)->addVertex($e)->addVertex($f)->addVertex($g)->addVertex($h)->addVertex($i)->addEdge($a, $b)->addEdge($a, $c)->addEdge($a, $d)->addEdge($b, $h)->addEdge($b, $e)->addEdge($b, $f)->addEdge($c, $f)->addEdge($c, $g)->addEdge($d, $g)->addEdge($h, $i)->addEdge($e, $i)->addEdge($f, $i)->addEdge($g, $i);// 需要在协程环境下执行$dag->run();
输出:
// 1s 后A// 2s 后DCB// 3s 后GFEH// 4s 后I
DAG 会按照尽可能早的原则调度任务。尝试将 B 点的耗时调整为 2 秒,会发现 B 和 G 一起完成。
访问前步结果
每一个任务可以接收一个数组参数,数组中包含所有前置依赖的结果。DAG 执行完毕后,也会返回一个同样结构的数组,包含每一步的执行结果。
<?php$dag = new \Hyperf\Dag\Dag();$a = \Hyperf\Dag\Vertex::make(function() {return 1;});$b = \Hyperf\Dag\Vertex::make(function($results) use ($a) {return $results[$a->key] + 1;});$results = $dag->addVertex($a)->addVertex($b)->addEdge($a, $b)->run();assert($results[$a->key] === 1);assert($results[$b->key] === 2);
定义一个任务
在上述文档中,我们使用了闭包来定义一个任务。格式如下。
// Vertex::make 的第二个参数为可选参数,作为 vertex 的 key,也就是结果数组的键值。\Hyperf\Dag\Vertex::make(function() { return 'hello'; }, "greeting");
除了使用闭包函数定义任务外,还可以使用实现了 \Hyperf\Dag\Runner 接口的类来定义,并通过 Vertex::of 将其转化为一个顶点。
class MyJob implements \Hyperf\Dag\Runner {public function run($results = []) {return 'hello';}}\Hyperf\Dag\Vertex::of(new MyJob(), "greeting");
\Hyperf\Dag\Dag 本身也实现了 \Hyperf\Dag\Runner 接口,所以可以嵌套使用。
<?php// 命名空间已省略$a = Vertex::make(function () { return 1;});$b = Vertex::make(function () { return 2;});$c = Vertex::make(function () { return 3;});$nestedDag = new Dag();$nestedDag->addVertex($a)->addVertex($b)->addEdge($a, $b);$d = Vertex::of($nestedDag);$superDag = new Dag();$superDag->addVertex($c)->addVertex($d)->addEdge($c, $d);$superDag->run();
控制并发数
\Hyperf\Dag\Dag 类提供了 setConcurrency(int n) 方法控制最大并发数。默认为 10。

