1. <?php
    2. /*
    3. * 接收客户端的sql,执行sql,结果返回
    4. * */
    5. class DBServer{
    6. protected $server;
    7. protected $min; //最小连接数
    8. protected $max; //最大连接数
    9. protected $idle_pool;//空闲连接池
    10. protected $worker_pool;//工作连接池
    11. protected $wait_queue;//工作连接池
    12. protected $db_config; //配置信息
    13. public function __construct($config)
    14. {
    15. //全局对象
    16. $this->server=new \swoole\server('0.0.0.0',9800);
    17. $this->server->set($config['tcp']);
    18. //注册事件
    19. $this->server->on('WorkerStart',[$this,'onWorkerStart']);
    20. $this->server->on('receive',[$this,'onReceive']);
    21. //$this->server->on('close',[$this,'onClose']);
    22. $this->max=$config['server']['max'];
    23. $this->min=$config['server']['min'];
    24. $this->idle_pool=new SplQueue();
    25. // $this->worker_pool=new SplQueue();
    26. //$this->wait_queue=new SplQueue();
    27. $this->db_config=$config;
    28. //$this->idle_pool->push('1');
    29. //$this->idle_pool->push('2');
    30. //echo $this->idle_pool->shift();
    31. $this->server->start(); //全局生命周期
    32. }
    33. public function onWorkerStart($server,$worker_id){
    34. //生成数据库连接
    35. for ($i=0;$i<$this->min;$i++){
    36. $pdo=$this->db_connect($this->db_config);
    37. $this->push_queue('idle_pool',$pdo);
    38. }
    39. /*swoole_timer_tick(1000,function (){
    40. //var_dump($this->wait_queue);
    41. });*/
    42. }
    43. public function onReceive($server,$fd,$reactor_id,$data){
    44. $len=unpack('N',$data)[1];
    45. $body=substr($data,-$len);//去除二进制数据之后,不要包头的数据
    46. //判断有没有空闲连接,等待sql执行
    47. /*if(count( $this->idle_pool)===0){
    48. $queueName='wait_queue';
    49. $this->push_queue($queueName,$body,$fd);
    50. }*/
    51. $res=$this->query($fd,$body); //执行一条sql
    52. var_dump(count($res));
    53. }
    54. protected function query($fd,$sql){
    55. $idle_pool=$this->idle_pool->shift();
    56. $db=$idle_pool['db'];
    57. //执行sql的时候,两次循环,如果执行成功,就跳出循环失败
    58. for ($i=0;$i<2;$i++){
    59. try{
    60. $res=$db->query($sql);
    61. break;
    62. }catch (PDOException $e){
    63. $err_msg=$e->getMessage();
    64. if(strpos($err_msg,'MySQL server has gone away')!==false){
    65. $db=$this->db_connect($this->db_config);
    66. }
    67. $res=$db->query($sql);
    68. break;
    69. }
    70. }
    71. //如果当前是select
    72. if(preg_match('/^select/i',$sql)){
    73. $data=$res->fetchAll(PDO::FETCH_ASSOC); //执行查询
    74. }else{
    75. $data=$res;
    76. }
    77. //sleep(2); //同步执行,如果有sql没有执行完
    78. //执行完毕,重新放回到空闲连接池
    79. $this->push_queue('idle_pool',$db,$fd);
    80. return $data;
    81. }
    82. //向队列当中添加连接
    83. public function push_queue($queueName,$pdo,$fd=0){
    84. $arr=[
    85. 'fd'=>$fd,
    86. 'db'=>$pdo
    87. ];
    88. $this->$queueName->push($arr);
    89. }
    90. /**
    91. * 数据库长连接
    92. * @param $db_config
    93. * @return array|PDO
    94. */
    95. public function db_connect($db_config){
    96. try{
    97. $pdo=new PDO($db_config['db']['dsn'],$db_config['db']['user'],$db_config['db']['password'],[PDO::ATTR_PERSISTENT=>true,PDO::ATTR_SERVER_INFO=>true]);
    98. return $pdo;
    99. }catch (Exception $e){
    100. echo $e->getMessage();
    101. return [];
    102. }
    103. }
    104. }
    105. $config['tcp']=[
    106. 'worker_num'=>1,
    107. 'open_length_check' => true,
    108. 'package_length_type'=>'N',
    109. 'package_length_offset'=>0, //计算总长度
    110. 'package_body_offset'=>4,//包体位置
    111. ];
    112. $config['server']=[
    113. 'max'=>20,
    114. 'min'=>2
    115. ];
    116. $config['db']=[
    117. 'dsn'=>'mysql:host=localhost;dbname=test',
    118. 'user'=>'root',
    119. 'password'=>'Qq!990979940'
    120. ];
    121. new DBServer($config);