<?php/* * 接收客户端的sql,执行sql,结果返回 * */class DBServer{ protected $server; protected $min; //最小连接数 protected $max; //最大连接数 protected $idle_pool;//空闲连接池 protected $worker_pool;//工作连接池 protected $wait_queue;//工作连接池 protected $db_config; //配置信息 public function __construct($config) { //全局对象 $this->server=new \swoole\server('0.0.0.0',9800); $this->server->set($config['tcp']); //注册事件 $this->server->on('WorkerStart',[$this,'onWorkerStart']); $this->server->on('receive',[$this,'onReceive']); //$this->server->on('close',[$this,'onClose']); $this->max=$config['server']['max']; $this->min=$config['server']['min']; $this->idle_pool=new SplQueue(); // $this->worker_pool=new SplQueue(); //$this->wait_queue=new SplQueue(); $this->db_config=$config; //$this->idle_pool->push('1'); //$this->idle_pool->push('2'); //echo $this->idle_pool->shift(); $this->server->start(); //全局生命周期 } public function onWorkerStart($server,$worker_id){ //生成数据库连接 for ($i=0;$i<$this->min;$i++){ $pdo=$this->db_connect($this->db_config); $this->push_queue('idle_pool',$pdo); } /*swoole_timer_tick(1000,function (){ //var_dump($this->wait_queue); });*/ } public function onReceive($server,$fd,$reactor_id,$data){ $len=unpack('N',$data)[1]; $body=substr($data,-$len);//去除二进制数据之后,不要包头的数据 //判断有没有空闲连接,等待sql执行 /*if(count( $this->idle_pool)===0){ $queueName='wait_queue'; $this->push_queue($queueName,$body,$fd); }*/ $res=$this->query($fd,$body); //执行一条sql var_dump(count($res)); } protected function query($fd,$sql){ $idle_pool=$this->idle_pool->shift(); $db=$idle_pool['db']; //执行sql的时候,两次循环,如果执行成功,就跳出循环失败 for ($i=0;$i<2;$i++){ try{ $res=$db->query($sql); break; }catch (PDOException $e){ $err_msg=$e->getMessage(); if(strpos($err_msg,'MySQL server has gone away')!==false){ $db=$this->db_connect($this->db_config); } $res=$db->query($sql); break; } } //如果当前是select if(preg_match('/^select/i',$sql)){ $data=$res->fetchAll(PDO::FETCH_ASSOC); //执行查询 }else{ $data=$res; } //sleep(2); //同步执行,如果有sql没有执行完 //执行完毕,重新放回到空闲连接池 $this->push_queue('idle_pool',$db,$fd); return $data; } //向队列当中添加连接 public function push_queue($queueName,$pdo,$fd=0){ $arr=[ 'fd'=>$fd, 'db'=>$pdo ]; $this->$queueName->push($arr); } /** * 数据库长连接 * @param $db_config * @return array|PDO */ public function db_connect($db_config){ try{ $pdo=new PDO($db_config['db']['dsn'],$db_config['db']['user'],$db_config['db']['password'],[PDO::ATTR_PERSISTENT=>true,PDO::ATTR_SERVER_INFO=>true]); return $pdo; }catch (Exception $e){ echo $e->getMessage(); return []; } }}$config['tcp']=[ 'worker_num'=>1, 'open_length_check' => true, 'package_length_type'=>'N', 'package_length_offset'=>0, //计算总长度 'package_body_offset'=>4,//包体位置 ];$config['server']=[ 'max'=>20, 'min'=>2];$config['db']=[ 'dsn'=>'mysql:host=localhost;dbname=test', 'user'=>'root', 'password'=>'Qq!990979940'];new DBServer($config);