WebSocket 服务

Hyperf 提供了对 WebSocket Server 的封装,可基于 hyperf/websocket-server 组件快速搭建一个 WebSocket 应用。

安装

  1. composer require hyperf/websocket-server

配置 Server

修改 config/autoload/server.php,增加以下配置。

  1. <?php
  2. return [
  3. 'servers' => [
  4. [
  5. 'name' => 'ws',
  6. 'type' => Server::SERVER_WEBSOCKET,
  7. 'host' => '0.0.0.0',
  8. 'port' => 9502,
  9. 'sock_type' => SWOOLE_SOCK_TCP,
  10. 'callbacks' => [
  11. Event::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
  12. Event::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
  13. Event::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
  14. ],
  15. ],
  16. ],
  17. ];

配置路由

目前暂时只支持配置文件的模式配置路由,后续会提供注解模式。

config/routes.php 文件内增加对应 ws 的 Server 的路由配置,这里的 ws 值取决于您在 config/autoload/server.php 内配置的 WebSocket Server 的 name 值。

  1. <?php
  2. Router::addServer('ws', function () {
  3. Router::get('/', 'App\Controller\WebSocketController');
  4. });

创建对应控制器

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use Hyperf\Contract\OnCloseInterface;
  5. use Hyperf\Contract\OnMessageInterface;
  6. use Hyperf\Contract\OnOpenInterface;
  7. use Swoole\Http\Request;
  8. use Swoole\Server;
  9. use Swoole\Websocket\Frame;
  10. use Swoole\WebSocket\Server as WebSocketServer;
  11. class WebSocketController implements OnMessageInterface, OnOpenInterface, OnCloseInterface
  12. {
  13. public function onMessage($server, Frame $frame): void
  14. {
  15. $server->push($frame->fd, 'Recv: ' . $frame->data);
  16. }
  17. public function onClose($server, int $fd, int $reactorId): void
  18. {
  19. var_dump('closed');
  20. }
  21. public function onOpen($server, Request $request): void
  22. {
  23. $server->push($request->fd, 'Opened');
  24. }
  25. }

接下来启动 Server,便能看到对应启动了一个 WebSocket Server 并监听于 9502 端口,此时您便可以通过各种 WebSocket Client 来进行连接和数据传输了。

  1. $ php bin/hyperf.php start
  2. [INFO] Worker#0 started.
  3. [INFO] WebSocket Server listening at 0.0.0.0:9502
  4. [INFO] HTTP Server listening at 0.0.0.0:9501

!> 当我们同时监听了 HTTP Server 的 9501 端口和 WebSocket Server 的 9502 端口时, WebSocket Client 可以通过 9501 和 9502 两个端口连接 WebSocket Server,即连接 ws://0.0.0.0:9501ws://0.0.0.0:9502 都可以成功。

因为 Swoole\WebSocket\Server 继承自 Swoole\Http\Server,可以使用 HTTP 触发所有 WebSocket 的推送,了解详情可查看 Swoole 文档 onRequest 回调部分。

如需关闭,可以修改 config/autoload/server.php 文件给 http 服务中增加 open_websocket_protocol 配置项。

  1. <?php
  2. return [
  3. // 这里省略了该文件的其它配置
  4. 'servers' => [
  5. [
  6. 'name' => 'http',
  7. 'type' => Server::SERVER_HTTP,
  8. 'host' => '0.0.0.0',
  9. 'port' => 9501,
  10. 'sock_type' => SWOOLE_SOCK_TCP,
  11. 'callbacks' => [
  12. Event::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
  13. ],
  14. 'settings' => [
  15. 'open_websocket_protocol' => false,
  16. ]
  17. ],
  18. ]
  19. ];

连接上下文

