<?php
/**
* Created by PhpStorm.
* User: Administrator
* Date: 2018/5/23 0023
* Time: 下午 8:53
*/
class Pool{
protected $table;
protected $mpid; //进程id
public function __construct()
{
$this->mpid=getmypid();
$this->table=$this->create_table(); //将进程pid跟任务内容进行绑定
$this->run();
$this->signal();//信号监听
}
//获取任务,创建进程
protected function run(){
$data=[
'990979940@qq.com',
'213123@qq.com',
'457567657@qq.com'
];
foreach ($data as $v){
$process=$this->create_process();
//pid=>990979940@qq.com pid=>任务内容
$this->table->set($process->pid,['data'=>$v]);
$process->write($v);
}
/*foreach($this->table as $row)
{
var_dump($row);
}*/
}
//创建一个内存表
protected function create_table(){
$table=new swoole\table(1024);
$table->column('attempts',swoole\table::TYPE_INT,2);
$table->column('data',swoole\table::TYPE_STRING,64);
$table->create();
return $table;
}
/**
* 创建子进程
*/
protected function create_process(){
$process=new swoole\Process([$this,'callback_function']);
$process->start(); //启动子进程
//异步读取管道消息
swoole_event::add($process->pipe,function () use($process){
$data=json_decode($process->read(),true);
if($data['status']==false);
swoole_event::del($process->pipe);
echo '子进程发送的内容:'.PHP_EOL;
var_dump($data);
});
return $process;
}
//子进程业务处理逻辑
public function callback_function($worker){
$res=$worker->read();
$rand=mt_rand(1,2);
for ($i=0;$i<20;$i++){
echo '子进程在执行'.$i.PHP_EOL;
$this->checkPid($worker,$this->mpid);//检测主进程是否异常终止避免子进程成为僵尸进程
sleep(1);
}
if($rand==1){
throw new Exception('出现异常了');
}
/*//执行发短信的逻辑,正常的逻辑
$res=false;
if($res==false){
$data=[
'status'=>false,'data'=>$res,'msg'=>'任务执行成功'
];
$worker->write(json_encode($data)); //写入到主进程
return;
}*/
$data=[
'status'=>true,'data'=>$res,'msg'=>'任务执行成功'
];
$worker->write(json_encode($data)); //写入到主进程
$this->table->del($worker->pid); //删除关系绑定的记录
}
/**
*
*检测主进程是否存在
* @param $mpid 父进程pid
*
*/
public function checkPid($worker,$mpid){
if(!swoole\Process::kill($mpid,0)){
//等待子进程任务处理完毕之后再去关闭,根据业务场景,如果需要跟主进程交互的,主进程关闭了
//子进程需要关闭,如果不需要的,等待子进程执行完毕之后再关闭
$msg="主进程已经退出,工作进程{$worker->pid}退出";
echo $msg.PHP_EOL;
$worker->exit();//子进程退出
}
}
//捕获子进程结束时的信号,回收子进程
public function signal(){
swoole_process::signal(SIGCHLD, function($sig) {
//必须为false,非阻塞模式
while($ret = swoole_process::wait(false)) {
if($ret['code']>0){ //出现非0状态码
$pid=$ret['pid'] ;//子进程结束pid
$data=$this->table->get($pid);
$this->table->del($pid); //删除之前内容
//判断重试次数
if($data['attempts']==0){
$process=$this->create_process(); //重新拉起进程,会产生新的pid
$this->table->set($process->pid,['data'=>$data['data'],'attempts'=>$data['attempts']+1]); //新创建的进程
$process->write($data['data']);
return;
}
//file_put_contents(); 记录日志|发送邮件|异常队列当中
}
}
/* foreach($this->table as $row) {
var_dump($row);
}*/
});
}
}
new Pool();