DAG
hyperf/dag
是一个轻量级有向无环图 (Directed Acyclic Graph) 任务编排库。
场景
假设我们有一系列任务需要执行。
- 如果他们之间存在依赖关系,则可以将他们顺序执行。
- 如果他们并不相互依赖,那么我们可以选择并发执行,以加快执行速度。
- 两者间还存在中间状态:一部分任务存在依赖关系,而另一些任务又可以并发执行。
我们可以将第三种复杂的场景抽象成 DAG
来解决。
安装
composer require hyperf/dag
示例
假设我们有一系列任务,拓扑结构如上图所示,顶点代表任务,边缘代表依赖关系。(A 完成后才能完成 B、C、D,B 完成后才能完成 H、E、F…)
通过 hyperf/dag
可以使用如下方式构建 DAG
并执行。
<?php
$dag = new \Hyperf\Dag\Dag();
$a = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "A\n";});
$b = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "B\n";});
$c = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "C\n";});
$d = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "D\n";});
$e = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "E\n";});
$f = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "F\n";});
$g = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "G\n";});
$h = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "H\n";});
$i = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "I\n";});
$dag->addVertex($a)
->addVertex($b)
->addVertex($c)
->addVertex($d)
->addVertex($e)
->addVertex($f)
->addVertex($g)
->addVertex($h)
->addVertex($i)
->addEdge($a, $b)
->addEdge($a, $c)
->addEdge($a, $d)
->addEdge($b, $h)
->addEdge($b, $e)
->addEdge($b, $f)
->addEdge($c, $f)
->addEdge($c, $g)
->addEdge($d, $g)
->addEdge($h, $i)
->addEdge($e, $i)
->addEdge($f, $i)
->addEdge($g, $i);
// 需要在协程环境下执行
$dag->run();
输出:
// 1s 后
A
// 2s 后
D
C
B
// 3s 后
G
F
E
H
// 4s 后
I
DAG 会按照尽可能早的原则调度任务。尝试将 B 点的耗时调整为 2 秒,会发现 B 和 G 一起完成。
访问前步结果
每一个任务可以接收一个数组参数,数组中包含所有前置依赖的结果。DAG
执行完毕后,也会返回一个同样结构的数组,包含每一步的执行结果。
<?php
$dag = new \Hyperf\Dag\Dag();
$a = \Hyperf\Dag\Vertex::make(function() {return 1;});
$b = \Hyperf\Dag\Vertex::make(function($results) use ($a) {
return $results[$a->key] + 1;
});
$results = $dag->addVertex($a)->addVertex($b)->addEdge($a, $b)->run();
assert($results[$a->key] === 1);
assert($results[$b->key] === 2);
定义一个任务
在上述文档中,我们使用了闭包来定义一个任务。格式如下。
// Vertex::make 的第二个参数为可选参数,作为 vertex 的 key,也就是结果数组的键值。
\Hyperf\Dag\Vertex::make(function() { return 'hello'; }, "greeting");
除了使用闭包函数定义任务外,还可以使用实现了 \Hyperf\Dag\Runner
接口的类来定义,并通过 Vertex::of
将其转化为一个顶点。
class MyJob implements \Hyperf\Dag\Runner {
public function run($results = []) {
return 'hello';
}
}
\Hyperf\Dag\Vertex::of(new MyJob(), "greeting");
\Hyperf\Dag\Dag
本身也实现了 \Hyperf\Dag\Runner
接口,所以可以嵌套使用。
<?php
// 命名空间已省略
$a = Vertex::make(function () { return 1;});
$b = Vertex::make(function () { return 2;});
$c = Vertex::make(function () { return 3;});
$nestedDag = new Dag();
$nestedDag->addVertex($a)->addVertex($b)->addEdge($a, $b);
$d = Vertex::of($nestedDag);
$superDag = new Dag();
$superDag->addVertex($c)->addVertex($d)->addEdge($c, $d);
$superDag->run();
控制并发数
\Hyperf\Dag\Dag
类提供了 setConcurrency(int n)
方法控制最大并发数。默认为 10。