本章将讲解如何实现流媒体服务器的信令系统,以及信令与数据转发之间是如何进行配合的。一般信令系统都是整个系统的灵魂,知道了信令的运转就掌握了系统的脉络,这样就能很轻松地知道Mediasoup的运行逻辑了。

14-1 mediasoup-demo整体分析

图片.png
图片.png
aiortc:使用 asyncio 库实现的 WebRTC 和 ORTC 库

14-2 JavaScript基本语法一

14-3 JavaScript基本语法二

https://www.yuque.com/caokunchao/pelvyg/dypyoo

14-4 JavaScriptES6高级特性

图片.png

图片.png

14-5 Promise与EventEmitter详解

图片.png
图片.png

14-6 剖析serverjs

const mediasoup = require('mediasoup');
runMediasoupWorkers函数的mediasoup.createWorker函数,来自于mediasoup源码里面的index.js
下载mediasoup源码

  1. git clone https://github.com/versatica/mediasoup
  2. cd mediasoup
  3. grep "createWorker" * -Rn

添加注释后的server.js

  1. #!/usr/bin/env node
  2. // Process title.
  3. process.title = 'mediasoup-demo-server';
  4. // Set the DEBUG environment variable, keeping any value that is already set.
  5. process.env.DEBUG = process.env.DEBUG || '*INFO* *WARN* *ERROR*';
  6. // Load the contents of config.js.
  7. const config = require('./config');
  8. /* eslint-disable no-console */
  9. console.log('process.env.DEBUG:', process.env.DEBUG);
  10. console.log('config.js:\n%s', JSON.stringify(config, null, ' '));
  11. /* eslint-enable no-console */
  12. // Load the required libraries.
  13. const fs = require('fs');
  14. const https = require('https');
  15. const url = require('url');
  16. // Provides the WebSocket server.
  17. const protoo = require('protoo-server');
  18. // mediasoup library; source code at https://github.com/versatica/mediasoup
  19. const mediasoup = require('mediasoup');
  20. const express = require('express');
  21. const bodyParser = require('body-parser');
  22. const { AwaitQueue } = require('awaitqueue');
  23. const Logger = require('./lib/Logger');
  24. // Room management.
  25. const Room = require('./lib/Room');
  26. const interactiveServer = require('./lib/interactiveServer');
  27. const interactiveClient = require('./lib/interactiveClient');
  28. const logger = new Logger();
  29. // Async queue to manage rooms.
  30. // @type {AwaitQueue}
  31. const queue = new AwaitQueue();
  32. // Map of Room instances indexed by roomId (the roomId comes from the URL path or query).
  33. // @type {Map<String, Room>}
  34. const rooms = new Map();
  35. // HTTPS server.
  36. // @type {https.Server}
  37. let httpsServer;
  38. // Express application.
  39. // @type {Function}
  40. let expressApp;
  41. // Protoo WebSocket server.
  42. // @type {protoo.WebSocketServer}
  43. let protooWebSocketServer;
  44. // mediasoup Workers.
  45. // @type {Array<mediasoup.Worker>}
  46. const mediasoupWorkers = [];
  47. // Index of next mediasoup Worker to use.
  48. // @type {Number}
  49. let nextMediasoupWorkerIdx = 0;
  50. run(); // NOTE(review): the promise returned by run() is neither awaited nor given a .catch()
  51. // Core function, the equivalent of main(): starts every server component in order.
  52. async function run()
  53. {
  54. // Open the interactive server.
  55. await interactiveServer();
  56. // Open the interactive client (only when INTERACTIVE is 'true' or '1').
  57. if (process.env.INTERACTIVE === 'true' || process.env.INTERACTIVE === '1')
  58. await interactiveClient();
  59. // Run the mediasoup Workers.
  60. await runMediasoupWorkers();
  61. // Create Express app.
  62. await createExpressApp();
  63. // Run HTTPS server.
  64. await runHttpsServer();
  65. // Run a protoo WebSocketServer.
  66. // Uses the protoo library to create the WebSocketServer.
  67. await runProtooWebSocketServer();
  68. // Log rooms status every 120 seconds.
  69. setInterval(() =>
  70. {
  71. for (const room of rooms.values())
  72. {
  73. room.logStatus();
  74. }
  75. }, 120000);
  76. }
  77. /**
  78. * Launch as many mediasoup Workers as given in the configuration file.
  79. */
  80. async function runMediasoupWorkers()
  81. {
  82. // Worker count read from config.js (the original note says it is the CPU core count — TODO confirm in config.js).
  83. const { numWorkers } = config.mediasoup;
  84. logger.info('running %d mediasoup Workers...', numWorkers);
  85. // Create numWorkers Worker processes, one after another.
  86. for (let i = 0; i < numWorkers; ++i)
  87. {
  88. const worker = await mediasoup.createWorker(
  89. {
  90. logLevel : config.mediasoup.workerSettings.logLevel,
  91. logTags : config.mediasoup.workerSettings.logTags,
  92. rtcMinPort : Number(config.mediasoup.workerSettings.rtcMinPort),
  93. rtcMaxPort : Number(config.mediasoup.workerSettings.rtcMaxPort)
  94. });
  95. worker.on('died', () =>
  96. {
  97. logger.error(
  98. 'mediasoup Worker died, exiting in 2 seconds... [pid:%d]', worker.pid);
  99. setTimeout(() => process.exit(1), 2000);
  100. });
  101. // Keep a reference to every worker that gets created.
  102. mediasoupWorkers.push(worker);
  103. // Log worker resource usage every 120 seconds.
  104. setInterval(async () =>
  105. {
  106. const usage = await worker.getResourceUsage();
  107. logger.info('mediasoup Worker resource usage [pid:%d]: %o', worker.pid, usage);
  108. }, 120000);
  109. }
  110. }
  111. /**
  112. * Create an Express based API server to manage Broadcaster requests.
  113. */
  114. async function createExpressApp()
  115. {
  116. logger.info('creating Express app...');
  117. expressApp = express();
  118. expressApp.use(bodyParser.json());
  119. /**
  120. * For every API request, verify that the roomId in the path matches an
  121. * existing room.
  122. */
  123. expressApp.param(
  124. 'roomId', (req, res, next, roomId) =>
  125. {
  126. // The room must exist for all API requests.
  127. if (!rooms.has(roomId))
  128. {
  129. const error = new Error(`room with id "${roomId}" not found`);
  130. error.status = 404;
  131. throw error;
  132. }
  133. req.room = rooms.get(roomId);
  134. next();
  135. });
  136. /**
  137. * API GET resource that returns the mediasoup Router RTP capabilities of
  138. * the room.
  139. */
  140. expressApp.get(
  141. '/rooms/:roomId', (req, res) =>
  142. {
  143. const data = req.room.getRouterRtpCapabilities();
  144. res.status(200).json(data);
  145. });
  146. /**
  147. * POST API to create a Broadcaster.
  148. */
  149. expressApp.post(
  150. '/rooms/:roomId/broadcasters', async (req, res, next) =>
  151. {
  152. const {
  153. id,
  154. displayName,
  155. device,
  156. rtpCapabilities
  157. } = req.body;
  158. try
  159. {
  160. const data = await req.room.createBroadcaster(
  161. {
  162. id,
  163. displayName,
  164. device,
  165. rtpCapabilities
  166. });
  167. res.status(200).json(data);
  168. }
  169. catch (error)
  170. {
  171. next(error);
  172. }
  173. });
  174. /**
  175. * DELETE API to delete a Broadcaster.
  176. */
  177. expressApp.delete(
  178. '/rooms/:roomId/broadcasters/:broadcasterId', (req, res) =>
  179. {
  180. const { broadcasterId } = req.params;
  181. req.room.deleteBroadcaster({ broadcasterId });
  182. res.status(200).send('broadcaster deleted');
  183. });
  184. /**
  185. * POST API to create a mediasoup Transport associated to a Broadcaster.
  186. * It can be a PlainTransport or a WebRtcTransport depending on the
  187. * type parameters in the body. There are also additional parameters for
  188. * PlainTransport.
  189. */
  190. expressApp.post(
  191. '/rooms/:roomId/broadcasters/:broadcasterId/transports',
  192. async (req, res, next) =>
  193. {
  194. const { broadcasterId } = req.params;
  195. const { type, rtcpMux, comedia, sctpCapabilities } = req.body;
  196. try
  197. {
  198. const data = await req.room.createBroadcasterTransport(
  199. {
  200. broadcasterId,
  201. type,
  202. rtcpMux,
  203. comedia,
  204. sctpCapabilities
  205. });
  206. res.status(200).json(data);
  207. }
  208. catch (error)
  209. {
  210. next(error);
  211. }
  212. });
  213. /**
  214. * POST API to connect a Transport belonging to a Broadcaster. Not needed
  215. * for PlainTransport if it was created with comedia option set to true.
  216. */
  217. expressApp.post(
  218. '/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/connect',
  219. async (req, res, next) =>
  220. {
  221. const { broadcasterId, transportId } = req.params;
  222. const { dtlsParameters } = req.body;
  223. try
  224. {
  225. const data = await req.room.connectBroadcasterTransport(
  226. {
  227. broadcasterId,
  228. transportId,
  229. dtlsParameters
  230. });
  231. res.status(200).json(data);
  232. }
  233. catch (error)
  234. {
  235. next(error);
  236. }
  237. });
  238. /**
  239. * POST API to create a mediasoup Producer associated to a Broadcaster.
  240. * The exact Transport in which the Producer must be created is signaled in
  241. * the URL path. Body parameters include kind and rtpParameters of the
  242. * Producer.
  243. */
  244. expressApp.post(
  245. '/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/producers',
  246. async (req, res, next) =>
  247. {
  248. const { broadcasterId, transportId } = req.params;
  249. const { kind, rtpParameters } = req.body;
  250. try
  251. {
  252. const data = await req.room.createBroadcasterProducer(
  253. {
  254. broadcasterId,
  255. transportId,
  256. kind,
  257. rtpParameters
  258. });
  259. res.status(200).json(data);
  260. }
  261. catch (error)
  262. {
  263. next(error);
  264. }
  265. });
  266. /**
  267. * POST API to create a mediasoup Consumer associated to a Broadcaster.
  268. * The exact Transport in which the Consumer must be created is signaled in
  269. * the URL path. Query parameters must include the desired producerId to
  270. * consume.
  271. */
  272. expressApp.post(
  273. '/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/consume',
  274. async (req, res, next) =>
  275. {
  276. const { broadcasterId, transportId } = req.params;
  277. const { producerId } = req.query;
  278. try
  279. {
  280. const data = await req.room.createBroadcasterConsumer(
  281. {
  282. broadcasterId,
  283. transportId,
  284. producerId
  285. });
  286. res.status(200).json(data);
  287. }
  288. catch (error)
  289. {
  290. next(error);
  291. }
  292. });
  293. /**
  294. * POST API to create a mediasoup DataConsumer associated to a Broadcaster.
  295. * The exact Transport in which the DataConsumer must be created is signaled in
  296. * the URL path. The request body must include the desired dataProducerId to
  297. * consume.
  298. */
  299. expressApp.post(
  300. '/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/consume/data',
  301. async (req, res, next) =>
  302. {
  303. const { broadcasterId, transportId } = req.params;
  304. const { dataProducerId } = req.body;
  305. try
  306. {
  307. const data = await req.room.createBroadcasterDataConsumer(
  308. {
  309. broadcasterId,
  310. transportId,
  311. dataProducerId
  312. });
  313. res.status(200).json(data);
  314. }
  315. catch (error)
  316. {
  317. next(error);
  318. }
  319. });
  320. /**
  321. * POST API to create a mediasoup DataProducer associated to a Broadcaster.
  322. * The exact Transport in which the DataProducer must be created is signaled in the URL path.
  323. */
  324. expressApp.post(
  325. '/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/produce/data',
  326. async (req, res, next) =>
  327. {
  328. const { broadcasterId, transportId } = req.params;
  329. const { label, protocol, sctpStreamParameters, appData } = req.body;
  330. try
  331. {
  332. const data = await req.room.createBroadcasterDataProducer(
  333. {
  334. broadcasterId,
  335. transportId,
  336. label,
  337. protocol,
  338. sctpStreamParameters,
  339. appData
  340. });
  341. res.status(200).json(data);
  342. }
  343. catch (error)
  344. {
  345. next(error);
  346. }
  347. });
  348. /**
  349. * Error handler: logs the error, defaults its status to 400 for a TypeError and 500 otherwise, and replies with the error text.
  350. */
  351. expressApp.use(
  352. (error, req, res, next) =>
  353. {
  354. if (error)
  355. {
  356. logger.warn('Express app %s', String(error));
  357. error.status = error.status || (error.name === 'TypeError' ? 400 : 500);
  358. res.statusMessage = error.message;
  359. res.status(error.status).send(String(error));
  360. }
  361. else
  362. {
  363. next();
  364. }
  365. });
  366. }
  367. /**
  368. * Create a Node.js HTTPS server. It listens on the IP and port given in the
  369. * configuration file and reuses the Express application as request listener.
  370. */
  371. async function runHttpsServer()
  372. {
  373. logger.info('running an HTTPS server...');
  374. // HTTPS server for the protoo WebSocket server; the certificate and key files are read synchronously.
  375. const tls =
  376. {
  377. cert : fs.readFileSync(config.https.tls.cert),
  378. key : fs.readFileSync(config.https.tls.key)
  379. };
  380. httpsServer = https.createServer(tls, expressApp);
  381. await new Promise((resolve) =>
  382. {
  383. httpsServer.listen(
  384. Number(config.https.listenPort), config.https.listenIp, resolve);
  385. }); // NOTE(review): only the listen callback settles this promise; a listen failure is not handled here
  386. }
  387. /**
  388. * Create a protoo WebSocketServer to allow WebSocket connections from browsers.
  389. */
  390. async function runProtooWebSocketServer()
  391. {
  392. logger.info('running protoo WebSocketServer...');
  393. // Create the protoo WebSocket server on top of the existing HTTPS server.
  394. protooWebSocketServer = new protoo.WebSocketServer(httpsServer,
  395. {
  396. maxReceivedFrameSize : 960000, // 960 KBytes.
  397. maxReceivedMessageSize : 960000,
  398. fragmentOutgoingMessages : true,
  399. fragmentationThreshold : 960000
  400. });
  401. // Handle connections from clients.
  402. protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>
  403. {
  404. // The client indicates the roomId and peerId in the URL query.
  405. const u = url.parse(info.request.url, true);
  406. const roomId = u.query['roomId'];
  407. const peerId = u.query['peerId'];
  408. if (!roomId || !peerId)
  409. {
  410. reject(400, 'Connection request without roomId and/or peerId');
  411. return;
  412. }
  413. logger.info(
  414. 'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',
  415. roomId, peerId, info.socket.remoteAddress, info.origin);
  416. // Serialize this code into the queue to avoid that two peers connecting at
  417. // the same time with the same roomId create two separate rooms with same
  418. // roomId.
  419. // When many users connect at once they are queued, and for each one in turn the room they want to join is fetched or created.
  420. queue.push(async () =>
  421. {
  422. const room = await getOrCreateRoom({ roomId });
  423. // Accept the protoo WebSocket connection.
  424. const protooWebSocketTransport = accept();
  425. room.handleProtooConnection({ peerId, protooWebSocketTransport }); // NOTE(review): `consume` is not passed here, so the Room receives undefined for it
  426. })
  427. .catch((error) =>
  428. {
  429. logger.error('room creation or room joining failed:%o', error);
  430. reject(error); // NOTE(review): also reached if handleProtooConnection() throws after accept() — confirm reject() is safe then
  431. });
  432. });
  433. }
  434. /**
  435. * Get next mediasoup Worker: returns the Worker at nextMediasoupWorkerIdx, then advances the index and wraps it back to 0 after the last Worker.
  436. */
  437. function getMediasoupWorker()
  438. {
  439. const worker = mediasoupWorkers[nextMediasoupWorkerIdx];
  440. if (++nextMediasoupWorkerIdx === mediasoupWorkers.length)
  441. nextMediasoupWorkerIdx = 0;
  442. return worker;
  443. }
  444. /**
  445. * Get a Room instance (or create one if it does not exist). A new Room is stored in `rooms` and removed again when it emits 'close'.
  446. */
  447. async function getOrCreateRoom({ roomId })
  448. {
  449. let room = rooms.get(roomId);
  450. // If the Room does not exist create a new one on the next mediasoup Worker.
  451. if (!room)
  452. {
  453. logger.info('creating a new Room [roomId:%s]', roomId);
  454. const mediasoupWorker = getMediasoupWorker();
  455. room = await Room.create({ mediasoupWorker, roomId });
  456. rooms.set(roomId, room);
  457. room.on('close', () => rooms.delete(roomId));
  458. }
  459. return room;
  460. }

