本章将讲解如何实现流媒体服务器的信令系统,以及信令与数据转发之间是如何进行配合的。一般信令系统都是整个系统的灵魂,知道了信令的运转就撑握了系统的脉落,这样就能很轻松的知道Mediasoup的运行逻辑了。
14-1 mediasoup-demo整体分析


aiortc: 使用asyncio 库实现的Webrtc 和ortc 库
14-2 JavaScript基本语法一
14-3 JavaScript基本语法二
https://www.yuque.com/caokunchao/pelvyg/dypyoo
14-4 JavaScriptES6高级特性

14-5 Promise与EventEmitter详解
14-6 剖析serverjs
const mediasoup = require(‘mediasoup’);
runMediasoupWorkers函数的mediasoup.createWorker函数,来自于mediasoup源码里面的index.js
下载mediasoup源码
git clone https://github.com/versatica/mediasoupcd mediasoupgrep "createWorker" * -Rn
添加注释后的server.js
#!/usr/bin/env node//进程名称process.title = 'mediasoup-demo-server';//设置DEBUG环境变量process.env.DEBUG = process.env.DEBUG || '*INFO* *WARN* *ERROR*';//引用config.js内容const config = require('./config');/* eslint-disable no-console */console.log('process.env.DEBUG:', process.env.DEBUG);console.log('config.js:\n%s', JSON.stringify(config, null, ' '));/* eslint-enable no-console *///引用需要的一些库const fs = require('fs');const https = require('https');const url = require('url');//提供websocket服务器const protoo = require('protoo-server');//mediasoup库,获取代码https://github.com/versatica/mediasoupconst mediasoup = require('mediasoup');const express = require('express');const bodyParser = require('body-parser');const { AwaitQueue } = require('awaitqueue');const Logger = require('./lib/Logger');//房间管理const Room = require('./lib/Room');const interactiveServer = require('./lib/interactiveServer');const interactiveClient = require('./lib/interactiveClient');const logger = new Logger();// Async queue to manage rooms.// @type {AwaitQueue}const queue = new AwaitQueue();// Map of Room instances indexed by roomId.// @type {Map<Number, Room>}const rooms = new Map();// HTTPS server.// @type {https.Server}let httpsServer;// Express application.// @type {Function}let expressApp;// Protoo WebSocket server.// @type {protoo.WebSocketServer}let protooWebSocketServer;// mediasoup Workers.// @type {Array<mediasoup.Worker>}const mediasoupWorkers = [];// Index of next mediasoup Worker to use.// @type {Number}let nextMediasoupWorkerIdx = 0;run();//核心函数,相当于main函数async function run(){// Open the interactive server.await interactiveServer();// Open the interactive client.if (process.env.INTERACTIVE === 'true' || process.env.INTERACTIVE === '1')await interactiveClient();// Run a mediasoup Worker.await runMediasoupWorkers();// Create Express app.await createExpressApp();// Run HTTPS server.await runHttpsServer();// Run a protoo WebSocketServer.//引进protoo库,创建WebSocketServerawait runProtooWebSocketServer();// Log rooms status every X seconds.setInterval(() =>{for (const room of rooms.values()){room.logStatus();}}, 120000);}/*** Launch as many mediasoup Workers as given in the configuration file.*/async function runMediasoupWorkers(){//获取config.js里面读取到的cpu核数const { numWorkers } = config.mediasoup;logger.info('running %d mediasoup Workers...', numWorkers);//创建numWorkers个Worker工作进程for (let i = 0; i < numWorkers; ++i){const worker = await mediasoup.createWorker({logLevel : config.mediasoup.workerSettings.logLevel,logTags : config.mediasoup.workerSettings.logTags,rtcMinPort : Number(config.mediasoup.workerSettings.rtcMinPort),rtcMaxPort : Number(config.mediasoup.workerSettings.rtcMaxPort)});worker.on('died', () =>{logger.error('mediasoup Worker died, exiting in 2 seconds... [pid:%d]', worker.pid);setTimeout(() => process.exit(1), 2000);});//每创建一个worker进程,都保存一下mediasoupWorkers.push(worker);// Log worker resource usage every X seconds.setInterval(async () =>{const usage = await worker.getResourceUsage();logger.info('mediasoup Worker resource usage [pid:%d]: %o', worker.pid, usage);}, 120000);}}/*** Create an Express based API server to manage Broadcaster requests.*/async function createExpressApp(){logger.info('creating Express app...');expressApp = express();expressApp.use(bodyParser.json());/*** For every API request, verify that the roomId in the path matches and* existing room.*/expressApp.param('roomId', (req, res, next, roomId) =>{// The room must exist for all API requests.if (!rooms.has(roomId)){const error = new Error(`room with id "${roomId}" not found`);error.status = 404;throw error;}req.room = rooms.get(roomId);next();});/*** API GET resource that returns the mediasoup Router RTP capabilities of* the room.*/expressApp.get('/rooms/:roomId', (req, res) =>{const data = req.room.getRouterRtpCapabilities();res.status(200).json(data);});/*** POST API to create a Broadcaster.*/expressApp.post('/rooms/:roomId/broadcasters', async (req, res, next) =>{const {id,displayName,device,rtpCapabilities} = req.body;try{const data = await req.room.createBroadcaster({id,displayName,device,rtpCapabilities});res.status(200).json(data);}catch (error){next(error);}});/*** DELETE API to delete a Broadcaster.*/expressApp.delete('/rooms/:roomId/broadcasters/:broadcasterId', (req, res) =>{const { broadcasterId } = req.params;req.room.deleteBroadcaster({ broadcasterId });res.status(200).send('broadcaster deleted');});/*** POST API to create a mediasoup Transport associated to a Broadcaster.* It can be a PlainTransport or a WebRtcTransport depending on the* type parameters in the body. There are also additional parameters for* PlainTransport.*/expressApp.post('/rooms/:roomId/broadcasters/:broadcasterId/transports',async (req, res, next) =>{const { broadcasterId } = req.params;const { type, rtcpMux, comedia, sctpCapabilities } = req.body;try{const data = await req.room.createBroadcasterTransport({broadcasterId,type,rtcpMux,comedia,sctpCapabilities});res.status(200).json(data);}catch (error){next(error);}});/*** POST API to connect a Transport belonging to a Broadcaster. Not needed* for PlainTransport if it was created with comedia option set to true.*/expressApp.post('/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/connect',async (req, res, next) =>{const { broadcasterId, transportId } = req.params;const { dtlsParameters } = req.body;try{const data = await req.room.connectBroadcasterTransport({broadcasterId,transportId,dtlsParameters});res.status(200).json(data);}catch (error){next(error);}});/*** POST API to create a mediasoup Producer associated to a Broadcaster.* The exact Transport in which the Producer must be created is signaled in* the URL path. Body parameters include kind and rtpParameters of the* Producer.*/expressApp.post('/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/producers',async (req, res, next) =>{const { broadcasterId, transportId } = req.params;const { kind, rtpParameters } = req.body;try{const data = await req.room.createBroadcasterProducer({broadcasterId,transportId,kind,rtpParameters});res.status(200).json(data);}catch (error){next(error);}});/*** POST API to create a mediasoup Consumer associated to a Broadcaster.* The exact Transport in which the Consumer must be created is signaled in* the URL path. Query parameters must include the desired producerId to* consume.*/expressApp.post('/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/consume',async (req, res, next) =>{const { broadcasterId, transportId } = req.params;const { producerId } = req.query;try{const data = await req.room.createBroadcasterConsumer({broadcasterId,transportId,producerId});res.status(200).json(data);}catch (error){next(error);}});/*** POST API to create a mediasoup DataConsumer associated to a Broadcaster.* The exact Transport in which the DataConsumer must be created is signaled in* the URL path. Query body must include the desired producerId to* consume.*/expressApp.post('/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/consume/data',async (req, res, next) =>{const { broadcasterId, transportId } = req.params;const { dataProducerId } = req.body;try{const data = await req.room.createBroadcasterDataConsumer({broadcasterId,transportId,dataProducerId});res.status(200).json(data);}catch (error){next(error);}});/*** POST API to create a mediasoup DataProducer associated to a Broadcaster.* The exact Transport in which the DataProducer must be created is signaled in*/expressApp.post('/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/produce/data',async (req, res, next) =>{const { broadcasterId, transportId } = req.params;const { label, protocol, sctpStreamParameters, appData } = req.body;try{const data = await req.room.createBroadcasterDataProducer({broadcasterId,transportId,label,protocol,sctpStreamParameters,appData});res.status(200).json(data);}catch (error){next(error);}});/*** Error handler.*/expressApp.use((error, req, res, next) =>{if (error){logger.warn('Express app %s', String(error));error.status = error.status || (error.name === 'TypeError' ? 400 : 500);res.statusMessage = error.message;res.status(error.status).send(String(error));}else{next();}});}/*** Create a Node.js HTTPS server. It listens in the IP and port given in the* configuration file and reuses the Express application as request listener.*/async function runHttpsServer(){logger.info('running an HTTPS server...');// HTTPS server for the protoo WebSocket server.const tls ={cert : fs.readFileSync(config.https.tls.cert),key : fs.readFileSync(config.https.tls.key)};httpsServer = https.createServer(tls, expressApp);await new Promise((resolve) =>{httpsServer.listen(Number(config.https.listenPort), config.https.listenIp, resolve);});}/*** Create a protoo WebSocketServer to allow WebSocket connections from browsers.*/async function runProtooWebSocketServer(){logger.info('running protoo WebSocketServer...');// Create the protoo WebSocket server.protooWebSocketServer = new protoo.WebSocketServer(httpsServer,{maxReceivedFrameSize : 960000, // 960 KBytes.maxReceivedMessageSize : 960000,fragmentOutgoingMessages : true,fragmentationThreshold : 960000});// Handle connections from clients.protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>{// The client indicates the roomId and peerId in the URL query.const u = url.parse(info.request.url, true);const roomId = u.query['roomId'];const peerId = u.query['peerId'];if (!roomId || !peerId){reject(400, 'Connection request without roomId and/or peerId');return;}logger.info('protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',roomId, peerId, info.socket.remoteAddress, info.origin);// Serialize this code into the queue to avoid that two peers connecting at// the same time with the same roomId create two separate rooms with same// roomId.//如果有很多用户进来,则需要排队,一个个去检查是否要创建或者获取他们要加入的房间queue.push(async () =>{const room = await getOrCreateRoom({ roomId });// Accept the protoo WebSocket connection.const protooWebSocketTransport = accept();room.handleProtooConnection({ peerId, protooWebSocketTransport });}).catch((error) =>{logger.error('room creation or room joining failed:%o', error);reject(error);});});}/*** Get next mediasoup Worker.*/function getMediasoupWorker(){const worker = mediasoupWorkers[nextMediasoupWorkerIdx];if (++nextMediasoupWorkerIdx === mediasoupWorkers.length)nextMediasoupWorkerIdx = 0;return worker;}/*** Get a Room instance (or create one if it does not exist).*/async function getOrCreateRoom({ roomId }){let room = rooms.get(roomId);// If the Room does not exist create a new one.if (!room){logger.info('creating a new Room [roomId:%s]', roomId);const mediasoupWorker = getMediasoupWorker();room = await Room.create({ mediasoupWorker, roomId });rooms.set(roomId, room);room.on('close', () => rooms.delete(roomId));}return room;}
14-7 剖析roomjs



