1. <?php
    2. class pool{
    3. protected $server;
    4. protected $db_config; //配置信息
    5. public function __construct($config)
    6. {
    7. //全局对象
    8. $this->server=new \swoole\server('0.0.0.0',9800);
    9. $this->server->set($config['tcp']);
    10. //注册事件
    11. $this->server->on('WorkerStart',[$this,'onWorkerStart']);
    12. $this->server->on('receive',[$this,'onReceive']);
    13. $this->server->on('task',[$this,'onTask']);
    14. $this->server->on('finish',[$this,'onFinish']);
    15. $this->db_config=$config;
    16. //echo $this->idle_pool->shift();
    17. $this->server->start(); //全局生命周期
    18. }
    19. public function onWorkerStart($server,$worker_id){
    20. }
    21. public function onReceive($server,$fd,$reactor_id,$data){
    22. $len=unpack('N',$data)[1];
    23. $body=substr($data,-$len);//去除二进制数据之后,不要包头的数据
    24. //sql发送过来的时候,发送给连接池 task
    25. //同步阻塞
    26. $result=$server->taskwait($body);
    27. var_dump($result);
    28. }
    29. /*
    30. * 优点
    31. * 1.简单实现
    32. * 2.自带排队效果
    33. *
    34. *缺点
    35. * 1.没有办法去控制连接池数量,资源非常昂贵是多进程形式的
    36. * 2.多了一次进程间通讯
    37. * 3.长连接复用连接
    38. */
    39. public function onTask($server,$task_id,$worker_id,$data){
    40. $res=$this->query($data); //执行一条sql
    41. $server->finish(json_encode($res));
    42. }
    43. public function onFinish($server,$task_id,$data){
    44. }
    45. protected function query($sql){
    46. //执行sql的时候,两次循环,如果执行成功,就跳出循环失败
    47. $db=$this->db_connect($this->db_config);
    48. for ($i=0;$i<2;$i++){
    49. try{
    50. $res=$db->query($sql);
    51. break;
    52. }catch (PDOException $e){
    53. $err_msg=$e->getMessage();
    54. if(strpos($err_msg,'MySQL server has gone away')!==false){
    55. $db=$this->db_connect($this->db_config);
    56. }
    57. $res=$db->query($sql);
    58. break;
    59. }
    60. }
    61. //如果当前是select
    62. if(preg_match('/^select/i',$sql)){
    63. $data=$res->fetchAll(PDO::FETCH_ASSOC); //执行查询
    64. }else{
    65. $data=$res;
    66. }
    67. return $data;
    68. }
    69. /**
    70. * 数据库长连接
    71. * @param $db_config
    72. * @return array|PDO
    73. */
    74. public function db_connect($db_config){
    75. try{
    76. $pdo=new PDO($db_config['db']['dsn'],$db_config['db']['user'],$db_config['db']['password'],[PDO::ATTR_PERSISTENT=>true,PDO::ATTR_SERVER_INFO=>true]);
    77. return $pdo;
    78. }catch (Exception $e){
    79. echo $e->getMessage();
    80. return [];
    81. }
    82. }
    83. }
    84. $config['tcp']=[
    85. 'worker_num'=>10,
    86. 'task_worker_num'=>20, //连接池数量
    87. 'open_length_check' => true,
    88. 'package_length_type'=>'N',
    89. 'package_length_offset'=>0, //计算总长度
    90. 'package_body_offset'=>4,//包体位置
    91. ];
    92. $config['server']=[
    93. 'max'=>20,
    94. 'min'=>2
    95. ];
    96. $config['db']=[
    97. 'dsn'=>'mysql:host=localhost;dbname=test',
    98. 'user'=>'root',
    99. 'password'=>'Qq!990979940'
    100. ];
    101. new Pool($config);