<?php/** * Created by PhpStorm. * User: Administrator * Date: 2018/5/23 0023 * Time: 下午 8:53 */class Pool{ protected $table; protected $mpid; //进程id public function __construct() { $this->mpid=getmypid(); $this->table=$this->create_table(); //将进程pid跟任务内容进行绑定 $this->run(); $this->signal();//信号监听 } //获取任务,创建进程 protected function run(){ $data=[ '990979940@qq.com', '213123@qq.com', '457567657@qq.com' ]; foreach ($data as $v){ $process=$this->create_process(); //pid=>990979940@qq.com pid=>任务内容 $this->table->set($process->pid,['data'=>$v]); $process->write($v); } /*foreach($this->table as $row) { var_dump($row); }*/ } //创建一个内存表 protected function create_table(){ $table=new swoole\table(1024); $table->column('attempts',swoole\table::TYPE_INT,2); $table->column('data',swoole\table::TYPE_STRING,64); $table->create(); return $table; } /** * 创建子进程 */ protected function create_process(){ $process=new swoole\Process([$this,'callback_function']); $process->start(); //启动子进程 //异步读取管道消息 swoole_event::add($process->pipe,function () use($process){ $data=json_decode($process->read(),true); if($data['status']==false); swoole_event::del($process->pipe); echo '子进程发送的内容:'.PHP_EOL; var_dump($data); }); return $process; } //子进程业务处理逻辑 public function callback_function($worker){ $res=$worker->read(); $rand=mt_rand(1,2); for ($i=0;$i<20;$i++){ echo '子进程在执行'.$i.PHP_EOL; $this->checkPid($worker,$this->mpid);//检测主进程是否异常终止避免子进程成为僵尸进程 sleep(1); } if($rand==1){ throw new Exception('出现异常了'); } /*//执行发短信的逻辑,正常的逻辑 $res=false; if($res==false){ $data=[ 'status'=>false,'data'=>$res,'msg'=>'任务执行成功' ]; $worker->write(json_encode($data)); //写入到主进程 return; }*/ $data=[ 'status'=>true,'data'=>$res,'msg'=>'任务执行成功' ]; $worker->write(json_encode($data)); //写入到主进程 $this->table->del($worker->pid); //删除关系绑定的记录 } /** * *检测主进程是否存在 * @param $mpid 父进程pid * */ public function checkPid($worker,$mpid){ if(!swoole\Process::kill($mpid,0)){ //等待子进程任务处理完毕之后再去关闭,根据业务场景,如果需要跟主进程交互的,主进程关闭了 //子进程需要关闭,如果不需要的,等待子进程执行完毕之后再关闭 $msg="主进程已经退出,工作进程{$worker->pid}退出"; echo $msg.PHP_EOL; $worker->exit();//子进程退出 } } //捕获子进程结束时的信号,回收子进程 public function signal(){ swoole_process::signal(SIGCHLD, function($sig) { //必须为false,非阻塞模式 while($ret = swoole_process::wait(false)) { if($ret['code']>0){ //出现非0状态码 $pid=$ret['pid'] ;//子进程结束pid $data=$this->table->get($pid); $this->table->del($pid); //删除之前内容 //判断重试次数 if($data['attempts']==0){ $process=$this->create_process(); //重新拉起进程,会产生新的pid $this->table->set($process->pid,['data'=>$data['data'],'attempts'=>$data['attempts']+1]); //新创建的进程 $process->write($data['data']); return; } //file_put_contents(); 记录日志|发送邮件|异常队列当中 } } /* foreach($this->table as $row) { var_dump($row); }*/ }); }}new Pool();