1. <?php
    2. /**
    3. * Created by PhpStorm.
    4. * User: Administrator
    5. * Date: 2018/5/23 0023
    6. * Time: 下午 8:53
    7. */
    8. class Pool{
    9. protected $table;
    10. protected $mpid; //进程id
    11. public function __construct()
    12. {
    13. $this->mpid=getmypid();
    14. $this->table=$this->create_table(); //将进程pid跟任务内容进行绑定
    15. $this->run();
    16. $this->signal();//信号监听
    17. }
    18. //获取任务,创建进程
    19. protected function run(){
    20. $data=[
    21. '990979940@qq.com',
    22. '213123@qq.com',
    23. '457567657@qq.com'
    24. ];
    25. foreach ($data as $v){
    26. $process=$this->create_process();
    27. //pid=>990979940@qq.com pid=>任务内容
    28. $this->table->set($process->pid,['data'=>$v]);
    29. $process->write($v);
    30. }
    31. /*foreach($this->table as $row)
    32. {
    33. var_dump($row);
    34. }*/
    35. }
    36. //创建一个内存表
    37. protected function create_table(){
    38. $table=new swoole\table(1024);
    39. $table->column('attempts',swoole\table::TYPE_INT,2);
    40. $table->column('data',swoole\table::TYPE_STRING,64);
    41. $table->create();
    42. return $table;
    43. }
    44. /**
    45. * 创建子进程
    46. */
    47. protected function create_process(){
    48. $process=new swoole\Process([$this,'callback_function']);
    49. $process->start(); //启动子进程
    50. //异步读取管道消息
    51. swoole_event::add($process->pipe,function () use($process){
    52. $data=json_decode($process->read(),true);
    53. if($data['status']==false);
    54. swoole_event::del($process->pipe);
    55. echo '子进程发送的内容:'.PHP_EOL;
    56. var_dump($data);
    57. });
    58. return $process;
    59. }
    60. //子进程业务处理逻辑
    61. public function callback_function($worker){
    62. $res=$worker->read();
    63. $rand=mt_rand(1,2);
    64. for ($i=0;$i<20;$i++){
    65. echo '子进程在执行'.$i.PHP_EOL;
    66. $this->checkPid($worker,$this->mpid);//检测主进程是否异常终止避免子进程成为僵尸进程
    67. sleep(1);
    68. }
    69. if($rand==1){
    70. throw new Exception('出现异常了');
    71. }
    72. /*//执行发短信的逻辑,正常的逻辑
    73. $res=false;
    74. if($res==false){
    75. $data=[
    76. 'status'=>false,'data'=>$res,'msg'=>'任务执行成功'
    77. ];
    78. $worker->write(json_encode($data)); //写入到主进程
    79. return;
    80. }*/
    81. $data=[
    82. 'status'=>true,'data'=>$res,'msg'=>'任务执行成功'
    83. ];
    84. $worker->write(json_encode($data)); //写入到主进程
    85. $this->table->del($worker->pid); //删除关系绑定的记录
    86. }
    87. /**
    88. *
    89. *检测主进程是否存在
    90. * @param $mpid 父进程pid
    91. *
    92. */
    93. public function checkPid($worker,$mpid){
    94. if(!swoole\Process::kill($mpid,0)){
    95. //等待子进程任务处理完毕之后再去关闭,根据业务场景,如果需要跟主进程交互的,主进程关闭了
    96. //子进程需要关闭,如果不需要的,等待子进程执行完毕之后再关闭
    97. $msg="主进程已经退出,工作进程{$worker->pid}退出";
    98. echo $msg.PHP_EOL;
    99. $worker->exit();//子进程退出
    100. }
    101. }
    102. //捕获子进程结束时的信号,回收子进程
    103. public function signal(){
    104. swoole_process::signal(SIGCHLD, function($sig) {
    105. //必须为false,非阻塞模式
    106. while($ret = swoole_process::wait(false)) {
    107. if($ret['code']>0){ //出现非0状态码
    108. $pid=$ret['pid'] ;//子进程结束pid
    109. $data=$this->table->get($pid);
    110. $this->table->del($pid); //删除之前内容
    111. //判断重试次数
    112. if($data['attempts']==0){
    113. $process=$this->create_process(); //重新拉起进程,会产生新的pid
    114. $this->table->set($process->pid,['data'=>$data['data'],'attempts'=>$data['attempts']+1]); //新创建的进程
    115. $process->write($data['data']);
    116. return;
    117. }
    118. //file_put_contents(); 记录日志|发送邮件|异常队列当中
    119. }
    120. }
    121. /* foreach($this->table as $row) {
    122. var_dump($row);
    123. }*/
    124. });
    125. }
    126. }
    127. new Pool();