<?php
/*
* 接收客户端的sql,执行sql,结果返回
* */
class DBServer{
protected $server;
protected $min; //最小连接数
protected $max; //最大连接数
protected $idle_pool;//空闲连接池
protected $worker_pool;//工作连接池
protected $wait_queue;//工作连接池
protected $db_config; //配置信息
public function __construct($config)
{
//全局对象
$this->server=new \swoole\server('0.0.0.0',9800);
$this->server->set($config['tcp']);
//注册事件
$this->server->on('WorkerStart',[$this,'onWorkerStart']);
$this->server->on('receive',[$this,'onReceive']);
//$this->server->on('close',[$this,'onClose']);
$this->max=$config['server']['max'];
$this->min=$config['server']['min'];
$this->idle_pool=new SplQueue();
// $this->worker_pool=new SplQueue();
//$this->wait_queue=new SplQueue();
$this->db_config=$config;
//$this->idle_pool->push('1');
//$this->idle_pool->push('2');
//echo $this->idle_pool->shift();
$this->server->start(); //全局生命周期
}
public function onWorkerStart($server,$worker_id){
//生成数据库连接
for ($i=0;$i<$this->min;$i++){
$pdo=$this->db_connect($this->db_config);
$this->push_queue('idle_pool',$pdo);
}
/*swoole_timer_tick(1000,function (){
//var_dump($this->wait_queue);
});*/
}
public function onReceive($server,$fd,$reactor_id,$data){
$len=unpack('N',$data)[1];
$body=substr($data,-$len);//去除二进制数据之后,不要包头的数据
//判断有没有空闲连接,等待sql执行
/*if(count( $this->idle_pool)===0){
$queueName='wait_queue';
$this->push_queue($queueName,$body,$fd);
}*/
$res=$this->query($fd,$body); //执行一条sql
var_dump(count($res));
}
protected function query($fd,$sql){
$idle_pool=$this->idle_pool->shift();
$db=$idle_pool['db'];
//执行sql的时候,两次循环,如果执行成功,就跳出循环失败
for ($i=0;$i<2;$i++){
try{
$res=$db->query($sql);
break;
}catch (PDOException $e){
$err_msg=$e->getMessage();
if(strpos($err_msg,'MySQL server has gone away')!==false){
$db=$this->db_connect($this->db_config);
}
$res=$db->query($sql);
break;
}
}
//如果当前是select
if(preg_match('/^select/i',$sql)){
$data=$res->fetchAll(PDO::FETCH_ASSOC); //执行查询
}else{
$data=$res;
}
//sleep(2); //同步执行,如果有sql没有执行完
//执行完毕,重新放回到空闲连接池
$this->push_queue('idle_pool',$db,$fd);
return $data;
}
//向队列当中添加连接
public function push_queue($queueName,$pdo,$fd=0){
$arr=[
'fd'=>$fd,
'db'=>$pdo
];
$this->$queueName->push($arr);
}
/**
* 数据库长连接
* @param $db_config
* @return array|PDO
*/
public function db_connect($db_config){
try{
$pdo=new PDO($db_config['db']['dsn'],$db_config['db']['user'],$db_config['db']['password'],[PDO::ATTR_PERSISTENT=>true,PDO::ATTR_SERVER_INFO=>true]);
return $pdo;
}catch (Exception $e){
echo $e->getMessage();
return [];
}
}
}
$config['tcp']=[
'worker_num'=>1,
'open_length_check' => true,
'package_length_type'=>'N',
'package_length_offset'=>0, //计算总长度
'package_body_offset'=>4,//包体位置
];
$config['server']=[
'max'=>20,
'min'=>2
];
$config['db']=[
'dsn'=>'mysql:host=localhost;dbname=test',
'user'=>'root',
'password'=>'Qq!990979940'
];
new DBServer($config);