<?php
/*
* 接收客户端的sql,执行sql,结果返回
* */
class DBServer{
protected $server;
protected $min; //最小连接数
protected $max; //最大连接数
protected $idle_pool;//空闲连接池
protected $worker_pool;//工作连接池
protected $wait_queue;//等待队列
protected $db_config; //配置信息
protected $wait_queue_max;//最多的排队数
protected $count; //记录当前数据库的连接数量
protected $table;
public function __construct($config)
{
//全局对象
$this->server=new \swoole\server('0.0.0.0',9800);
$this->server->set($config['tcp']);
//注册事件
$this->server->on('WorkerStart',[$this,'onWorkerStart']);
$this->server->on('receive',[$this,'onReceive']);
//$this->server->on('close',[$this,'onClose']);
$this->max=$config['server']['max'];
$this->min=$config['server']['min'];
$this->count=0;
$this->idle_pool=new SplQueue();
$this->wait_queue=new SplQueue();
$this->wait_queue_max=35;
//$this->worker_pool=new SplQueue();
$this->db_config=$config;
//$this->idle_pool->push('1');
//$this->idle_pool->push('2');
//echo $this->idle_pool->shift();
$this->table=new swoole\table(1024);
//一个字段是保存的是task_worker的id号,保存的是状态
$this->table->column('count',swoole\table::TYPE_INT,5); //连接数
$this->table->create(); //必须在创建子进程之前.
$this->server->start(); //全局生命周期
}
public function onWorkerStart($server,$worker_id){
//生成数据库连接
for ($i=0;$i<$this->min;$i++){
$this->db_connect($this->db_config);
}
swoole_timer_tick(1000,function (){
var_dump('创建的连接数'.$this->count.'-----'.'等待队列数'.count($this->wait_queue).'------连接池个数'.count($this->idle_pool));
});
//没有限制queue, shift为空错误,如何解决
/*
swoole_timer_tick(100,function (){
if(count($this->wait_queue)>0){
//空闲连接池当中去取出连接去执行sql
$idle_num=count($this->idle_pool);
for($i=0;$i<$idle_num;$i++){
$req=$this->wait_queue->shift();
$this->query($req['db'],function ($res){
//var_dump($res);
var_dump('等待队列的处理'.count($res));
//$this->server->send($fd,'123');
});
}
}
});*/
}
public function onReceive($server,$fd,$reactor_id,$data){
$len=unpack('N',$data)[1];
$body=substr($data,-$len);//去除二进制数据之后,不要包头的数据
// try{
//连接数不够了
if(count($this->idle_pool)!=0){
$this->query($body,function ($res) use($fd){
var_dump('最小连接的执行结果'.count($res));
//$this->server->send($fd,'123');
});
}else{
//重新建立连接,并且判断当前的连接池数量小于最大的连接数
//var_dump('连接池个数'.$this->count);
$count=$this->table->get(1,'count');
if($count<$this->max){
$this->table->incr(1,'count');
$this->db_connect($this->db_config,function() use($body){
$this->query($body,function($res){
var_dump('新创建的连接结果'.count($res));
});
});
}else{
if(count($this->wait_queue)<$this->wait_queue_max){
$this->push_queue('wait_queue',$body);
}
}
}
//判断连接池当中,有没有空闲的数据库连接,如果没有空闲的连接,并且等待队列未满
/*//放入到等待的队列当中
if(count($this->idle_pool)===0){
if(count($this->wait_queue)<$this->wait_queue_max){
$this->push_queue('wait_queue',$body);
}
}*/
// }catch(\Exception $e){
// echo '捕获异常'.$e->getMessage();
// }
}
protected function query($sql,$callback=''){
//取出可用连接
$idle_pool=$this->idle_pool->shift();
$db=$idle_pool['db'];
$db->query($sql,function ($db,$res) use ($callback){
$callback($res);
//执行完毕,重新放回到空闲连接池,复用连接
$this->push_queue('idle_pool',$db);
//取出队列当中的
if(count($this->wait_queue)>0){
//空闲连接池当中去取出连接去执行sql
$idle_num=count($this->idle_pool);
for($i=0;$i<$idle_num;$i++){
$req=$this->wait_queue->shift();
$this->query($req['db'],function ($res){
//var_dump($res);
var_dump('等待队列的处理'.count($res));
//$this->server->send($fd,'123');
});
}
}
});
}
//向队列当中添加连接
public function push_queue($queueName,$pdo,$fd=0){
$arr=[
'fd'=>$fd,
'db'=>$pdo
];
$this->$queueName->push($arr);
}
/**
* 数据库长连接
* @param $db_config
* @return array|PDO
*/
public function db_connect($db_config,$callback=''){
$mysql=new swoole\mysql;
$mysql->connect($db_config['db'],function ($db,$res) use($callback){
//错误提示
if ($res == false) {
var_dump('错误信息'.$db->connect_errno,$db->connect_error);
return;
}
//维护数据库连接数量
$this->table->incr(1,'count');
//放入到空闲的连接池当中
$this->push_queue('idle_pool',$db);
if(is_callable($callback)){
$callback();
return;
}
});
}
}
$config['tcp']=[
'worker_num'=>1,
'open_length_check' => true,
'package_length_type'=>'N',
'package_length_offset'=>0, //计算总长度
'package_body_offset'=>4,//包体位置
];
$config['server']=[
'max'=>20,
'min'=>5
];
$config['db']=[
'host' => '127.0.0.1',
'port' => 3306,
'user' => 'root',
'password' => 'Qq!990979940',
'database' => 'test',
'charset' => 'utf8', //指定字符集
'timeout' =>10, // 可选:连接超时时间(非查询超时时间),默认为SW_MYSQL_CONNECT_TIMEOUT(1.0)
];
new DBServer($config);