这些请求,在 Room.js里面的 _handleProtooRequest(peer, request, accept, reject) 函数中处理。
server/lib/Room.js
const EventEmitter = require('events').EventEmitter;const protoo = require('protoo-server');const throttle = require('@sitespeed.io/throttle');const Logger = require('./Logger');const config = require('../config');const Bot = require('./Bot');const logger = new Logger('Room');/*** Room class.** This is not a "mediasoup Room" by itself, by a custom class that holds* a protoo Room (for signaling with WebSocket clients) and a mediasoup Router* (for sending and receiving media to/from those WebSocket peers).*/class Room extends EventEmitter{/*** Factory function that creates and returns Room instance.** @async** @param {mediasoup.Worker} mediasoupWorker - The mediasoup Worker in which a new* mediasoup Router must be created.* @param {String} roomId - Id of the Room instance.*/static async create({ mediasoupWorker, roomId }){logger.info('create() [roomId:%s]', roomId);// Create a protoo Room instance.const protooRoom = new protoo.Room();// Router media codecs.//获取config.js里面的routerOptions选项设置const { mediaCodecs } = config.mediasoup.routerOptions;// Create a mediasoup Router.const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });// Create a mediasoup AudioLevelObserver.const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver({maxEntries : 1,threshold : -80,interval : 800});const bot = await Bot.create({ mediasoupRouter });//传入构造函数参数return new Room({roomId,protooRoom,mediasoupRouter,audioLevelObserver,bot});}constructor({ roomId, protooRoom, mediasoupRouter, audioLevelObserver, bot }){super();this.setMaxListeners(Infinity);// Room id.// @type {String}this._roomId = roomId;// Closed flag.// @type {Boolean}this._closed = false;// protoo Room instance.// @type {protoo.Room}this._protooRoom = protooRoom;// Map of broadcasters indexed by id. Each Object has:// - {String} id// - {Object} data// - {String} displayName// - {Object} device// - {RTCRtpCapabilities} rtpCapabilities// - {Map<String, mediasoup.Transport>} transports// - {Map<String, mediasoup.Producer>} producers// - {Map<String, mediasoup.Consumers>} consumers// - {Map<String, mediasoup.DataProducer>} dataProducers// - {Map<String, mediasoup.DataConsumers>} dataConsumers// @type {Map<String, Object>}this._broadcasters = new Map();// mediasoup Router instance.// @type {mediasoup.Router}this._mediasoupRouter = mediasoupRouter;// mediasoup AudioLevelObserver.// @type {mediasoup.AudioLevelObserver}this._audioLevelObserver = audioLevelObserver;// DataChannel bot.// @type {Bot}this._bot = bot;// Network throttled.// @type {Boolean}this._networkThrottled = false;// Handle audioLevelObserver.this._handleAudioLevelObserver();// For debugging.global.audioLevelObserver = this._audioLevelObserver;global.bot = this._bot;}/*** Closes the Room instance by closing the protoo Room and the mediasoup Router.*/close(){logger.debug('close()');this._closed = true;// Close the protoo Room.this._protooRoom.close();// Close the mediasoup Router.this._mediasoupRouter.close();// Close the Bot.this._bot.close();// Emit 'close' event.this.emit('close');// Stop network throttling.if (this._networkThrottled){throttle.stop({}).catch(() => {});}}logStatus(){logger.info('logStatus() [roomId:%s, protoo Peers:%s, mediasoup Transports:%s]',this._roomId,this._protooRoom.peers.length,this._mediasoupRouter._transports.size); // NOTE: Private API.}/*** Called from server.js upon a protoo WebSocket connection request from a* browser.** @param {String} peerId - The id of the protoo peer to be created.* @param {Boolean} consume - Whether this peer wants to consume from others.* @param {protoo.WebSocketTransport} protooWebSocketTransport - The associated* protoo WebSocket transport.*/handleProtooConnection({ peerId, consume, protooWebSocketTransport }){const existingPeer = this._protooRoom.getPeer(peerId);if (existingPeer){logger.warn('handleProtooConnection() | there is already a protoo Peer with same peerId, closing it [peerId:%s]',peerId);existingPeer.close();}let peer;// Create a new protoo Peer with the given peerId.try{peer = this._protooRoom.createPeer(peerId, protooWebSocketTransport);}catch (error){logger.error('protooRoom.createPeer() failed:%o', error);}// Use the peer.data object to store mediasoup related objects.// Not joined after a custom protoo 'join' request is later received.peer.data.consume = consume;peer.data.joined = false;peer.data.displayName = undefined;peer.data.device = undefined;peer.data.rtpCapabilities = undefined;peer.data.sctpCapabilities = undefined;// Have mediasoup related maps ready even before the Peer joins since we// allow creating Transports before joining.peer.data.transports = new Map();peer.data.producers = new Map();peer.data.consumers = new Map();peer.data.dataProducers = new Map();peer.data.dataConsumers = new Map();peer.on('request', (request, accept, reject) =>{logger.debug('protoo Peer "request" event [method:%s, peerId:%s]',request.method, peer.id);this._handleProtooRequest(peer, request, accept, reject).catch((error) =>{logger.error('request failed:%o', error);reject(error);});});peer.on('close', () =>{if (this._closed)return;logger.debug('protoo Peer "close" event [peerId:%s]', peer.id);// If the Peer was joined, notify all Peers.if (peer.data.joined){for (const otherPeer of this._getJoinedPeers({ excludePeer: peer })){otherPeer.notify('peerClosed', { peerId: peer.id }).catch(() => {});}}// Iterate and close all mediasoup Transport associated to this Peer, so all// its Producers and Consumers will also be closed.for (const transport of peer.data.transports.values()){transport.close();}// If this is the latest Peer in the room, close the room.if (this._protooRoom.peers.length === 0){logger.info('last Peer in the room left, closing the room [roomId:%s]',this._roomId);this.close();}});}getRouterRtpCapabilities(){return this._mediasoupRouter.rtpCapabilities;}/*** Create a Broadcaster. This is for HTTP API requests (see server.js).** @async** @type {String} id - Broadcaster id.* @type {String} displayName - Descriptive name.* @type {Object} [device] - Additional info with name, version and flags fields.* @type {RTCRtpCapabilities} [rtpCapabilities] - Device RTP capabilities.*/async createBroadcaster({ id, displayName, device = {}, rtpCapabilities }){if (typeof id !== 'string' || !id)throw new TypeError('missing body.id');else if (typeof displayName !== 'string' || !displayName)throw new TypeError('missing body.displayName');else if (typeof device.name !== 'string' || !device.name)throw new TypeError('missing body.device.name');else if (rtpCapabilities && typeof rtpCapabilities !== 'object')throw new TypeError('wrong body.rtpCapabilities');if (this._broadcasters.has(id))throw new Error(`broadcaster with id "${id}" already exists`);const broadcaster ={id,data :{displayName,device :{flag : 'broadcaster',name : device.name || 'Unknown device',version : device.version},rtpCapabilities,transports : new Map(),producers : new Map(),consumers : new Map(),dataProducers : new Map(),dataConsumers : new Map()}};// Store the Broadcaster into the map.this._broadcasters.set(broadcaster.id, broadcaster);// Notify the new Broadcaster to all Peers.for (const otherPeer of this._getJoinedPeers()){otherPeer.notify('newPeer',{id : broadcaster.id,displayName : broadcaster.data.displayName,device : broadcaster.data.device}).catch(() => {});}// Reply with the list of Peers and their Producers.const peerInfos = [];const joinedPeers = this._getJoinedPeers();// Just fill the list of Peers if the Broadcaster provided its rtpCapabilities.if (rtpCapabilities){for (const joinedPeer of joinedPeers){const peerInfo ={id : joinedPeer.id,displayName : joinedPeer.data.displayName,device : joinedPeer.data.device,producers : []};for (const producer of joinedPeer.data.producers.values()){// Ignore Producers that the Broadcaster cannot consume.if (!this._mediasoupRouter.canConsume({producerId : producer.id,rtpCapabilities})){continue;}peerInfo.producers.push({id : producer.id,kind : producer.kind});}peerInfos.push(peerInfo);}}return { peers: peerInfos };}/*** Delete a Broadcaster.** @type {String} broadcasterId*/deleteBroadcaster({ broadcasterId }){const broadcaster = this._broadcasters.get(broadcasterId);if (!broadcaster)throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);for (const transport of broadcaster.data.transports.values()){transport.close();}this._broadcasters.delete(broadcasterId);for (const peer of this._getJoinedPeers()){peer.notify('peerClosed', { peerId: broadcasterId }).catch(() => {});}}/*** Create a mediasoup Transport associated to a Broadcaster. It can be a* PlainTransport or a WebRtcTransport.** @async** @type {String} broadcasterId* @type {String} type - Can be 'plain' (PlainTransport) or 'webrtc'* (WebRtcTransport).* @type {Boolean} [rtcpMux=false] - Just for PlainTransport, use RTCP mux.* @type {Boolean} [comedia=true] - Just for PlainTransport, enable remote IP:port* autodetection.* @type {Object} [sctpCapabilities] - SCTP capabilities*/async createBroadcasterTransport({broadcasterId,type,rtcpMux = false,comedia = true,sctpCapabilities}){const broadcaster = this._broadcasters.get(broadcasterId);if (!broadcaster)throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);switch (type){case 'webrtc':{const webRtcTransportOptions ={...config.mediasoup.webRtcTransportOptions,enableSctp : Boolean(sctpCapabilities),numSctpStreams : (sctpCapabilities || {}).numStreams};const transport = await this._mediasoupRouter.createWebRtcTransport(webRtcTransportOptions);// Store it.broadcaster.data.transports.set(transport.id, transport);return {id : transport.id,iceParameters : transport.iceParameters,iceCandidates : transport.iceCandidates,dtlsParameters : transport.dtlsParameters,sctpParameters : transport.sctpParameters};}case 'plain':{const plainTransportOptions ={...config.mediasoup.plainTransportOptions,rtcpMux : rtcpMux,comedia : comedia};const transport = await this._mediasoupRouter.createPlainTransport(plainTransportOptions);// Store it.broadcaster.data.transports.set(transport.id, transport);return {id : transport.id,ip : transport.tuple.localIp,port : transport.tuple.localPort,rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined};}default:{throw new TypeError('invalid type');}}}/*** Connect a Broadcaster mediasoup WebRtcTransport.** @async** @type {String} broadcasterId* @type {String} transportId* @type {RTCDtlsParameters} dtlsParameters - Remote DTLS parameters.*/async connectBroadcasterTransport({broadcasterId,transportId,dtlsParameters}){const broadcaster = this._broadcasters.get(broadcasterId);if (!broadcaster)throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);const transport = broadcaster.data.transports.get(transportId);if (!transport)throw new Error(`transport with id "${transportId}" does not exist`);if (transport.constructor.name !== 'WebRtcTransport'){throw new Error(`transport with id "${transportId}" is not a WebRtcTransport`);}await transport.connect({ dtlsParameters });}/*** Create a mediasoup Producer associated to a Broadcaster.** @async** @type {String} broadcasterId* @type {String} transportId* @type {String} kind - 'audio' or 'video' kind for the Producer.* @type {RTCRtpParameters} rtpParameters - RTP parameters for the Producer.*/async createBroadcasterProducer({broadcasterId,transportId,kind,rtpParameters}){const broadcaster = this._broadcasters.get(broadcasterId);if (!broadcaster)throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);const transport = broadcaster.data.transports.get(transportId);if (!transport)throw new Error(`transport with id "${transportId}" does not exist`);const producer =await transport.produce({ kind, rtpParameters });// Store it.broadcaster.data.producers.set(producer.id, producer);// Set Producer events.// producer.on('score', (score) =>// {// logger.debug(// 'broadcaster producer "score" event [producerId:%s, score:%o]',// producer.id, score);// });producer.on('videoorientationchange', (videoOrientation) =>{logger.debug('broadcaster producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',producer.id, videoOrientation);});// Optimization: Create a server-side Consumer for each Peer.for (const peer of this._getJoinedPeers()){this._createConsumer({consumerPeer : peer,producerPeer : broadcaster,producer});}// Add into the audioLevelObserver.if (producer.kind === 'audio'){this._audioLevelObserver.addProducer({ producerId: producer.id }).catch(() => {});}return { id: producer.id };}/*** Create a mediasoup Consumer associated to a Broadcaster.** @async** @type {String} broadcasterId* @type {String} transportId* @type {String} producerId*/async createBroadcasterConsumer({broadcasterId,transportId,producerId}){const broadcaster = this._broadcasters.get(broadcasterId);if (!broadcaster)throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);if (!broadcaster.data.rtpCapabilities)throw new Error('broadcaster does not have rtpCapabilities');const transport = broadcaster.data.transports.get(transportId);if (!transport)throw new Error(`transport with id "${transportId}" does not exist`);const consumer = await transport.consume({producerId,rtpCapabilities : broadcaster.data.rtpCapabilities});// Store it.broadcaster.data.consumers.set(consumer.id, consumer);// Set Consumer events.consumer.on('transportclose', () =>{// Remove from its map.broadcaster.data.consumers.delete(consumer.id);});consumer.on('producerclose', () =>{// Remove from its map.broadcaster.data.consumers.delete(consumer.id);});return {id : consumer.id,producerId,kind : consumer.kind,rtpParameters : consumer.rtpParameters,type : consumer.type};}/*** Create a mediasoup DataConsumer associated to a Broadcaster.** @async** @type {String} broadcasterId* @type {String} transportId* @type {String} dataProducerId*/async createBroadcasterDataConsumer({broadcasterId,transportId,dataProducerId}){const broadcaster = this._broadcasters.get(broadcasterId);if (!broadcaster)throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);if (!broadcaster.data.rtpCapabilities)throw new Error('broadcaster does not have rtpCapabilities');const transport = broadcaster.data.transports.get(transportId);if (!transport)throw new Error(`transport with id "${transportId}" does not exist`);const dataConsumer = await transport.consumeData({dataProducerId});// Store it.broadcaster.data.dataConsumers.set(dataConsumer.id, dataConsumer);// Set Consumer events.dataConsumer.on('transportclose', () =>{// Remove from its map.broadcaster.data.dataConsumers.delete(dataConsumer.id);});dataConsumer.on('dataproducerclose', () =>{// Remove from its map.broadcaster.data.dataConsumers.delete(dataConsumer.id);});return {id : dataConsumer.id};}/*** Create a mediasoup DataProducer associated to a Broadcaster.** @async** @type {String} broadcasterId* @type {String} transportId*/async createBroadcasterDataProducer({broadcasterId,transportId,label,protocol,sctpStreamParameters,appData}){const broadcaster = this._broadcasters.get(broadcasterId);if (!broadcaster)throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);// if (!broadcaster.data.sctpCapabilities)// throw new Error('broadcaster does not have sctpCapabilities');const transport = broadcaster.data.transports.get(transportId);if (!transport)throw new Error(`transport with id "${transportId}" does not exist`);const dataProducer = await transport.produceData({sctpStreamParameters,label,protocol,appData});// Store it.broadcaster.data.dataProducers.set(dataProducer.id, dataProducer);// Set Consumer events.dataProducer.on('transportclose', () =>{// Remove from its map.broadcaster.data.dataProducers.delete(dataProducer.id);});// // Optimization: Create a server-side Consumer for each Peer.// for (const peer of this._getJoinedPeers())// {// this._createDataConsumer(// {// dataConsumerPeer : peer,// dataProducerPeer : broadcaster,// dataProducer: dataProducer// });// }return {id : dataProducer.id};}_handleAudioLevelObserver(){this._audioLevelObserver.on('volumes', (volumes) =>{const { producer, volume } = volumes[0];// logger.debug(// 'audioLevelObserver "volumes" event [producerId:%s, volume:%s]',// producer.id, volume);// Notify all Peers.for (const peer of this._getJoinedPeers()){peer.notify('activeSpeaker',{peerId : producer.appData.peerId,volume : volume}).catch(() => {});}});this._audioLevelObserver.on('silence', () =>{// logger.debug('audioLevelObserver "silence" event');// Notify all Peers.for (const peer of this._getJoinedPeers()){peer.notify('activeSpeaker', { peerId: null }).catch(() => {});}});}/*** Handle protoo requests from browsers.** @async*/async _handleProtooRequest(peer, request, accept, reject){switch (request.method){case 'getRouterRtpCapabilities':{accept(this._mediasoupRouter.rtpCapabilities);break;}case 'join':{// Ensure the Peer is not already joined.if (peer.data.joined)throw new Error('Peer already joined');const {displayName,device,rtpCapabilities,sctpCapabilities} = request.data;// Store client data into the protoo Peer data object.peer.data.joined = true;peer.data.displayName = displayName;peer.data.device = device;peer.data.rtpCapabilities = rtpCapabilities;peer.data.sctpCapabilities = sctpCapabilities;// Tell the new Peer about already joined Peers.// And also create Consumers for existing Producers.const joinedPeers =[...this._getJoinedPeers(),...this._broadcasters.values()];// Reply now the request with the list of joined peers (all but the new one).const peerInfos = joinedPeers.filter((joinedPeer) => joinedPeer.id !== peer.id).map((joinedPeer) => ({id : joinedPeer.id,displayName : joinedPeer.data.displayName,device : joinedPeer.data.device}));accept({ peers: peerInfos });// Mark the new Peer as joined.peer.data.joined = true;for (const joinedPeer of joinedPeers){// Create Consumers for existing Producers.for (const producer of joinedPeer.data.producers.values()){this._createConsumer({consumerPeer : peer,producerPeer : joinedPeer,producer});}// Create DataConsumers for existing DataProducers.for (const dataProducer of joinedPeer.data.dataProducers.values()){if (dataProducer.label === 'bot')continue;this._createDataConsumer({dataConsumerPeer : peer,dataProducerPeer : joinedPeer,dataProducer});}}// Create DataConsumers for bot DataProducer.this._createDataConsumer({dataConsumerPeer : peer,dataProducerPeer : null,dataProducer : this._bot.dataProducer});// Notify the new Peer to all other Peers.for (const otherPeer of this._getJoinedPeers({ excludePeer: peer })){otherPeer.notify('newPeer',{id : peer.id,displayName : peer.data.displayName,device : peer.data.device}).catch(() => {});}break;}case 'createWebRtcTransport':{// NOTE: Don't require that the Peer is joined here, so the client can// initiate mediasoup Transports and be ready when he later joins.const {forceTcp,producing,consuming,sctpCapabilities} = request.data;const webRtcTransportOptions ={...config.mediasoup.webRtcTransportOptions,enableSctp : Boolean(sctpCapabilities),numSctpStreams : (sctpCapabilities || {}).numStreams,appData : { producing, consuming }};if (forceTcp){webRtcTransportOptions.enableUdp = false;webRtcTransportOptions.enableTcp = true;}const transport = await this._mediasoupRouter.createWebRtcTransport(webRtcTransportOptions);transport.on('sctpstatechange', (sctpState) =>{logger.debug('WebRtcTransport "sctpstatechange" event [sctpState:%s]', sctpState);});transport.on('dtlsstatechange', (dtlsState) =>{if (dtlsState === 'failed' || dtlsState === 'closed')logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState);});// NOTE: For testing.// await transport.enableTraceEvent([ 'probation', 'bwe' ]);await transport.enableTraceEvent([ 'bwe' ]);transport.on('trace', (trace) =>{logger.debug('transport "trace" event [transportId:%s, trace.type:%s, trace:%o]',transport.id, trace.type, trace);if (trace.type === 'bwe' && trace.direction === 'out'){peer.notify('downlinkBwe',{desiredBitrate : trace.info.desiredBitrate,effectiveDesiredBitrate : trace.info.effectiveDesiredBitrate,availableBitrate : trace.info.availableBitrate}).catch(() => {});}});// Store the WebRtcTransport into the protoo Peer data Object.peer.data.transports.set(transport.id, transport);accept({id : transport.id,iceParameters : transport.iceParameters,iceCandidates : transport.iceCandidates,dtlsParameters : transport.dtlsParameters,sctpParameters : transport.sctpParameters});const { maxIncomingBitrate } = config.mediasoup.webRtcTransportOptions;// If set, apply max incoming bitrate limit.if (maxIncomingBitrate){try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); }catch (error) {}}break;}case 'connectWebRtcTransport':{const { transportId, dtlsParameters } = request.data;const transport = peer.data.transports.get(transportId);if (!transport)throw new Error(`transport with id "${transportId}" not found`);await transport.connect({ dtlsParameters });accept();break;}case 'restartIce':{const { transportId } = request.data;const transport = peer.data.transports.get(transportId);if (!transport)throw new Error(`transport with id "${transportId}" not found`);const iceParameters = await transport.restartIce();accept(iceParameters);break;}case 'produce':{// Ensure the Peer is joined.if (!peer.data.joined)throw new Error('Peer not yet joined');const { transportId, kind, rtpParameters } = request.data;let { appData } = request.data;const transport = peer.data.transports.get(transportId);if (!transport)throw new Error(`transport with id "${transportId}" not found`);// Add peerId into appData to later get the associated Peer during// the 'loudest' event of the audioLevelObserver.appData = { ...appData, peerId: peer.id };const producer = await transport.produce({kind,rtpParameters,appData// keyFrameRequestDelay: 5000});// Store the Producer into the protoo Peer data Object.peer.data.producers.set(producer.id, producer);// Set Producer events.producer.on('score', (score) =>{// logger.debug(// 'producer "score" event [producerId:%s, score:%o]',// producer.id, score);peer.notify('producerScore', { producerId: producer.id, score }).catch(() => {});});producer.on('videoorientationchange', (videoOrientation) =>{logger.debug('producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',producer.id, videoOrientation);});// NOTE: For testing.// await producer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);// await producer.enableTraceEvent([ 'pli', 'fir' ]);// await producer.enableTraceEvent([ 'keyframe' ]);producer.on('trace', (trace) =>{logger.debug('producer "trace" event [producerId:%s, trace.type:%s, trace:%o]',producer.id, trace.type, trace);});accept({ id: producer.id });// Optimization: Create a server-side Consumer for each Peer.for (const otherPeer of this._getJoinedPeers({ excludePeer: peer })){this._createConsumer({consumerPeer : otherPeer,producerPeer : peer,producer});}// Add into the audioLevelObserver.if (producer.kind === 'audio'){this._audioLevelObserver.addProducer({ producerId: producer.id }).catch(() => {});}break;}case 'closeProducer':{// Ensure the Peer is joined.if (!peer.data.joined)throw new Error('Peer not yet joined');const { producerId } = request.data;const producer = peer.data.producers.get(producerId);if (!producer)throw new Error(`producer with id "${producerId}" not found`);producer.close();// Remove from its map.peer.data.producers.delete(producer.id);accept();break;}case 'pauseProducer':{// Ensure the Peer is joined.if (!peer.data.joined)throw new Error('Peer not yet joined');const { producerId } = request.data;const producer = peer.data.producers.get(producerId);if (!producer)throw new Error(`producer with id "${producerId}" not found`);await producer.pause();accept();break;}case 'resumeProducer':{// Ensure the Peer is joined.if (!peer.data.joined)throw new Error('Peer not yet joined');const { producerId } = request.data;const producer = peer.data.producers.get(producerId);if (!producer)throw new Error(`producer with id "${producerId}" not found`);await producer.resume();accept();break;}case 'pauseConsumer':{// Ensure the Peer is joined.if (!peer.data.joined)throw new Error('Peer not yet joined');const { consumerId } = request.data;const consumer = peer.data.consumers.get(consumerId);if (!consumer)throw new Error(`consumer with id "${consumerId}" not found`);await consumer.pause();accept();break;}case 'resumeConsumer':{// Ensure the Peer is joined.if (!peer.data.joined)throw new Error('Peer not yet joined');const { consumerId } = request.data;const consumer = peer.data.consumers.get(consumerId);if (!consumer)throw new Error(`consumer with id "${consumerId}" not found`);await consumer.resume();accept();break;}case 'setConsumerPreferredLayers':{// Ensure the Peer is joined.if (!peer.data.joined)throw new Error('Peer not yet joined');const { consumerId, spatialLayer, temporalLayer } = request.data;const consumer = peer.data.consumers.get(consumerId);if (!consumer)throw new Error(`consumer with id "${consumerId}" not found`);await consumer.setPreferredLayers({ spatialLayer, temporalLayer });accept();break;}case 'setConsumerPriority':{// Ensure the Peer is joined.if (!peer.data.joined)throw new Error('Peer not yet joined');const { consumerId, priority } = request.data;const consumer = peer.data.consumers.get(consumerId);if (!consumer)throw new Error(`consumer with id "${consumerId}" not found`);await consumer.setPriority(priority);accept();break;}case 'requestConsumerKeyFrame':{// Ensure the Peer is joined.if (!peer.data.joined)throw new Error('Peer not yet joined');const { consumerId } = request.data;const consumer = peer.data.consumers.get(consumerId);if (!consumer)throw new Error(`consumer with id "${consumerId}" not found`);await consumer.requestKeyFrame();accept();break;}case 'produceData':{// Ensure the Peer is joined.if (!peer.data.joined)throw new Error('Peer not yet joined');const {transportId,sctpStreamParameters,label,protocol,appData} = request.data;const transport = peer.data.transports.get(transportId);if (!transport)throw new Error(`transport with id "${transportId}" not found`);const dataProducer = await transport.produceData({sctpStreamParameters,label,protocol,appData});// Store the Producer into the protoo Peer data Object.peer.data.dataProducers.set(dataProducer.id, dataProducer);accept({ id: dataProducer.id });switch (dataProducer.label){case 'chat':{// Create a server-side DataConsumer for each Peer.for (const otherPeer of this._getJoinedPeers({ excludePeer: peer })){this._createDataConsumer({dataConsumerPeer : otherPeer,dataProducerPeer : peer,dataProducer});}break;}case 'bot':{// Pass it to the bot.this._bot.handlePeerDataProducer({dataProducerId : dataProducer.id,peer});break;}}break;}case 'changeDisplayName':{// Ensure the Peer is joined.if (!peer.data.joined)throw new Error('Peer not yet joined');const { displayName } = request.data;const oldDisplayName = peer.data.displayName;// Store the display name into the custom data Object of the protoo// Peer.peer.data.displayName = displayName;// Notify other joined Peers.for (const otherPeer of this._getJoinedPeers({ excludePeer: peer })){otherPeer.notify('peerDisplayNameChanged',{peerId : peer.id,displayName,oldDisplayName}).catch(() => {});}accept();break;}case 'getTransportStats':{const { transportId } = request.data;const transport = peer.data.transports.get(transportId);if (!transport)throw new Error(`transport with id "${transportId}" not found`);const stats = await transport.getStats();accept(stats);break;}case 'getProducerStats':{const { producerId } = request.data;const producer = peer.data.producers.get(producerId);if (!producer)throw new Error(`producer with id "${producerId}" not found`);const stats = await producer.getStats();accept(stats);break;}case 'getConsumerStats':{const { consumerId } = request.data;const consumer = peer.data.consumers.get(consumerId);if (!consumer)throw new Error(`consumer with id "${consumerId}" not found`);const stats = await consumer.getStats();accept(stats);break;}case 'getDataProducerStats':{const { dataProducerId } = request.data;const dataProducer = peer.data.dataProducers.get(dataProducerId);if (!dataProducer)throw new Error(`dataProducer with id "${dataProducerId}" not found`);const stats = await dataProducer.getStats();accept(stats);break;}case 'getDataConsumerStats':{const { dataConsumerId } = request.data;const dataConsumer = peer.data.dataConsumers.get(dataConsumerId);if (!dataConsumer)throw new Error(`dataConsumer with id "${dataConsumerId}" not found`);const stats = await dataConsumer.getStats();accept(stats);break;}case 'applyNetworkThrottle':{const DefaultUplink = 1000000;const DefaultDownlink = 1000000;const DefaultRtt = 0;const { uplink, downlink, rtt, secret } = request.data;if (!secret || secret !== process.env.NETWORK_THROTTLE_SECRET){reject(403, 'operation NOT allowed, modda fuckaa');return;}try{await throttle.start({up : uplink || DefaultUplink,down : downlink || DefaultDownlink,rtt : rtt || DefaultRtt});logger.warn('network throttle set [uplink:%s, downlink:%s, rtt:%s]',uplink || DefaultUplink,downlink || DefaultDownlink,rtt || DefaultRtt);accept();}catch (error){logger.error('network throttle apply failed: %o', error);reject(500, error.toString());}break;}case 'resetNetworkThrottle':{const { secret } = request.data;if (!secret || secret !== process.env.NETWORK_THROTTLE_SECRET){reject(403, 'operation NOT allowed, modda fuckaa');return;}try{await throttle.stop({});logger.warn('network throttle stopped');accept();}catch (error){logger.error('network throttle stop failed: %o', error);reject(500, error.toString());}break;}default:{logger.error('unknown request.method "%s"', request.method);reject(500, `unknown request.method "${request.method}"`);}}}/*** Helper to get the list of joined protoo peers.*/_getJoinedPeers({ excludePeer = undefined } = {}){return this._protooRoom.peers.filter((peer) => peer.data.joined && peer !== excludePeer);}/*** Creates a mediasoup Consumer for the given mediasoup Producer.** @async*/async _createConsumer({ consumerPeer, producerPeer, producer }){// Optimization:// - Create the server-side Consumer in paused mode.// - Tell its Peer about it and wait for its response.// - Upon receipt of the response, resume the server-side Consumer.// - If video, this will mean a single key frame requested by the// server-side Consumer (when resuming it).// - If audio (or video), it will avoid that RTP packets are received by the// remote endpoint *before* the Consumer is locally created in the endpoint// (and before the local SDP O/A procedure ends). If that happens (RTP// packets are received before the SDP O/A is done) the PeerConnection may// fail to associate the RTP stream.// NOTE: Don't create the Consumer if the remote Peer cannot consume it.if (!consumerPeer.data.rtpCapabilities ||!this._mediasoupRouter.canConsume({producerId : producer.id,rtpCapabilities : consumerPeer.data.rtpCapabilities})){return;}// Must take the Transport the remote Peer is using for consuming.const transport = Array.from(consumerPeer.data.transports.values()).find((t) => t.appData.consuming);// This should not happen.if (!transport){logger.warn('_createConsumer() | Transport for consuming not found');return;}// Create the Consumer in paused mode.let consumer;try{consumer = await transport.consume({producerId : producer.id,rtpCapabilities : consumerPeer.data.rtpCapabilities,paused : true});}catch (error){logger.warn('_createConsumer() | transport.consume():%o', error);return;}// Store the Consumer into the protoo consumerPeer data Object.consumerPeer.data.consumers.set(consumer.id, consumer);// Set Consumer events.consumer.on('transportclose', () =>{// Remove from its map.consumerPeer.data.consumers.delete(consumer.id);});consumer.on('producerclose', () =>{// Remove from its map.consumerPeer.data.consumers.delete(consumer.id);consumerPeer.notify('consumerClosed', { consumerId: consumer.id }).catch(() => {});});consumer.on('producerpause', () =>{consumerPeer.notify('consumerPaused', { consumerId: consumer.id }).catch(() => {});});consumer.on('producerresume', () =>{consumerPeer.notify('consumerResumed', { consumerId: consumer.id }).catch(() => {});});consumer.on('score', (score) =>{// logger.debug(// 'consumer "score" event [consumerId:%s, score:%o]',// consumer.id, score);consumerPeer.notify('consumerScore', { consumerId: consumer.id, score }).catch(() => {});});consumer.on('layerschange', (layers) =>{consumerPeer.notify('consumerLayersChanged',{consumerId : consumer.id,spatialLayer : layers ? layers.spatialLayer : null,temporalLayer : layers ? layers.temporalLayer : null}).catch(() => {});});// NOTE: For testing.// await consumer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);// await consumer.enableTraceEvent([ 'pli', 'fir' ]);// await consumer.enableTraceEvent([ 'keyframe' ]);consumer.on('trace', (trace) =>{logger.debug('consumer "trace" event [producerId:%s, trace.type:%s, trace:%o]',consumer.id, trace.type, trace);});// Send a protoo request to the remote Peer with Consumer parameters.try{await consumerPeer.request('newConsumer',{peerId : producerPeer.id,producerId : producer.id,id : consumer.id,kind : consumer.kind,rtpParameters : consumer.rtpParameters,type : consumer.type,appData : producer.appData,producerPaused : consumer.producerPaused});// Now that we got the positive response from the remote endpoint, resume// the Consumer so the remote endpoint will receive the a first RTP packet// of this new stream once its PeerConnection is already ready to process// and associate it.await consumer.resume();consumerPeer.notify('consumerScore',{consumerId : consumer.id,score : consumer.score}).catch(() => {});}catch (error){logger.warn('_createConsumer() | failed:%o', error);}}/*** Creates a mediasoup DataConsumer for the given mediasoup DataProducer.** @async*/async _createDataConsumer({dataConsumerPeer,dataProducerPeer = null, // This is null for the bot DataProducer.dataProducer}){// NOTE: Don't create the DataConsumer if the remote Peer cannot consume it.if (!dataConsumerPeer.data.sctpCapabilities)return;// Must take the Transport the remote Peer is using for consuming.const transport = Array.from(dataConsumerPeer.data.transports.values()).find((t) => t.appData.consuming);// This should not happen.if (!transport){logger.warn('_createDataConsumer() | Transport for consuming not found');return;}// Create the DataConsumer.let dataConsumer;try{dataConsumer = await transport.consumeData({dataProducerId : dataProducer.id});}catch (error){logger.warn('_createDataConsumer() | transport.consumeData():%o', error);return;}// Store the DataConsumer into the protoo dataConsumerPeer data Object.dataConsumerPeer.data.dataConsumers.set(dataConsumer.id, dataConsumer);// Set DataConsumer events.dataConsumer.on('transportclose', () =>{// Remove from its map.dataConsumerPeer.data.dataConsumers.delete(dataConsumer.id);});dataConsumer.on('dataproducerclose', () =>{// Remove from its map.dataConsumerPeer.data.dataConsumers.delete(dataConsumer.id);dataConsumerPeer.notify('dataConsumerClosed', { dataConsumerId: dataConsumer.id }).catch(() => {});});// Send a protoo request to the remote Peer with Consumer parameters.try{await dataConsumerPeer.request('newDataConsumer',{// This is null for bot DataProducer.peerId : dataProducerPeer ? dataProducerPeer.id : null,dataProducerId : dataProducer.id,id : dataConsumer.id,sctpStreamParameters : dataConsumer.sctpStreamParameters,label : dataConsumer.label,protocol : dataConsumer.protocol,appData : dataProducer.appData});}catch (error){logger.warn('_createDataConsumer() | failed:%o', error);}}}module.exports = Room;
14-8 如何调试MediasoupDemo
本课程讲的是本机调试,也就是服务器和Chrome是在同一台PC上。

node --inspect-brk server.jschrome://inspect
14-9 运行时查看Mediasoup的核心信息

进行调试时,使用方法1.
查看线上运行状况,使用方法2.
方法1:
cd serverexport INTERACTIVE=1;node server.js
此时会进入命令行模式
看到帮助,可以知道,输入usage,可以查看进程占用CPU情况,
输入t后,会进入terminal界面, 再单击Ctrl+C则回到cmd命令界面。
方法2
先运行server.js
cd servernode server.js &
如果是后台运行,则新建会话,再次进入server,否则直接运行node connnect.js
cd servernode connect.js
此时也会像方法1一样进入到cmd命令行模式。
可以启动客户端,然后加入两个用户,然后再在cmd命令行输入dw、dr等命令查看信息。