14-7 剖析roomjs

图片.png
图片.png
图片.png
这些请求,在 Room.js里面的 _handleProtooRequest(peer, request, accept, reject) 函数中处理。

server/lib/Room.js

  1. const EventEmitter = require('events').EventEmitter;
  2. const protoo = require('protoo-server');
  3. const throttle = require('@sitespeed.io/throttle');
  4. const Logger = require('./Logger');
  5. const config = require('../config');
  6. const Bot = require('./Bot');
  7. const logger = new Logger('Room');
  8. /**
  9. * Room class.
  10. *
  11. * This is not a "mediasoup Room" by itself, but a custom class that holds
  12. * a protoo Room (for signaling with WebSocket clients) and a mediasoup Router
  13. * (for sending and receiving media to/from those WebSocket peers).
  14. */
  15. class Room extends EventEmitter
  16. {
  17. /**
  18. * Factory function that creates and returns Room instance.
  19. *
  20. * @async
  21. *
  22. * @param {mediasoup.Worker} mediasoupWorker - The mediasoup Worker in which a new
  23. * mediasoup Router must be created.
  24. * @param {String} roomId - Id of the Room instance.
  25. */
  26. static async create({ mediasoupWorker, roomId })
  27. {
  28. logger.info('create() [roomId:%s]', roomId);
  29. // Create a protoo Room instance.
  30. const protooRoom = new protoo.Room();
  31. // Router media codecs.
  32. // Read mediaCodecs from the routerOptions settings in config.js.
  33. const { mediaCodecs } = config.mediasoup.routerOptions;
  34. // Create a mediasoup Router.
  35. const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });
  36. // Create a mediasoup AudioLevelObserver.
  37. const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver(
  38. {
  39. maxEntries : 1,
  40. threshold : -80,
  41. interval : 800
  42. });
  43. const bot = await Bot.create({ mediasoupRouter });
  44. // Pass everything created above to the constructor.
  45. return new Room(
  46. {
  47. roomId,
  48. protooRoom,
  49. mediasoupRouter,
  50. audioLevelObserver,
  51. bot
  52. });
  53. }
  54. constructor({ roomId, protooRoom, mediasoupRouter, audioLevelObserver, bot })
  55. {
  56. super();
  57. this.setMaxListeners(Infinity);
  58. // Room id.
  59. // @type {String}
  60. this._roomId = roomId;
  61. // Closed flag.
  62. // @type {Boolean}
  63. this._closed = false;
  64. // protoo Room instance.
  65. // @type {protoo.Room}
  66. this._protooRoom = protooRoom;
  67. // Map of broadcasters indexed by id. Each Object has:
  68. // - {String} id
  69. // - {Object} data
  70. // - {String} displayName
  71. // - {Object} device
  72. // - {RTCRtpCapabilities} rtpCapabilities
  73. // - {Map<String, mediasoup.Transport>} transports
  74. // - {Map<String, mediasoup.Producer>} producers
  75. // - {Map<String, mediasoup.Consumer>} consumers
  76. // - {Map<String, mediasoup.DataProducer>} dataProducers
  77. // - {Map<String, mediasoup.DataConsumer>} dataConsumers
  78. // @type {Map<String, Object>}
  79. this._broadcasters = new Map();
  80. // mediasoup Router instance.
  81. // @type {mediasoup.Router}
  82. this._mediasoupRouter = mediasoupRouter;
  83. // mediasoup AudioLevelObserver.
  84. // @type {mediasoup.AudioLevelObserver}
  85. this._audioLevelObserver = audioLevelObserver;
  86. // DataChannel bot.
  87. // @type {Bot}
  88. this._bot = bot;
  89. // Network throttled.
  90. // @type {Boolean}
  91. this._networkThrottled = false;
  92. // Handle audioLevelObserver.
  93. this._handleAudioLevelObserver();
  94. // For debugging. These globals are overwritten every time a Room is constructed.
  95. global.audioLevelObserver = this._audioLevelObserver;
  96. global.bot = this._bot;
  97. }
  98. /**
  99. * Closes the Room instance: closes the protoo Room, the mediasoup Router and the Bot, emits 'close', and stops network throttling if it was enabled.
  100. */
  101. close()
  102. {
  103. logger.debug('close()');
  104. this._closed = true;
  105. // Close the protoo Room.
  106. this._protooRoom.close();
  107. // Close the mediasoup Router.
  108. this._mediasoupRouter.close();
  109. // Close the Bot.
  110. this._bot.close();
  111. // Emit 'close' event.
  112. this.emit('close');
  113. // Stop network throttling; a failure to stop is deliberately ignored.
  114. if (this._networkThrottled)
  115. {
  116. throttle.stop({})
  117. .catch(() => {});
  118. }
  119. }
  120. logStatus() // Logs the room id, the number of protoo Peers and the number of mediasoup Transports.
  121. {
  122. logger.info(
  123. 'logStatus() [roomId:%s, protoo Peers:%s, mediasoup Transports:%s]',
  124. this._roomId,
  125. this._protooRoom.peers.length,
  126. this._mediasoupRouter._transports.size); // NOTE: Private API.
  127. }
  128. /**
  129. * Called from server.js upon a protoo WebSocket connection request from a
  130. * browser.
  131. *
  132. * @param {String} peerId - The id of the protoo peer to be created.
  133. * @param {Boolean} consume - Whether this peer wants to consume from others.
  134. * @param {protoo.WebSocketTransport} protooWebSocketTransport - The associated
  135. * protoo WebSocket transport.
  136. */
  137. handleProtooConnection({ peerId, consume, protooWebSocketTransport })
  138. {
  139. const existingPeer = this._protooRoom.getPeer(peerId);
  140. if (existingPeer)
  141. {
  142. logger.warn(
  143. 'handleProtooConnection() | there is already a protoo Peer with same peerId, closing it [peerId:%s]',
  144. peerId);
  145. existingPeer.close();
  146. }
  147. let peer;
  148. // Create a new protoo Peer with the given peerId.
  149. try
  150. {
  151. peer = this._protooRoom.createPeer(peerId, protooWebSocketTransport);
  152. }
  153. catch (error)
  154. {
  155. logger.error('protooRoom.createPeer() failed:%o', error);
  156. } // NOTE(review): after a createPeer() failure execution continues with `peer` undefined, so the next statement would throw
  157. // Use the peer.data object to store mediasoup related objects.
  158. // The peer is not marked as joined until a custom protoo 'join' request is later received.
  159. peer.data.consume = consume;
  160. peer.data.joined = false;
  161. peer.data.displayName = undefined;
  162. peer.data.device = undefined;
  163. peer.data.rtpCapabilities = undefined;
  164. peer.data.sctpCapabilities = undefined;
  165. // Have mediasoup related maps ready even before the Peer joins since we
  166. // allow creating Transports before joining.
  167. peer.data.transports = new Map();
  168. peer.data.producers = new Map();
  169. peer.data.consumers = new Map();
  170. peer.data.dataProducers = new Map();
  171. peer.data.dataConsumers = new Map();
  172. peer.on('request', (request, accept, reject) =>
  173. {
  174. logger.debug(
  175. 'protoo Peer "request" event [method:%s, peerId:%s]',
  176. request.method, peer.id);
  177. this._handleProtooRequest(peer, request, accept, reject)
  178. .catch((error) =>
  179. {
  180. logger.error('request failed:%o', error);
  181. reject(error);
  182. });
  183. });
  184. peer.on('close', () =>
  185. {
  186. if (this._closed)
  187. return;
  188. logger.debug('protoo Peer "close" event [peerId:%s]', peer.id);
  189. // If the Peer was joined, notify all other joined Peers.
  190. if (peer.data.joined)
  191. {
  192. for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
  193. {
  194. otherPeer.notify('peerClosed', { peerId: peer.id })
  195. .catch(() => {});
  196. }
  197. }
  198. // Iterate and close all mediasoup Transport associated to this Peer, so all
  199. // its Producers and Consumers will also be closed.
  200. for (const transport of peer.data.transports.values())
  201. {
  202. transport.close();
  203. }
  204. // If this was the last Peer in the room, close the room.
  205. if (this._protooRoom.peers.length === 0)
  206. {
  207. logger.info(
  208. 'last Peer in the room left, closing the room [roomId:%s]',
  209. this._roomId);
  210. this.close();
  211. }
  212. });
  213. }
  214. getRouterRtpCapabilities() // Returns the rtpCapabilities of this Room's mediasoup Router.
  215. {
  216. return this._mediasoupRouter.rtpCapabilities;
  217. }
  218. /**
  219. * Create a Broadcaster. This is for HTTP API requests (see server.js).
  220. *
  221. * @async
  222. *
  223. * @param {String} id - Broadcaster id.
  224. * @param {String} displayName - Descriptive name.
  225. * @param {Object} [device] - Additional info with name, version and flags fields; device.name is validated as a non-empty string below.
  226. * @param {RTCRtpCapabilities} [rtpCapabilities] - Device RTP capabilities.
  227. */
  228. async createBroadcaster({ id, displayName, device = {}, rtpCapabilities })
  229. {
  230. if (typeof id !== 'string' || !id)
  231. throw new TypeError('missing body.id');
  232. else if (typeof displayName !== 'string' || !displayName)
  233. throw new TypeError('missing body.displayName');
  234. else if (typeof device.name !== 'string' || !device.name)
  235. throw new TypeError('missing body.device.name');
  236. else if (rtpCapabilities && typeof rtpCapabilities !== 'object')
  237. throw new TypeError('wrong body.rtpCapabilities');
  238. if (this._broadcasters.has(id))
  239. throw new Error(`broadcaster with id "${id}" already exists`);
  240. const broadcaster =
  241. {
  242. id,
  243. data :
  244. {
  245. displayName,
  246. device :
  247. {
  248. flag : 'broadcaster',
  249. name : device.name || 'Unknown device', // NOTE(review): fallback looks unreachable, an empty device.name already throws above
  250. version : device.version
  251. },
  252. rtpCapabilities,
  253. transports : new Map(),
  254. producers : new Map(),
  255. consumers : new Map(),
  256. dataProducers : new Map(),
  257. dataConsumers : new Map()
  258. }
  259. };
  260. // Store the Broadcaster into the map.
  261. this._broadcasters.set(broadcaster.id, broadcaster);
  262. // Notify the new Broadcaster to all joined Peers; notification failures are ignored.
  263. for (const otherPeer of this._getJoinedPeers())
  264. {
  265. otherPeer.notify(
  266. 'newPeer',
  267. {
  268. id : broadcaster.id,
  269. displayName : broadcaster.data.displayName,
  270. device : broadcaster.data.device
  271. })
  272. .catch(() => {});
  273. }
  274. // Reply with the list of Peers and their Producers.
  275. const peerInfos = [];
  276. const joinedPeers = this._getJoinedPeers();
  277. // Just fill the list of Peers if the Broadcaster provided its rtpCapabilities.
  278. if (rtpCapabilities)
  279. {
  280. for (const joinedPeer of joinedPeers)
  281. {
  282. const peerInfo =
  283. {
  284. id : joinedPeer.id,
  285. displayName : joinedPeer.data.displayName,
  286. device : joinedPeer.data.device,
  287. producers : []
  288. };
  289. for (const producer of joinedPeer.data.producers.values())
  290. {
  291. // Ignore Producers that the Broadcaster cannot consume.
  292. if (
  293. !this._mediasoupRouter.canConsume(
  294. {
  295. producerId : producer.id,
  296. rtpCapabilities
  297. })
  298. )
  299. {
  300. continue;
  301. }
  302. peerInfo.producers.push(
  303. {
  304. id : producer.id,
  305. kind : producer.kind
  306. });
  307. }
  308. peerInfos.push(peerInfo);
  309. }
  310. }
  311. return { peers: peerInfos };
  312. }
  313. /**
  314. * Delete a Broadcaster: closes its Transports, removes it from the map and notifies joined Peers with 'peerClosed'. Throws if the id is unknown.
  315. *
  316. * @param {String} broadcasterId
  317. */
  318. deleteBroadcaster({ broadcasterId })
  319. {
  320. const broadcaster = this._broadcasters.get(broadcasterId);
  321. if (!broadcaster)
  322. throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
  323. for (const transport of broadcaster.data.transports.values())
  324. {
  325. transport.close();
  326. }
  327. this._broadcasters.delete(broadcasterId);
  328. for (const peer of this._getJoinedPeers())
  329. {
  330. peer.notify('peerClosed', { peerId: broadcasterId })
  331. .catch(() => {});
  332. }
  333. }
  334. /**
  335. * Create a mediasoup Transport associated to a Broadcaster. It can be a
  336. * PlainTransport or a WebRtcTransport. Throws TypeError('invalid type') for any other type.
  337. *
  338. * @async
  339. *
  340. * @param {String} broadcasterId
  341. * @param {String} type - Can be 'plain' (PlainTransport) or 'webrtc'
  342. * (WebRtcTransport).
  343. * @param {Boolean} [rtcpMux=false] - Just for PlainTransport, use RTCP mux.
  344. * @param {Boolean} [comedia=true] - Just for PlainTransport, enable remote IP:port
  345. * autodetection.
  346. * @param {Object} [sctpCapabilities] - SCTP capabilities
  347. */
  348. async createBroadcasterTransport(
  349. {
  350. broadcasterId,
  351. type,
  352. rtcpMux = false,
  353. comedia = true,
  354. sctpCapabilities
  355. })
  356. {
  357. const broadcaster = this._broadcasters.get(broadcasterId);
  358. if (!broadcaster)
  359. throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
  360. switch (type)
  361. {
  362. case 'webrtc':
  363. {
  364. const webRtcTransportOptions =
  365. {
  366. ...config.mediasoup.webRtcTransportOptions,
  367. enableSctp : Boolean(sctpCapabilities),
  368. numSctpStreams : (sctpCapabilities || {}).numStreams
  369. };
  370. const transport = await this._mediasoupRouter.createWebRtcTransport(
  371. webRtcTransportOptions);
  372. // Store it in the Broadcaster's transports map.
  373. broadcaster.data.transports.set(transport.id, transport);
  374. return {
  375. id : transport.id,
  376. iceParameters : transport.iceParameters,
  377. iceCandidates : transport.iceCandidates,
  378. dtlsParameters : transport.dtlsParameters,
  379. sctpParameters : transport.sctpParameters
  380. };
  381. }
  382. case 'plain':
  383. {
  384. const plainTransportOptions =
  385. {
  386. ...config.mediasoup.plainTransportOptions,
  387. rtcpMux : rtcpMux,
  388. comedia : comedia
  389. };
  390. const transport = await this._mediasoupRouter.createPlainTransport(
  391. plainTransportOptions);
  392. // Store it in the Broadcaster's transports map.
  393. broadcaster.data.transports.set(transport.id, transport);
  394. return {
  395. id : transport.id,
  396. ip : transport.tuple.localIp,
  397. port : transport.tuple.localPort,
  398. rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined
  399. };
  400. }
  401. default:
  402. {
  403. throw new TypeError('invalid type');
  404. }
  405. }
  406. }
  407. /**
  408. * Connect a Broadcaster mediasoup WebRtcTransport. Throws if the Broadcaster or Transport is unknown, or if the Transport is not a WebRtcTransport.
  409. *
  410. * @async
  411. *
  412. * @param {String} broadcasterId
  413. * @param {String} transportId
  414. * @param {RTCDtlsParameters} dtlsParameters - Remote DTLS parameters.
  415. */
  416. async connectBroadcasterTransport(
  417. {
  418. broadcasterId,
  419. transportId,
  420. dtlsParameters
  421. }
  422. )
  423. {
  424. const broadcaster = this._broadcasters.get(broadcasterId);
  425. if (!broadcaster)
  426. throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
  427. const transport = broadcaster.data.transports.get(transportId);
  428. if (!transport)
  429. throw new Error(`transport with id "${transportId}" does not exist`);
  430. if (transport.constructor.name !== 'WebRtcTransport')
  431. {
  432. throw new Error(
  433. `transport with id "${transportId}" is not a WebRtcTransport`);
  434. }
  435. await transport.connect({ dtlsParameters });
  436. }
  437. /**
  438. * Create a mediasoup Producer associated to a Broadcaster. Resolves with an object holding the new Producer id.
  439. *
  440. * @async
  441. *
  442. * @param {String} broadcasterId
  443. * @param {String} transportId
  444. * @param {String} kind - 'audio' or 'video' kind for the Producer.
  445. * @param {RTCRtpParameters} rtpParameters - RTP parameters for the Producer.
  446. */
  447. async createBroadcasterProducer(
  448. {
  449. broadcasterId,
  450. transportId,
  451. kind,
  452. rtpParameters
  453. }
  454. )
  455. {
  456. const broadcaster = this._broadcasters.get(broadcasterId);
  457. if (!broadcaster)
  458. throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
  459. const transport = broadcaster.data.transports.get(transportId);
  460. if (!transport)
  461. throw new Error(`transport with id "${transportId}" does not exist`);
  462. const producer =
  463. await transport.produce({ kind, rtpParameters });
  464. // Store it in the Broadcaster's producers map.
  465. broadcaster.data.producers.set(producer.id, producer);
  466. // Set Producer events.
  467. // producer.on('score', (score) =>
  468. // {
  469. // logger.debug(
  470. // 'broadcaster producer "score" event [producerId:%s, score:%o]',
  471. // producer.id, score);
  472. // });
  473. producer.on('videoorientationchange', (videoOrientation) =>
  474. {
  475. logger.debug(
  476. 'broadcaster producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',
  477. producer.id, videoOrientation);
  478. });
  479. // Optimization: Create a server-side Consumer for each joined Peer (the calls are not awaited).
  480. for (const peer of this._getJoinedPeers())
  481. {
  482. this._createConsumer(
  483. {
  484. consumerPeer : peer,
  485. producerPeer : broadcaster,
  486. producer
  487. });
  488. }
  489. // Add audio Producers into the audioLevelObserver; a failure to add is ignored.
  490. if (producer.kind === 'audio')
  491. {
  492. this._audioLevelObserver.addProducer({ producerId: producer.id })
  493. .catch(() => {});
  494. }
  495. return { id: producer.id };
  496. }
  497. /**
  498. * Create a mediasoup Consumer associated to a Broadcaster. Requires the Broadcaster to have been created with rtpCapabilities.
  499. *
  500. * @async
  501. *
  502. * @param {String} broadcasterId
  503. * @param {String} transportId
  504. * @param {String} producerId
  505. */
  506. async createBroadcasterConsumer(
  507. {
  508. broadcasterId,
  509. transportId,
  510. producerId
  511. }
  512. )
  513. {
  514. const broadcaster = this._broadcasters.get(broadcasterId);
  515. if (!broadcaster)
  516. throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
  517. if (!broadcaster.data.rtpCapabilities)
  518. throw new Error('broadcaster does not have rtpCapabilities');
  519. const transport = broadcaster.data.transports.get(transportId);
  520. if (!transport)
  521. throw new Error(`transport with id "${transportId}" does not exist`);
  522. const consumer = await transport.consume(
  523. {
  524. producerId,
  525. rtpCapabilities : broadcaster.data.rtpCapabilities
  526. });
  527. // Store it in the Broadcaster's consumers map.
  528. broadcaster.data.consumers.set(consumer.id, consumer);
  529. // Set Consumer events.
  530. consumer.on('transportclose', () =>
  531. {
  532. // Remove from its map.
  533. broadcaster.data.consumers.delete(consumer.id);
  534. });
  535. consumer.on('producerclose', () =>
  536. {
  537. // Remove from its map.
  538. broadcaster.data.consumers.delete(consumer.id);
  539. });
  540. return {
  541. id : consumer.id,
  542. producerId,
  543. kind : consumer.kind,
  544. rtpParameters : consumer.rtpParameters,
  545. type : consumer.type
  546. };
  547. }
  548. /**
  549. * Create a mediasoup DataConsumer associated to a Broadcaster.
  550. *
  551. * @async
  552. *
  553. * @type {String} broadcasterId
  554. * @type {String} transportId
  555. * @type {String} dataProducerId
  556. */
  557. async createBroadcasterDataConsumer(
  558. {
  559. broadcasterId,
  560. transportId,
  561. dataProducerId
  562. }
  563. )
  564. {
  565. const broadcaster = this._broadcasters.get(broadcasterId);
  566. if (!broadcaster)
  567. throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
  568. if (!broadcaster.data.rtpCapabilities)
  569. throw new Error('broadcaster does not have rtpCapabilities');
  570. const transport = broadcaster.data.transports.get(transportId);
  571. if (!transport)
  572. throw new Error(`transport with id "${transportId}" does not exist`);
  573. const dataConsumer = await transport.consumeData(
  574. {
  575. dataProducerId
  576. });
  577. // Store it.
  578. broadcaster.data.dataConsumers.set(dataConsumer.id, dataConsumer);
  579. // Set Consumer events.
  580. dataConsumer.on('transportclose', () =>
  581. {
  582. // Remove from its map.
  583. broadcaster.data.dataConsumers.delete(dataConsumer.id);
  584. });
  585. dataConsumer.on('dataproducerclose', () =>
  586. {
  587. // Remove from its map.
  588. broadcaster.data.dataConsumers.delete(dataConsumer.id);
  589. });
  590. return {
  591. id : dataConsumer.id
  592. };
  593. }
  594. /**
  595. * Create a mediasoup DataProducer associated to a Broadcaster.
  596. *
  597. * @async
  598. *
  599. * @type {String} broadcasterId
  600. * @type {String} transportId
  601. */
  602. async createBroadcasterDataProducer(
  603. {
  604. broadcasterId,
  605. transportId,
  606. label,
  607. protocol,
  608. sctpStreamParameters,
  609. appData
  610. }
  611. )
  612. {
  613. const broadcaster = this._broadcasters.get(broadcasterId);
  614. if (!broadcaster)
  615. throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
  616. // if (!broadcaster.data.sctpCapabilities)
  617. // throw new Error('broadcaster does not have sctpCapabilities');
  618. const transport = broadcaster.data.transports.get(transportId);
  619. if (!transport)
  620. throw new Error(`transport with id "${transportId}" does not exist`);
  621. const dataProducer = await transport.produceData(
  622. {
  623. sctpStreamParameters,
  624. label,
  625. protocol,
  626. appData
  627. });
  628. // Store it.
  629. broadcaster.data.dataProducers.set(dataProducer.id, dataProducer);
  630. // Set Consumer events.
  631. dataProducer.on('transportclose', () =>
  632. {
  633. // Remove from its map.
  634. broadcaster.data.dataProducers.delete(dataProducer.id);
  635. });
  636. // // Optimization: Create a server-side Consumer for each Peer.
  637. // for (const peer of this._getJoinedPeers())
  638. // {
  639. // this._createDataConsumer(
  640. // {
  641. // dataConsumerPeer : peer,
  642. // dataProducerPeer : broadcaster,
  643. // dataProducer: dataProducer
  644. // });
  645. // }
  646. return {
  647. id : dataProducer.id
  648. };
  649. }
  650. _handleAudioLevelObserver()
  651. {
  652. this._audioLevelObserver.on('volumes', (volumes) =>
  653. {
  654. const { producer, volume } = volumes[0];
  655. // logger.debug(
  656. // 'audioLevelObserver "volumes" event [producerId:%s, volume:%s]',
  657. // producer.id, volume);
  658. // Notify all Peers.
  659. for (const peer of this._getJoinedPeers())
  660. {
  661. peer.notify(
  662. 'activeSpeaker',
  663. {
  664. peerId : producer.appData.peerId,
  665. volume : volume
  666. })
  667. .catch(() => {});
  668. }
  669. });
  670. this._audioLevelObserver.on('silence', () =>
  671. {
  672. // logger.debug('audioLevelObserver "silence" event');
  673. // Notify all Peers.
  674. for (const peer of this._getJoinedPeers())
  675. {
  676. peer.notify('activeSpeaker', { peerId: null })
  677. .catch(() => {});
  678. }
  679. });
  680. }
  681. /**
  682. * Handle protoo requests from browsers.
  683. *
  684. * @async
  685. */
  686. async _handleProtooRequest(peer, request, accept, reject)
  687. {
  688. switch (request.method)
  689. {
  690. case 'getRouterRtpCapabilities':
  691. {
  692. accept(this._mediasoupRouter.rtpCapabilities);
  693. break;
  694. }
  695. case 'join':
  696. {
  697. // Ensure the Peer is not already joined.
  698. if (peer.data.joined)
  699. throw new Error('Peer already joined');
  700. const {
  701. displayName,
  702. device,
  703. rtpCapabilities,
  704. sctpCapabilities
  705. } = request.data;
  706. // Store client data into the protoo Peer data object.
  707. peer.data.joined = true;
  708. peer.data.displayName = displayName;
  709. peer.data.device = device;
  710. peer.data.rtpCapabilities = rtpCapabilities;
  711. peer.data.sctpCapabilities = sctpCapabilities;
  712. // Tell the new Peer about already joined Peers.
  713. // And also create Consumers for existing Producers.
  714. const joinedPeers =
  715. [
  716. ...this._getJoinedPeers(),
  717. ...this._broadcasters.values()
  718. ];
  719. // Reply now the request with the list of joined peers (all but the new one).
  720. const peerInfos = joinedPeers
  721. .filter((joinedPeer) => joinedPeer.id !== peer.id)
  722. .map((joinedPeer) => ({
  723. id : joinedPeer.id,
  724. displayName : joinedPeer.data.displayName,
  725. device : joinedPeer.data.device
  726. }));
  727. accept({ peers: peerInfos });
  728. // Mark the new Peer as joined.
  729. peer.data.joined = true;
  730. for (const joinedPeer of joinedPeers)
  731. {
  732. // Create Consumers for existing Producers.
  733. for (const producer of joinedPeer.data.producers.values())
  734. {
  735. this._createConsumer(
  736. {
  737. consumerPeer : peer,
  738. producerPeer : joinedPeer,
  739. producer
  740. });
  741. }
  742. // Create DataConsumers for existing DataProducers.
  743. for (const dataProducer of joinedPeer.data.dataProducers.values())
  744. {
  745. if (dataProducer.label === 'bot')
  746. continue;
  747. this._createDataConsumer(
  748. {
  749. dataConsumerPeer : peer,
  750. dataProducerPeer : joinedPeer,
  751. dataProducer
  752. });
  753. }
  754. }
  755. // Create DataConsumers for bot DataProducer.
  756. this._createDataConsumer(
  757. {
  758. dataConsumerPeer : peer,
  759. dataProducerPeer : null,
  760. dataProducer : this._bot.dataProducer
  761. });
  762. // Notify the new Peer to all other Peers.
  763. for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
  764. {
  765. otherPeer.notify(
  766. 'newPeer',
  767. {
  768. id : peer.id,
  769. displayName : peer.data.displayName,
  770. device : peer.data.device
  771. })
  772. .catch(() => {});
  773. }
  774. break;
  775. }
  776. case 'createWebRtcTransport':
  777. {
  778. // NOTE: Don't require that the Peer is joined here, so the client can
  779. // initiate mediasoup Transports and be ready when he later joins.
  780. const {
  781. forceTcp,
  782. producing,
  783. consuming,
  784. sctpCapabilities
  785. } = request.data;
  786. const webRtcTransportOptions =
  787. {
  788. ...config.mediasoup.webRtcTransportOptions,
  789. enableSctp : Boolean(sctpCapabilities),
  790. numSctpStreams : (sctpCapabilities || {}).numStreams,
  791. appData : { producing, consuming }
  792. };
  793. if (forceTcp)
  794. {
  795. webRtcTransportOptions.enableUdp = false;
  796. webRtcTransportOptions.enableTcp = true;
  797. }
  798. const transport = await this._mediasoupRouter.createWebRtcTransport(
  799. webRtcTransportOptions);
  800. transport.on('sctpstatechange', (sctpState) =>
  801. {
  802. logger.debug('WebRtcTransport "sctpstatechange" event [sctpState:%s]', sctpState);
  803. });
  804. transport.on('dtlsstatechange', (dtlsState) =>
  805. {
  806. if (dtlsState === 'failed' || dtlsState === 'closed')
  807. logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState);
  808. });
  809. // NOTE: For testing.
  810. // await transport.enableTraceEvent([ 'probation', 'bwe' ]);
  811. await transport.enableTraceEvent([ 'bwe' ]);
  812. transport.on('trace', (trace) =>
  813. {
  814. logger.debug(
  815. 'transport "trace" event [transportId:%s, trace.type:%s, trace:%o]',
  816. transport.id, trace.type, trace);
  817. if (trace.type === 'bwe' && trace.direction === 'out')
  818. {
  819. peer.notify(
  820. 'downlinkBwe',
  821. {
  822. desiredBitrate : trace.info.desiredBitrate,
  823. effectiveDesiredBitrate : trace.info.effectiveDesiredBitrate,
  824. availableBitrate : trace.info.availableBitrate
  825. })
  826. .catch(() => {});
  827. }
  828. });
  829. // Store the WebRtcTransport into the protoo Peer data Object.
  830. peer.data.transports.set(transport.id, transport);
  831. accept(
  832. {
  833. id : transport.id,
  834. iceParameters : transport.iceParameters,
  835. iceCandidates : transport.iceCandidates,
  836. dtlsParameters : transport.dtlsParameters,
  837. sctpParameters : transport.sctpParameters
  838. });
  839. const { maxIncomingBitrate } = config.mediasoup.webRtcTransportOptions;
  840. // If set, apply max incoming bitrate limit.
  841. if (maxIncomingBitrate)
  842. {
  843. try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); }
  844. catch (error) {}
  845. }
  846. break;
  847. }
  848. case 'connectWebRtcTransport':
  849. {
  850. const { transportId, dtlsParameters } = request.data;
  851. const transport = peer.data.transports.get(transportId);
  852. if (!transport)
  853. throw new Error(`transport with id "${transportId}" not found`);
  854. await transport.connect({ dtlsParameters });
  855. accept();
  856. break;
  857. }
  858. case 'restartIce':
  859. {
  860. const { transportId } = request.data;
  861. const transport = peer.data.transports.get(transportId);
  862. if (!transport)
  863. throw new Error(`transport with id "${transportId}" not found`);
  864. const iceParameters = await transport.restartIce();
  865. accept(iceParameters);
  866. break;
  867. }
  868. case 'produce':
  869. {
  870. // Ensure the Peer is joined.
  871. if (!peer.data.joined)
  872. throw new Error('Peer not yet joined');
  873. const { transportId, kind, rtpParameters } = request.data;
  874. let { appData } = request.data;
  875. const transport = peer.data.transports.get(transportId);
  876. if (!transport)
  877. throw new Error(`transport with id "${transportId}" not found`);
  878. // Add peerId into appData to later get the associated Peer during
  879. // the 'loudest' event of the audioLevelObserver.
  880. appData = { ...appData, peerId: peer.id };
  881. const producer = await transport.produce(
  882. {
  883. kind,
  884. rtpParameters,
  885. appData
  886. // keyFrameRequestDelay: 5000
  887. });
  888. // Store the Producer into the protoo Peer data Object.
  889. peer.data.producers.set(producer.id, producer);
  890. // Set Producer events.
  891. producer.on('score', (score) =>
  892. {
  893. // logger.debug(
  894. // 'producer "score" event [producerId:%s, score:%o]',
  895. // producer.id, score);
  896. peer.notify('producerScore', { producerId: producer.id, score })
  897. .catch(() => {});
  898. });
  899. producer.on('videoorientationchange', (videoOrientation) =>
  900. {
  901. logger.debug(
  902. 'producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',
  903. producer.id, videoOrientation);
  904. });
  905. // NOTE: For testing.
  906. // await producer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
  907. // await producer.enableTraceEvent([ 'pli', 'fir' ]);
  908. // await producer.enableTraceEvent([ 'keyframe' ]);
  909. producer.on('trace', (trace) =>
  910. {
  911. logger.debug(
  912. 'producer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
  913. producer.id, trace.type, trace);
  914. });
  915. accept({ id: producer.id });
  916. // Optimization: Create a server-side Consumer for each Peer.
  917. for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
  918. {
  919. this._createConsumer(
  920. {
  921. consumerPeer : otherPeer,
  922. producerPeer : peer,
  923. producer
  924. });
  925. }
  926. // Add into the audioLevelObserver.
  927. if (producer.kind === 'audio')
  928. {
  929. this._audioLevelObserver.addProducer({ producerId: producer.id })
  930. .catch(() => {});
  931. }
  932. break;
  933. }
  934. case 'closeProducer':
  935. {
  936. // Ensure the Peer is joined.
  937. if (!peer.data.joined)
  938. throw new Error('Peer not yet joined');
  939. const { producerId } = request.data;
  940. const producer = peer.data.producers.get(producerId);
  941. if (!producer)
  942. throw new Error(`producer with id "${producerId}" not found`);
  943. producer.close();
  944. // Remove from its map.
  945. peer.data.producers.delete(producer.id);
  946. accept();
  947. break;
  948. }
  949. case 'pauseProducer':
  950. {
  951. // Ensure the Peer is joined.
  952. if (!peer.data.joined)
  953. throw new Error('Peer not yet joined');
  954. const { producerId } = request.data;
  955. const producer = peer.data.producers.get(producerId);
  956. if (!producer)
  957. throw new Error(`producer with id "${producerId}" not found`);
  958. await producer.pause();
  959. accept();
  960. break;
  961. }
  962. case 'resumeProducer':
  963. {
  964. // Ensure the Peer is joined.
  965. if (!peer.data.joined)
  966. throw new Error('Peer not yet joined');
  967. const { producerId } = request.data;
  968. const producer = peer.data.producers.get(producerId);
  969. if (!producer)
  970. throw new Error(`producer with id "${producerId}" not found`);
  971. await producer.resume();
  972. accept();
  973. break;
  974. }
  975. case 'pauseConsumer':
  976. {
  977. // Ensure the Peer is joined.
  978. if (!peer.data.joined)
  979. throw new Error('Peer not yet joined');
  980. const { consumerId } = request.data;
  981. const consumer = peer.data.consumers.get(consumerId);
  982. if (!consumer)
  983. throw new Error(`consumer with id "${consumerId}" not found`);
  984. await consumer.pause();
  985. accept();
  986. break;
  987. }
  988. case 'resumeConsumer':
  989. {
  990. // Ensure the Peer is joined.
  991. if (!peer.data.joined)
  992. throw new Error('Peer not yet joined');
  993. const { consumerId } = request.data;
  994. const consumer = peer.data.consumers.get(consumerId);
  995. if (!consumer)
  996. throw new Error(`consumer with id "${consumerId}" not found`);
  997. await consumer.resume();
  998. accept();
  999. break;
  1000. }
  1001. case 'setConsumerPreferredLayers':
  1002. {
  1003. // Ensure the Peer is joined.
  1004. if (!peer.data.joined)
  1005. throw new Error('Peer not yet joined');
  1006. const { consumerId, spatialLayer, temporalLayer } = request.data;
  1007. const consumer = peer.data.consumers.get(consumerId);
  1008. if (!consumer)
  1009. throw new Error(`consumer with id "${consumerId}" not found`);
  1010. await consumer.setPreferredLayers({ spatialLayer, temporalLayer });
  1011. accept();
  1012. break;
  1013. }
  1014. case 'setConsumerPriority':
  1015. {
  1016. // Ensure the Peer is joined.
  1017. if (!peer.data.joined)
  1018. throw new Error('Peer not yet joined');
  1019. const { consumerId, priority } = request.data;
  1020. const consumer = peer.data.consumers.get(consumerId);
  1021. if (!consumer)
  1022. throw new Error(`consumer with id "${consumerId}" not found`);
  1023. await consumer.setPriority(priority);
  1024. accept();
  1025. break;
  1026. }
  1027. case 'requestConsumerKeyFrame':
  1028. {
  1029. // Ensure the Peer is joined.
  1030. if (!peer.data.joined)
  1031. throw new Error('Peer not yet joined');
  1032. const { consumerId } = request.data;
  1033. const consumer = peer.data.consumers.get(consumerId);
  1034. if (!consumer)
  1035. throw new Error(`consumer with id "${consumerId}" not found`);
  1036. await consumer.requestKeyFrame();
  1037. accept();
  1038. break;
  1039. }
  1040. case 'produceData':
  1041. {
  1042. // Ensure the Peer is joined.
  1043. if (!peer.data.joined)
  1044. throw new Error('Peer not yet joined');
  1045. const {
  1046. transportId,
  1047. sctpStreamParameters,
  1048. label,
  1049. protocol,
  1050. appData
  1051. } = request.data;
  1052. const transport = peer.data.transports.get(transportId);
  1053. if (!transport)
  1054. throw new Error(`transport with id "${transportId}" not found`);
  1055. const dataProducer = await transport.produceData(
  1056. {
  1057. sctpStreamParameters,
  1058. label,
  1059. protocol,
  1060. appData
  1061. });
  1062. // Store the Producer into the protoo Peer data Object.
  1063. peer.data.dataProducers.set(dataProducer.id, dataProducer);
  1064. accept({ id: dataProducer.id });
  1065. switch (dataProducer.label)
  1066. {
  1067. case 'chat':
  1068. {
  1069. // Create a server-side DataConsumer for each Peer.
  1070. for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
  1071. {
  1072. this._createDataConsumer(
  1073. {
  1074. dataConsumerPeer : otherPeer,
  1075. dataProducerPeer : peer,
  1076. dataProducer
  1077. });
  1078. }
  1079. break;
  1080. }
  1081. case 'bot':
  1082. {
  1083. // Pass it to the bot.
  1084. this._bot.handlePeerDataProducer(
  1085. {
  1086. dataProducerId : dataProducer.id,
  1087. peer
  1088. });
  1089. break;
  1090. }
  1091. }
  1092. break;
  1093. }
  1094. case 'changeDisplayName':
  1095. {
  1096. // Ensure the Peer is joined.
  1097. if (!peer.data.joined)
  1098. throw new Error('Peer not yet joined');
  1099. const { displayName } = request.data;
  1100. const oldDisplayName = peer.data.displayName;
  1101. // Store the display name into the custom data Object of the protoo
  1102. // Peer.
  1103. peer.data.displayName = displayName;
  1104. // Notify other joined Peers.
  1105. for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
  1106. {
  1107. otherPeer.notify(
  1108. 'peerDisplayNameChanged',
  1109. {
  1110. peerId : peer.id,
  1111. displayName,
  1112. oldDisplayName
  1113. })
  1114. .catch(() => {});
  1115. }
  1116. accept();
  1117. break;
  1118. }
  1119. case 'getTransportStats':
  1120. {
  1121. const { transportId } = request.data;
  1122. const transport = peer.data.transports.get(transportId);
  1123. if (!transport)
  1124. throw new Error(`transport with id "${transportId}" not found`);
  1125. const stats = await transport.getStats();
  1126. accept(stats);
  1127. break;
  1128. }
  1129. case 'getProducerStats':
  1130. {
  1131. const { producerId } = request.data;
  1132. const producer = peer.data.producers.get(producerId);
  1133. if (!producer)
  1134. throw new Error(`producer with id "${producerId}" not found`);
  1135. const stats = await producer.getStats();
  1136. accept(stats);
  1137. break;
  1138. }
  1139. case 'getConsumerStats':
  1140. {
  1141. const { consumerId } = request.data;
  1142. const consumer = peer.data.consumers.get(consumerId);
  1143. if (!consumer)
  1144. throw new Error(`consumer with id "${consumerId}" not found`);
  1145. const stats = await consumer.getStats();
  1146. accept(stats);
  1147. break;
  1148. }
  1149. case 'getDataProducerStats':
  1150. {
  1151. const { dataProducerId } = request.data;
  1152. const dataProducer = peer.data.dataProducers.get(dataProducerId);
  1153. if (!dataProducer)
  1154. throw new Error(`dataProducer with id "${dataProducerId}" not found`);
  1155. const stats = await dataProducer.getStats();
  1156. accept(stats);
  1157. break;
  1158. }
  1159. case 'getDataConsumerStats':
  1160. {
  1161. const { dataConsumerId } = request.data;
  1162. const dataConsumer = peer.data.dataConsumers.get(dataConsumerId);
  1163. if (!dataConsumer)
  1164. throw new Error(`dataConsumer with id "${dataConsumerId}" not found`);
  1165. const stats = await dataConsumer.getStats();
  1166. accept(stats);
  1167. break;
  1168. }
  1169. case 'applyNetworkThrottle':
  1170. {
  1171. const DefaultUplink = 1000000;
  1172. const DefaultDownlink = 1000000;
  1173. const DefaultRtt = 0;
  1174. const { uplink, downlink, rtt, secret } = request.data;
  1175. if (!secret || secret !== process.env.NETWORK_THROTTLE_SECRET)
  1176. {
  1177. reject(403, 'operation NOT allowed, modda fuckaa');
  1178. return;
  1179. }
  1180. try
  1181. {
  1182. await throttle.start(
  1183. {
  1184. up : uplink || DefaultUplink,
  1185. down : downlink || DefaultDownlink,
  1186. rtt : rtt || DefaultRtt
  1187. });
  1188. logger.warn(
  1189. 'network throttle set [uplink:%s, downlink:%s, rtt:%s]',
  1190. uplink || DefaultUplink,
  1191. downlink || DefaultDownlink,
  1192. rtt || DefaultRtt);
  1193. accept();
  1194. }
  1195. catch (error)
  1196. {
  1197. logger.error('network throttle apply failed: %o', error);
  1198. reject(500, error.toString());
  1199. }
  1200. break;
  1201. }
  1202. case 'resetNetworkThrottle':
  1203. {
  1204. const { secret } = request.data;
  1205. if (!secret || secret !== process.env.NETWORK_THROTTLE_SECRET)
  1206. {
  1207. reject(403, 'operation NOT allowed, modda fuckaa');
  1208. return;
  1209. }
  1210. try
  1211. {
  1212. await throttle.stop({});
  1213. logger.warn('network throttle stopped');
  1214. accept();
  1215. }
  1216. catch (error)
  1217. {
  1218. logger.error('network throttle stop failed: %o', error);
  1219. reject(500, error.toString());
  1220. }
  1221. break;
  1222. }
  1223. default:
  1224. {
  1225. logger.error('unknown request.method "%s"', request.method);
  1226. reject(500, `unknown request.method "${request.method}"`);
  1227. }
  1228. }
  1229. }
  1230. /**
  1231. * Lists the protoo peers that have joined, optionally leaving `excludePeer` out.
  1232. */
  1233. _getJoinedPeers({ excludePeer = undefined } = {})
  1234. {
  1235. const isWanted = (candidate) => candidate.data.joined && candidate !== excludePeer;
  1236. return this._protooRoom.peers.filter(isWanted);
  1237. }
  1238. /**
  1239. * Creates a mediasoup Consumer for the given mediasoup Producer.
  1240. *
  1241. * @async
  1242. */
  1243. async _createConsumer({ consumerPeer, producerPeer, producer })
  1244. {
  1245. // Optimization:
  1246. // - Create the server-side Consumer in paused mode.
  1247. // - Tell its Peer about it and wait for its response.
  1248. // - Upon receipt of the response, resume the server-side Consumer.
  1249. // - If video, this will mean a single key frame requested by the
  1250. // server-side Consumer (when resuming it).
  1251. // - If audio (or video), it will avoid that RTP packets are received by the
  1252. // remote endpoint *before* the Consumer is locally created in the endpoint
  1253. // (and before the local SDP O/A procedure ends). If that happens (RTP
  1254. // packets are received before the SDP O/A is done) the PeerConnection may
  1255. // fail to associate the RTP stream.
  1256. // NOTE: Don't create the Consumer if the remote Peer cannot consume it.
  1257. if (
  1258. !consumerPeer.data.rtpCapabilities ||
  1259. !this._mediasoupRouter.canConsume(
  1260. {
  1261. producerId : producer.id,
  1262. rtpCapabilities : consumerPeer.data.rtpCapabilities
  1263. })
  1264. )
  1265. {
  1266. return;
  1267. }
  1268. // Must take the Transport the remote Peer is using for consuming.
  1269. const transport = Array.from(consumerPeer.data.transports.values())
  1270. .find((t) => t.appData.consuming);
  1271. // This should not happen.
  1272. if (!transport)
  1273. {
  1274. logger.warn('_createConsumer() | Transport for consuming not found');
  1275. return;
  1276. }
  1277. // Create the Consumer in paused mode.
  1278. let consumer;
  1279. try
  1280. {
  1281. consumer = await transport.consume(
  1282. {
  1283. producerId : producer.id,
  1284. rtpCapabilities : consumerPeer.data.rtpCapabilities,
  1285. paused : true
  1286. });
  1287. }
  1288. catch (error)
  1289. {
  1290. logger.warn('_createConsumer() | transport.consume():%o', error);
  1291. return;
  1292. }
  1293. // Store the Consumer into the protoo consumerPeer data Object.
  1294. consumerPeer.data.consumers.set(consumer.id, consumer);
  1295. // Set Consumer events.
  1296. consumer.on('transportclose', () =>
  1297. {
  1298. // Remove from its map.
  1299. consumerPeer.data.consumers.delete(consumer.id);
  1300. });
  1301. consumer.on('producerclose', () =>
  1302. {
  1303. // Remove from its map.
  1304. consumerPeer.data.consumers.delete(consumer.id);
  1305. consumerPeer.notify('consumerClosed', { consumerId: consumer.id })
  1306. .catch(() => {});
  1307. });
  1308. consumer.on('producerpause', () =>
  1309. {
  1310. consumerPeer.notify('consumerPaused', { consumerId: consumer.id })
  1311. .catch(() => {});
  1312. });
  1313. consumer.on('producerresume', () =>
  1314. {
  1315. consumerPeer.notify('consumerResumed', { consumerId: consumer.id })
  1316. .catch(() => {});
  1317. });
  1318. consumer.on('score', (score) =>
  1319. {
  1320. // logger.debug(
  1321. // 'consumer "score" event [consumerId:%s, score:%o]',
  1322. // consumer.id, score);
  1323. consumerPeer.notify('consumerScore', { consumerId: consumer.id, score })
  1324. .catch(() => {});
  1325. });
  1326. consumer.on('layerschange', (layers) =>
  1327. {
  1328. consumerPeer.notify(
  1329. 'consumerLayersChanged',
  1330. {
  1331. consumerId : consumer.id,
  1332. spatialLayer : layers ? layers.spatialLayer : null,
  1333. temporalLayer : layers ? layers.temporalLayer : null
  1334. })
  1335. .catch(() => {});
  1336. });
  1337. // NOTE: For testing.
  1338. // await consumer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
  1339. // await consumer.enableTraceEvent([ 'pli', 'fir' ]);
  1340. // await consumer.enableTraceEvent([ 'keyframe' ]);
  1341. consumer.on('trace', (trace) =>
  1342. {
  1343. logger.debug(
  1344. 'consumer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
  1345. consumer.id, trace.type, trace);
  1346. });
  1347. // Send a protoo request to the remote Peer with Consumer parameters.
  1348. try
  1349. {
  1350. await consumerPeer.request(
  1351. 'newConsumer',
  1352. {
  1353. peerId : producerPeer.id,
  1354. producerId : producer.id,
  1355. id : consumer.id,
  1356. kind : consumer.kind,
  1357. rtpParameters : consumer.rtpParameters,
  1358. type : consumer.type,
  1359. appData : producer.appData,
  1360. producerPaused : consumer.producerPaused
  1361. });
  1362. // Now that we got the positive response from the remote endpoint, resume
  1363. // the Consumer so the remote endpoint will receive the a first RTP packet
  1364. // of this new stream once its PeerConnection is already ready to process
  1365. // and associate it.
  1366. await consumer.resume();
  1367. consumerPeer.notify(
  1368. 'consumerScore',
  1369. {
  1370. consumerId : consumer.id,
  1371. score : consumer.score
  1372. })
  1373. .catch(() => {});
  1374. }
  1375. catch (error)
  1376. {
  1377. logger.warn('_createConsumer() | failed:%o', error);
  1378. }
  1379. }
  1380. /**
  1381. * Creates a mediasoup DataConsumer for the given mediasoup DataProducer.
  1382. *
  1383. * @async
  1384. */
  1385. async _createDataConsumer(
  1386. {
  1387. dataConsumerPeer,
  1388. dataProducerPeer = null, // This is null for the bot DataProducer.
  1389. dataProducer
  1390. })
  1391. {
  1392. // NOTE: Don't create the DataConsumer if the remote Peer cannot consume it.
  1393. if (!dataConsumerPeer.data.sctpCapabilities)
  1394. return;
  1395. // Must take the Transport the remote Peer is using for consuming.
  1396. const transport = Array.from(dataConsumerPeer.data.transports.values())
  1397. .find((t) => t.appData.consuming);
  1398. // This should not happen.
  1399. if (!transport)
  1400. {
  1401. logger.warn('_createDataConsumer() | Transport for consuming not found');
  1402. return;
  1403. }
  1404. // Create the DataConsumer.
  1405. let dataConsumer;
  1406. try
  1407. {
  1408. dataConsumer = await transport.consumeData(
  1409. {
  1410. dataProducerId : dataProducer.id
  1411. });
  1412. }
  1413. catch (error)
  1414. {
  1415. logger.warn('_createDataConsumer() | transport.consumeData():%o', error);
  1416. return;
  1417. }
  1418. // Store the DataConsumer into the protoo dataConsumerPeer data Object.
  1419. dataConsumerPeer.data.dataConsumers.set(dataConsumer.id, dataConsumer);
  1420. // Set DataConsumer events.
  1421. dataConsumer.on('transportclose', () =>
  1422. {
  1423. // Remove from its map.
  1424. dataConsumerPeer.data.dataConsumers.delete(dataConsumer.id);
  1425. });
  1426. dataConsumer.on('dataproducerclose', () =>
  1427. {
  1428. // Remove from its map.
  1429. dataConsumerPeer.data.dataConsumers.delete(dataConsumer.id);
  1430. dataConsumerPeer.notify(
  1431. 'dataConsumerClosed', { dataConsumerId: dataConsumer.id })
  1432. .catch(() => {});
  1433. });
  1434. // Send a protoo request to the remote Peer with Consumer parameters.
  1435. try
  1436. {
  1437. await dataConsumerPeer.request(
  1438. 'newDataConsumer',
  1439. {
  1440. // This is null for bot DataProducer.
  1441. peerId : dataProducerPeer ? dataProducerPeer.id : null,
  1442. dataProducerId : dataProducer.id,
  1443. id : dataConsumer.id,
  1444. sctpStreamParameters : dataConsumer.sctpStreamParameters,
  1445. label : dataConsumer.label,
  1446. protocol : dataConsumer.protocol,
  1447. appData : dataProducer.appData
  1448. });
  1449. }
  1450. catch (error)
  1451. {
  1452. logger.warn('_createDataConsumer() | failed:%o', error);
  1453. }
  1454. }
  1455. }
  1456. module.exports = Room;

14-8 如何调试MediasoupDemo

本课程讲的是本机调试,也就是服务器和Chrome是在同一台PC上。
图片.png
图片.png

  1. node --inspect-brk server.js
  2. chrome://inspect

14-9 运行时查看Mediasoup的核心信息

图片.png
进行调试时,使用方法1.
查看线上运行状况,使用方法2.

方法1:

  1. cd server
  2. export INTERACTIVE=1;node server.js

此时会进入命令行模式。图片.png 从帮助信息可以知道,输入 usage 可以查看进程占用 CPU 的情况:
图片.png
输入 t 后,会进入 terminal 界面,再按下 Ctrl+C 则回到 cmd 命令界面。

方法2

先运行server.js

  1. cd server
  2. node server.js &

如果是后台运行,则新建会话,再次进入 server 目录,否则直接运行 node connect.js

  1. cd server
  2. node connect.js

此时也会像方法1一样进入到cmd命令行模式。

可以启动客户端,然后加入两个用户,然后再在cmd命令行输入dw、dr等命令查看信息。