1. <?php
    2. /*
    3. * 接收客户端的sql,执行sql,结果返回
    4. * */
    5. class DBServer{
    6. protected $server;
    7. protected $min; //最小连接数
    8. protected $max; //最大连接数
    9. protected $idle_pool;//空闲连接池
    10. protected $worker_pool;//工作连接池
    11. protected $wait_queue;//等待队列
    12. protected $db_config; //配置信息
    13. protected $wait_queue_max;//最多的排队数
    14. protected $count; //记录当前数据库的连接数量
    15. protected $table;
    16. public function __construct($config)
    17. {
    18. //全局对象
    19. $this->server=new \swoole\server('0.0.0.0',9800);
    20. $this->server->set($config['tcp']);
    21. //注册事件
    22. $this->server->on('WorkerStart',[$this,'onWorkerStart']);
    23. $this->server->on('receive',[$this,'onReceive']);
    24. //$this->server->on('close',[$this,'onClose']);
    25. $this->max=$config['server']['max'];
    26. $this->min=$config['server']['min'];
    27. $this->count=0;
    28. $this->idle_pool=new SplQueue();
    29. $this->wait_queue=new SplQueue();
    30. $this->wait_queue_max=35;
    31. //$this->worker_pool=new SplQueue();
    32. $this->db_config=$config;
    33. //$this->idle_pool->push('1');
    34. //$this->idle_pool->push('2');
    35. //echo $this->idle_pool->shift();
    36. $this->table=new swoole\table(1024);
    37. //一个字段是保存的是task_workerid号,保存的是状态
    38. $this->table->column('count',swoole\table::TYPE_INT,5); //连接数
    39. $this->table->create(); //必须在创建子进程之前.
    40. $this->server->start(); //全局生命周期
    41. }
    42. public function onWorkerStart($server,$worker_id){
    43. //生成数据库连接
    44. for ($i=0;$i<$this->min;$i++){
    45. $this->db_connect($this->db_config);
    46. }
    47. swoole_timer_tick(1000,function (){
    48. var_dump('创建的连接数'.$this->count.'-----'.'等待队列数'.count($this->wait_queue).'------连接池个数'.count($this->idle_pool));
    49. });
    50. //没有限制queue, shift为空错误,如何解决
    51. /*
    52. swoole_timer_tick(100,function (){
    53. if(count($this->wait_queue)>0){
    54. //空闲连接池当中去取出连接去执行sql
    55. $idle_num=count($this->idle_pool);
    56. for($i=0;$i<$idle_num;$i++){
    57. $req=$this->wait_queue->shift();
    58. $this->query($req['db'],function ($res){
    59. //var_dump($res);
    60. var_dump('等待队列的处理'.count($res));
    61. //$this->server->send($fd,'123');
    62. });
    63. }
    64. }
    65. });*/
    66. }
    67. public function onReceive($server,$fd,$reactor_id,$data){
    68. $len=unpack('N',$data)[1];
    69. $body=substr($data,-$len);//去除二进制数据之后,不要包头的数据
    70. // try{
    71. //连接数不够了
    72. if(count($this->idle_pool)!=0){
    73. $this->query($body,function ($res) use($fd){
    74. var_dump('最小连接的执行结果'.count($res));
    75. //$this->server->send($fd,'123');
    76. });
    77. }else{
    78. //重新建立连接,并且判断当前的连接池数量小于最大的连接数
    79. //var_dump('连接池个数'.$this->count);
    80. $count=$this->table->get(1,'count');
    81. if($count<$this->max){
    82. $this->table->incr(1,'count');
    83. $this->db_connect($this->db_config,function() use($body){
    84. $this->query($body,function($res){
    85. var_dump('新创建的连接结果'.count($res));
    86. });
    87. });
    88. }else{
    89. if(count($this->wait_queue)<$this->wait_queue_max){
    90. $this->push_queue('wait_queue',$body);
    91. }
    92. }
    93. }
    94. //判断连接池当中,有没有空闲的数据库连接,如果没有空闲的连接,并且等待队列未满
    95. /*//放入到等待的队列当中
    96. if(count($this->idle_pool)===0){
    97. if(count($this->wait_queue)<$this->wait_queue_max){
    98. $this->push_queue('wait_queue',$body);
    99. }
    100. }*/
    101. // }catch(\Exception $e){
    102. // echo '捕获异常'.$e->getMessage();
    103. // }
    104. }
    105. protected function query($sql,$callback=''){
    106. //取出可用连接
    107. $idle_pool=$this->idle_pool->shift();
    108. $db=$idle_pool['db'];
    109. $db->query($sql,function ($db,$res) use ($callback){
    110. $callback($res);
    111. //执行完毕,重新放回到空闲连接池,复用连接
    112. $this->push_queue('idle_pool',$db);
    113. //取出队列当中的
    114. if(count($this->wait_queue)>0){
    115. //空闲连接池当中去取出连接去执行sql
    116. $idle_num=count($this->idle_pool);
    117. for($i=0;$i<$idle_num;$i++){
    118. $req=$this->wait_queue->shift();
    119. $this->query($req['db'],function ($res){
    120. //var_dump($res);
    121. var_dump('等待队列的处理'.count($res));
    122. //$this->server->send($fd,'123');
    123. });
    124. }
    125. }
    126. });
    127. }
    128. //向队列当中添加连接
    129. public function push_queue($queueName,$pdo,$fd=0){
    130. $arr=[
    131. 'fd'=>$fd,
    132. 'db'=>$pdo
    133. ];
    134. $this->$queueName->push($arr);
    135. }
    136. /**
    137. * 数据库长连接
    138. * @param $db_config
    139. * @return array|PDO
    140. */
    141. public function db_connect($db_config,$callback=''){
    142. $mysql=new swoole\mysql;
    143. $mysql->connect($db_config['db'],function ($db,$res) use($callback){
    144. //错误提示
    145. if ($res == false) {
    146. var_dump('错误信息'.$db->connect_errno,$db->connect_error);
    147. return;
    148. }
    149. //维护数据库连接数量
    150. $this->table->incr(1,'count');
    151. //放入到空闲的连接池当中
    152. $this->push_queue('idle_pool',$db);
    153. if(is_callable($callback)){
    154. $callback();
    155. return;
    156. }
    157. });
    158. }
    159. }
    160. $config['tcp']=[
    161. 'worker_num'=>1,
    162. 'open_length_check' => true,
    163. 'package_length_type'=>'N',
    164. 'package_length_offset'=>0, //计算总长度
    165. 'package_body_offset'=>4,//包体位置
    166. ];
    167. $config['server']=[
    168. 'max'=>20,
    169. 'min'=>5
    170. ];
    171. $config['db']=[
    172. 'host' => '127.0.0.1',
    173. 'port' => 3306,
    174. 'user' => 'root',
    175. 'password' => 'Qq!990979940',
    176. 'database' => 'test',
    177. 'charset' => 'utf8', //指定字符集
    178. 'timeout' =>10, // 可选:连接超时时间(非查询超时时间),默认为SW_MYSQL_CONNECT_TIMEOUT1.0
    179. ];
    180. new DBServer($config);