WebSocket 服务的 onOpen, onMessage, onClose 回调并不在同一个协程下触发,因此不能直接使用协程上下文存储状态信息。WebSocket Server 组件提供了 连接级 的上下文,API 与协程上下文完全一样。

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use Hyperf\Contract\OnMessageInterface;
  5. use Hyperf\Contract\OnOpenInterface;
  6. use Hyperf\WebSocketServer\Context;
  7. use Swoole\Http\Request;
  8. use Swoole\Websocket\Frame;
  9. use Swoole\WebSocket\Server as WebSocketServer;
  10. class WebSocketController implements OnMessageInterface, OnOpenInterface
  11. {
  12. public function onMessage($server, Frame $frame): void
  13. {
  14. $server->push($frame->fd, 'Username: ' . Context::get('username'));
  15. }
  16. public function onOpen($server, Request $request): void
  17. {
  18. Context::set('username', $request->cookie['username']);
  19. }
  20. }

多 server 配置

  1. # /etc/nginx/conf.d/ng_socketio.conf
  2. # 多个 ws server
  3. upstream io_nodes {
  4. server ws1:9502;
  5. server ws2:9502;
  6. }
  7. server {
  8. listen 9502;
  9. # server_name your.socket.io;
  10. location / {
  11. proxy_set_header Upgrade "websocket";
  12. proxy_set_header Connection "upgrade";
  13. # proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
  14. # proxy_set_header Host $host;
  15. # proxy_http_version 1.1;
  16. # 转发到多个 ws server
  17. proxy_pass http://io_nodes;
  18. }
  19. }

消息发送器

当我们想在 HTTP 服务中,关闭 WebSocket 连接时,可以直接使用 Hyperf\WebSocketServer\Sender

Sender 会判断 fd 是否被当前 Worker 所持有,如果是,则会直接发送数据,如果不是,则会通过 PipeMessage 发送给除自己外的所有 Worker,然后由其他 Worker 进行判断, 如果是自己持有的 fd,就会发送对应数据到客户端。

Sender 支持 pushdisconnect 两个 API,如下:

  1. <?php
  2. declare(strict_types=1);
  3. namespace App\Controller;
  4. use Hyperf\Di\Annotation\Inject;
  5. use Hyperf\HttpServer\Annotation\AutoController;
  6. use Hyperf\WebSocketServer\Sender;
  7. /**
  8. * @AutoController
  9. */
  10. class ServerController
  11. {
  12. /**
  13. * @Inject
  14. * @var Sender
  15. */
  16. protected $sender;
  17. public function close(int $fd)
  18. {
  19. go(function () use ($fd) {
  20. sleep(1);
  21. $this->sender->disconnect($fd);
  22. });
  23. return '';
  24. }
  25. public function send(int $fd)
  26. {
  27. $this->sender->push($fd, 'Hello World.');
  28. return '';
  29. }
  30. }

在 WebSocket 服务中处理 HTTP 请求

我们除了可以将 HTTP 服务和 WebSocket 服务通过端口分开,也可以在 WebSocket 中监听 HTTP 请求。

因为 server.servers.*.callbacks 中的配置项,都是单例的,所以我们需要在 dependencies 中配置一个单独的实例。

  1. <?php
  2. return [
  3. 'HttpServer' => Hyperf\HttpServer\Server::class,
  4. ];

然后修改我们的 WebSocket 服务中的 callbacks 配置,以下隐藏了不相干的配置

  1. <?php
  2. declare(strict_types=1);
  3. use Hyperf\Server\Event;
  4. use Hyperf\Server\Server;
  5. return [
  6. 'mode' => SWOOLE_BASE,
  7. 'servers' => [
  8. [
  9. 'name' => 'ws',
  10. 'type' => Server::SERVER_WEBSOCKET,
  11. 'host' => '0.0.0.0',
  12. 'port' => 9502,
  13. 'sock_type' => SWOOLE_SOCK_TCP,
  14. 'callbacks' => [
  15. Event::ON_REQUEST => ['HttpServer', 'onRequest'],
  16. Event::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
  17. Event::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
  18. Event::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
  19. ],
  20. ],
  21. ],
  22. ];

最后我们便可以在 ws 中,添加 HTTP 路由了。