<?php/* * 接收客户端的sql,执行sql,结果返回 * */class DBServer{ protected $server; protected $min; //最小连接数 protected $max; //最大连接数 protected $idle_pool;//空闲连接池 protected $worker_pool;//工作连接池 protected $wait_queue;//等待队列 protected $db_config; //配置信息 protected $wait_queue_max;//最多的排队数 protected $count; //记录当前数据库的连接数量 protected $table; public function __construct($config) { //全局对象 $this->server=new \swoole\server('0.0.0.0',9800); $this->server->set($config['tcp']); //注册事件 $this->server->on('WorkerStart',[$this,'onWorkerStart']); $this->server->on('receive',[$this,'onReceive']); //$this->server->on('close',[$this,'onClose']); $this->max=$config['server']['max']; $this->min=$config['server']['min']; $this->count=0; $this->idle_pool=new SplQueue(); $this->wait_queue=new SplQueue(); $this->wait_queue_max=35; //$this->worker_pool=new SplQueue(); $this->db_config=$config; //$this->idle_pool->push('1'); //$this->idle_pool->push('2'); //echo $this->idle_pool->shift(); $this->table=new swoole\table(1024); //一个字段是保存的是task_worker的id号,保存的是状态 $this->table->column('count',swoole\table::TYPE_INT,5); //连接数 $this->table->create(); //必须在创建子进程之前. $this->server->start(); //全局生命周期 } public function onWorkerStart($server,$worker_id){ //生成数据库连接 for ($i=0;$i<$this->min;$i++){ $this->db_connect($this->db_config); } swoole_timer_tick(1000,function (){ var_dump('创建的连接数'.$this->count.'-----'.'等待队列数'.count($this->wait_queue).'------连接池个数'.count($this->idle_pool)); }); //没有限制queue, shift为空错误,如何解决 /* swoole_timer_tick(100,function (){ if(count($this->wait_queue)>0){ //空闲连接池当中去取出连接去执行sql $idle_num=count($this->idle_pool); for($i=0;$i<$idle_num;$i++){ $req=$this->wait_queue->shift(); $this->query($req['db'],function ($res){ //var_dump($res); var_dump('等待队列的处理'.count($res)); //$this->server->send($fd,'123'); }); } } });*/ } public function onReceive($server,$fd,$reactor_id,$data){ $len=unpack('N',$data)[1]; $body=substr($data,-$len);//去除二进制数据之后,不要包头的数据 // try{ //连接数不够了 if(count($this->idle_pool)!=0){ $this->query($body,function ($res) use($fd){ var_dump('最小连接的执行结果'.count($res)); //$this->server->send($fd,'123'); }); }else{ //重新建立连接,并且判断当前的连接池数量小于最大的连接数 //var_dump('连接池个数'.$this->count); $count=$this->table->get(1,'count'); if($count<$this->max){ $this->table->incr(1,'count'); $this->db_connect($this->db_config,function() use($body){ $this->query($body,function($res){ var_dump('新创建的连接结果'.count($res)); }); }); }else{ if(count($this->wait_queue)<$this->wait_queue_max){ $this->push_queue('wait_queue',$body); } } } //判断连接池当中,有没有空闲的数据库连接,如果没有空闲的连接,并且等待队列未满 /*//放入到等待的队列当中 if(count($this->idle_pool)===0){ if(count($this->wait_queue)<$this->wait_queue_max){ $this->push_queue('wait_queue',$body); } }*/ // }catch(\Exception $e){ // echo '捕获异常'.$e->getMessage(); // } } protected function query($sql,$callback=''){ //取出可用连接 $idle_pool=$this->idle_pool->shift(); $db=$idle_pool['db']; $db->query($sql,function ($db,$res) use ($callback){ $callback($res); //执行完毕,重新放回到空闲连接池,复用连接 $this->push_queue('idle_pool',$db); //取出队列当中的 if(count($this->wait_queue)>0){ //空闲连接池当中去取出连接去执行sql $idle_num=count($this->idle_pool); for($i=0;$i<$idle_num;$i++){ $req=$this->wait_queue->shift(); $this->query($req['db'],function ($res){ //var_dump($res); var_dump('等待队列的处理'.count($res)); //$this->server->send($fd,'123'); }); } } }); } //向队列当中添加连接 public function push_queue($queueName,$pdo,$fd=0){ $arr=[ 'fd'=>$fd, 'db'=>$pdo ]; $this->$queueName->push($arr); } /** * 数据库长连接 * @param $db_config * @return array|PDO */ public function db_connect($db_config,$callback=''){ $mysql=new swoole\mysql; $mysql->connect($db_config['db'],function ($db,$res) use($callback){ //错误提示 if ($res == false) { var_dump('错误信息'.$db->connect_errno,$db->connect_error); return; } //维护数据库连接数量 $this->table->incr(1,'count'); //放入到空闲的连接池当中 $this->push_queue('idle_pool',$db); if(is_callable($callback)){ $callback(); return; } }); }}$config['tcp']=[ 'worker_num'=>1, 'open_length_check' => true, 'package_length_type'=>'N', 'package_length_offset'=>0, //计算总长度 'package_body_offset'=>4,//包体位置];$config['server']=[ 'max'=>20, 'min'=>5];$config['db']=[ 'host' => '127.0.0.1', 'port' => 3306, 'user' => 'root', 'password' => 'Qq!990979940', 'database' => 'test', 'charset' => 'utf8', //指定字符集 'timeout' =>10, // 可选:连接超时时间(非查询超时时间),默认为SW_MYSQL_CONNECT_TIMEOUT(1.0)];new DBServer($config);