DAG

hyperf/dag 是一个轻量级有向无环图 (Directed Acyclic Graph) 任务编排库。

场景

假设我们有一系列任务需要执行。

  • 如果他们之间存在依赖关系,则可以将他们顺序执行。
  • 如果他们并不相互依赖,那么我们可以选择并发执行,以加快执行速度。
  • 两者间还存在中间状态:一部分任务存在依赖关系,而另一些任务又可以并发执行。

我们可以将第三种复杂的场景抽象成 DAG 来解决。

安装

  1. composer require hyperf/dag

示例

DAG - 图1

假设我们有一系列任务,拓扑结构如上图所示,顶点代表任务,边缘代表依赖关系。(A 完成后才能完成 B、C、D,B 完成后才能完成 H、E、F…)

通过 hyperf/dag 可以使用如下方式构建 DAG 并执行。

  1. <?php
  2. $dag = new \Hyperf\Dag\Dag();
  3. $a = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "A\n";});
  4. $b = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "B\n";});
  5. $c = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "C\n";});
  6. $d = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "D\n";});
  7. $e = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "E\n";});
  8. $f = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "F\n";});
  9. $g = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "G\n";});
  10. $h = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "H\n";});
  11. $i = \Hyperf\Dag\Vertex::make(function() {sleep(1); echo "I\n";});
  12. $dag->addVertex($a)
  13. ->addVertex($b)
  14. ->addVertex($c)
  15. ->addVertex($d)
  16. ->addVertex($e)
  17. ->addVertex($f)
  18. ->addVertex($g)
  19. ->addVertex($h)
  20. ->addVertex($i)
  21. ->addEdge($a, $b)
  22. ->addEdge($a, $c)
  23. ->addEdge($a, $d)
  24. ->addEdge($b, $h)
  25. ->addEdge($b, $e)
  26. ->addEdge($b, $f)
  27. ->addEdge($c, $f)
  28. ->addEdge($c, $g)
  29. ->addEdge($d, $g)
  30. ->addEdge($h, $i)
  31. ->addEdge($e, $i)
  32. ->addEdge($f, $i)
  33. ->addEdge($g, $i);
  34. // 需要在协程环境下执行
  35. $dag->run();

输出:

  1. // 1s 后
  2. A
  3. // 2s 后
  4. D
  5. C
  6. B
  7. // 3s 后
  8. G
  9. F
  10. E
  11. H
  12. // 4s 后
  13. I

DAG 会按照尽可能早的原则调度任务。尝试将 B 点的耗时调整为 2 秒,会发现 B 和 G 一起完成。

访问前步结果

每一个任务可以接收一个数组参数,数组中包含所有前置依赖的结果。DAG 执行完毕后,也会返回一个同样结构的数组,包含每一步的执行结果。

  1. <?php
  2. $dag = new \Hyperf\Dag\Dag();
  3. $a = \Hyperf\Dag\Vertex::make(function() {return 1;});
  4. $b = \Hyperf\Dag\Vertex::make(function($results) use ($a) {
  5. return $results[$a->key] + 1;
  6. });
  7. $results = $dag->addVertex($a)->addVertex($b)->addEdge($a, $b)->run();
  8. assert($results[$a->key] === 1);
  9. assert($results[$b->key] === 2);

定义一个任务

在上述文档中,我们使用了闭包来定义一个任务。格式如下。

  1. // Vertex::make 的第二个参数为可选参数,作为 vertex 的 key,也就是结果数组的键值。
  2. \Hyperf\Dag\Vertex::make(function() { return 'hello'; }, "greeting");

除了使用闭包函数定义任务外,还可以使用实现了 \Hyperf\Dag\Runner 接口的类来定义,并通过 Vertex::of 将其转化为一个顶点。

  1. class MyJob implements \Hyperf\Dag\Runner {
  2. public function run($results = []) {
  3. return 'hello';
  4. }
  5. }
  6. \Hyperf\Dag\Vertex::of(new MyJob(), "greeting");

\Hyperf\Dag\Dag 本身也实现了 \Hyperf\Dag\Runner 接口,所以可以嵌套使用。

  1. <?php
  2. // 命名空间已省略
  3. $a = Vertex::make(function () { return 1;});
  4. $b = Vertex::make(function () { return 2;});
  5. $c = Vertex::make(function () { return 3;});
  6. $nestedDag = new Dag();
  7. $nestedDag->addVertex($a)->addVertex($b)->addEdge($a, $b);
  8. $d = Vertex::of($nestedDag);
  9. $superDag = new Dag();
  10. $superDag->addVertex($c)->addVertex($d)->addEdge($c, $d);
  11. $superDag->run();

控制并发数

\Hyperf\Dag\Dag 类提供了 setConcurrency(int n) 方法控制最大并发数。默认为 10。