本章将讲解如何实现流媒体服务器的信令系统,以及信令与数据转发之间是如何进行配合的。一般信令系统都是整个系统的灵魂,知道了信令的运转就掌握了系统的脉络,这样就能很轻松地知道Mediasoup的运行逻辑了。
14-1 mediasoup-demo整体分析
aiortc: 使用 asyncio 库实现的 WebRTC 和 ORTC 库
14-2 JavaScript基本语法一
14-3 JavaScript基本语法二
https://www.yuque.com/caokunchao/pelvyg/dypyoo
14-4 JavaScriptES6高级特性
14-5 Promise与EventEmitter详解
14-6 剖析serverjs
const mediasoup = require('mediasoup');
runMediasoupWorkers函数的mediasoup.createWorker函数,来自于mediasoup源码里面的index.js
下载mediasoup源码
git clone https://github.com/versatica/mediasoup
cd mediasoup
grep "createWorker" * -Rn
添加注释后的server.js
#!/usr/bin/env node
// Process title.
process.title = 'mediasoup-demo-server';
// Default the DEBUG environment variable when it is not already set.
process.env.DEBUG = process.env.DEBUG || '*INFO* *WARN* *ERROR*';
// Load the settings exported by config.js.
const config = require('./config');
/* eslint-disable no-console */
console.log('process.env.DEBUG:', process.env.DEBUG);
console.log('config.js:\n%s', JSON.stringify(config, null, ' '));
/* eslint-enable no-console */
// Libraries used below.
const fs = require('fs');
const https = require('https');
const url = require('url');
// Provides the WebSocket server (see runProtooWebSocketServer()).
const protoo = require('protoo-server');
// mediasoup library; source code at https://github.com/versatica/mediasoup
const mediasoup = require('mediasoup');
const express = require('express');
const bodyParser = require('body-parser');
const { AwaitQueue } = require('awaitqueue');
const Logger = require('./lib/Logger');
// Room management.
const Room = require('./lib/Room');
const interactiveServer = require('./lib/interactiveServer');
const interactiveClient = require('./lib/interactiveClient');
const logger = new Logger();
// Async queue to manage rooms.
// @type {AwaitQueue}
const queue = new AwaitQueue();
// Map of Room instances indexed by roomId.
// @type {Map<Number, Room>}
const rooms = new Map();
// HTTPS server.
// @type {https.Server}
let httpsServer;
// Express application.
// @type {Function}
let expressApp;
// Protoo WebSocket server.
// @type {protoo.WebSocketServer}
let protooWebSocketServer;
// mediasoup Workers.
// @type {Array<mediasoup.Worker>}
const mediasoupWorkers = [];
// Index of next mediasoup Worker to use.
// @type {Number}
let nextMediasoupWorkerIdx = 0;
// Start the server. run() is a function declaration defined further down, so
// it is already callable here.
run();
/**
 * Core entry point, the equivalent of a main() function.
 *
 * Starts every server component in order: the interactive server, the
 * optional interactive client, the mediasoup Workers, the Express app, the
 * HTTPS server and the protoo WebSocket server. Finally schedules a periodic
 * status log for every room.
 *
 * @async
 */
async function run()
{
	// Open the interactive server.
	await interactiveServer();

	// Open the interactive client only when INTERACTIVE is 'true' or '1'.
	const interactiveFlag = process.env.INTERACTIVE;

	if ([ 'true', '1' ].includes(interactiveFlag))
	{
		await interactiveClient();
	}

	// Run the mediasoup Workers.
	await runMediasoupWorkers();

	// Create the Express app.
	await createExpressApp();

	// Run the HTTPS server.
	await runHttpsServer();

	// Run the protoo WebSocketServer on top of the HTTPS server.
	await runProtooWebSocketServer();

	// Log the status of every room every 120 seconds.
	setInterval(() =>
	{
		rooms.forEach((room) =>
		{
			room.logStatus();
		});
	}, 120000);
}
/**
 * Launch as many mediasoup Workers as given in the configuration file.
 *
 * Workers are created one after another and appended to `mediasoupWorkers`.
 * For each Worker:
 * - a 'died' handler logs the failure and exits the process after 2 seconds;
 * - its resource usage is logged every 120 seconds.
 *
 * @async
 */
async function runMediasoupWorkers()
{
	// Number of Workers to launch and their settings, as read from config.js.
	const { numWorkers, workerSettings } = config.mediasoup;

	logger.info('running %d mediasoup Workers...', numWorkers);

	for (let i = 0; i < numWorkers; ++i)
	{
		const worker = await mediasoup.createWorker(
			{
				logLevel   : workerSettings.logLevel,
				logTags    : workerSettings.logTags,
				rtcMinPort : Number(workerSettings.rtcMinPort),
				rtcMaxPort : Number(workerSettings.rtcMaxPort)
			});

		worker.on('died', () =>
		{
			logger.error(
				'mediasoup Worker died, exiting in 2 seconds... [pid:%d]', worker.pid);

			setTimeout(() => process.exit(1), 2000);
		});

		// Keep every created Worker so getMediasoupWorker() can hand it out.
		mediasoupWorkers.push(worker);

		// Log worker resource usage every 120 seconds.
		setInterval(async () =>
		{
			// The timer callback's promise is not awaited by anyone, so a
			// rejection here would surface as an unhandled rejection. Catch
			// and log it instead.
			try
			{
				const usage = await worker.getResourceUsage();

				logger.info('mediasoup Worker resource usage [pid:%d]: %o', worker.pid, usage);
			}
			catch (error)
			{
				logger.warn(
					'mediasoup Worker getResourceUsage() failed [pid:%d]: %o',
					worker.pid, error);
			}
		}, 120000);
	}
}
/**
 * Create an Express based API server to manage Broadcaster requests.
 *
 * Registers a JSON body parser, a `roomId` param resolver, the Broadcaster
 * REST routes and a final error-handling middleware on the module-level
 * `expressApp`.
 *
 * @async
 */
async function createExpressApp()
{
	logger.info('creating Express app...');

	// Shared success reply: HTTP 200 with a JSON payload.
	const replyJson = (res, payload) => res.status(200).json(payload);

	expressApp = express();
	expressApp.use(bodyParser.json());

	/**
	 * For every API request, verify that the roomId in the path matches an
	 * existing room and expose that room as req.room.
	 */
	expressApp.param('roomId', (req, res, next, roomId) =>
	{
		if (rooms.has(roomId))
		{
			req.room = rooms.get(roomId);
			next();

			return;
		}

		// The room must exist for all API requests.
		const notFound = new Error(`room with id "${roomId}" not found`);

		notFound.status = 404;

		throw notFound;
	});

	/**
	 * GET: return the mediasoup Router RTP capabilities of the room.
	 */
	expressApp.get('/rooms/:roomId', (req, res) =>
	{
		replyJson(res, req.room.getRouterRtpCapabilities());
	});

	/**
	 * POST: create a Broadcaster.
	 */
	expressApp.post('/rooms/:roomId/broadcasters', async (req, res, next) =>
	{
		const { id, displayName, device, rtpCapabilities } = req.body;

		try
		{
			const result = await req.room.createBroadcaster(
				{ id, displayName, device, rtpCapabilities });

			replyJson(res, result);
		}
		catch (failure)
		{
			next(failure);
		}
	});

	/**
	 * DELETE: delete a Broadcaster.
	 */
	expressApp.delete('/rooms/:roomId/broadcasters/:broadcasterId', (req, res) =>
	{
		const { broadcasterId } = req.params;

		req.room.deleteBroadcaster({ broadcasterId });
		res.status(200).send('broadcaster deleted');
	});

	/**
	 * POST: create a mediasoup Transport associated to a Broadcaster. It can
	 * be a PlainTransport or a WebRtcTransport depending on the type parameter
	 * in the body. There are also additional parameters for PlainTransport.
	 */
	expressApp.post(
		'/rooms/:roomId/broadcasters/:broadcasterId/transports',
		async (req, res, next) =>
		{
			const { broadcasterId } = req.params;
			const { type, rtcpMux, comedia, sctpCapabilities } = req.body;

			try
			{
				const result = await req.room.createBroadcasterTransport(
					{ broadcasterId, type, rtcpMux, comedia, sctpCapabilities });

				replyJson(res, result);
			}
			catch (failure)
			{
				next(failure);
			}
		});

	/**
	 * POST: connect a Transport belonging to a Broadcaster. Not needed for
	 * PlainTransport if it was created with the comedia option set to true.
	 */
	expressApp.post(
		'/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/connect',
		async (req, res, next) =>
		{
			const { broadcasterId, transportId } = req.params;
			const { dtlsParameters } = req.body;

			try
			{
				const result = await req.room.connectBroadcasterTransport(
					{ broadcasterId, transportId, dtlsParameters });

				replyJson(res, result);
			}
			catch (failure)
			{
				next(failure);
			}
		});

	/**
	 * POST: create a mediasoup Producer associated to a Broadcaster. The
	 * Transport in which the Producer must be created is signaled in the URL
	 * path. Body parameters include kind and rtpParameters of the Producer.
	 */
	expressApp.post(
		'/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/producers',
		async (req, res, next) =>
		{
			const { broadcasterId, transportId } = req.params;
			const { kind, rtpParameters } = req.body;

			try
			{
				const result = await req.room.createBroadcasterProducer(
					{ broadcasterId, transportId, kind, rtpParameters });

				replyJson(res, result);
			}
			catch (failure)
			{
				next(failure);
			}
		});

	/**
	 * POST: create a mediasoup Consumer associated to a Broadcaster. The
	 * Transport in which the Consumer must be created is signaled in the URL
	 * path. The producerId to consume is read from the query string.
	 */
	expressApp.post(
		'/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/consume',
		async (req, res, next) =>
		{
			const { broadcasterId, transportId } = req.params;
			const { producerId } = req.query;

			try
			{
				const result = await req.room.createBroadcasterConsumer(
					{ broadcasterId, transportId, producerId });

				replyJson(res, result);
			}
			catch (failure)
			{
				next(failure);
			}
		});

	/**
	 * POST: create a mediasoup DataConsumer associated to a Broadcaster. The
	 * Transport in which the DataConsumer must be created is signaled in the
	 * URL path. The dataProducerId to consume is read from the body.
	 */
	expressApp.post(
		'/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/consume/data',
		async (req, res, next) =>
		{
			const { broadcasterId, transportId } = req.params;
			const { dataProducerId } = req.body;

			try
			{
				const result = await req.room.createBroadcasterDataConsumer(
					{ broadcasterId, transportId, dataProducerId });

				replyJson(res, result);
			}
			catch (failure)
			{
				next(failure);
			}
		});

	/**
	 * POST: create a mediasoup DataProducer associated to a Broadcaster. The
	 * Transport in which the DataProducer must be created is signaled in the
	 * URL path.
	 */
	expressApp.post(
		'/rooms/:roomId/broadcasters/:broadcasterId/transports/:transportId/produce/data',
		async (req, res, next) =>
		{
			const { broadcasterId, transportId } = req.params;
			const { label, protocol, sctpStreamParameters, appData } = req.body;

			try
			{
				const result = await req.room.createBroadcasterDataProducer(
					{
						broadcasterId,
						transportId,
						label,
						protocol,
						sctpStreamParameters,
						appData
					});

				replyJson(res, result);
			}
			catch (failure)
			{
				next(failure);
			}
		});

	/**
	 * Error handler. Uses error.status when set; otherwise a TypeError
	 * becomes 400 and anything else 500.
	 */
	expressApp.use((error, req, res, next) =>
	{
		if (!error)
		{
			next();

			return;
		}

		logger.warn('Express app %s', String(error));

		error.status = error.status || (error.name === 'TypeError' ? 400 : 500);

		res.statusMessage = error.message;
		res.status(error.status).send(String(error));
	});
}
/**
 * Create a Node.js HTTPS server. It listens in the IP and port given in the
 * configuration file and reuses the Express application as request listener.
 *
 * The returned promise resolves once the server is listening and rejects if
 * the server emits 'error' before that (for example when the port is already
 * in use), so the failure reaches the caller instead of being an unhandled
 * 'error' event.
 *
 * @async
 * @throws {Error} The error emitted by the server while trying to listen, or
 *   any error thrown while reading the TLS files.
 */
async function runHttpsServer()
{
	logger.info('running an HTTPS server...');

	// HTTPS server for the protoo WebSocket server.
	const tls =
	{
		cert : fs.readFileSync(config.https.tls.cert),
		key  : fs.readFileSync(config.https.tls.key)
	};

	httpsServer = https.createServer(tls, expressApp);

	await new Promise((resolve, reject) =>
	{
		// Only startup failures are routed to the promise. The listener is
		// removed once listening so later 'error' events behave as before.
		httpsServer.once('error', reject);

		httpsServer.listen(
			Number(config.https.listenPort), config.https.listenIp, () =>
			{
				httpsServer.removeListener('error', reject);
				resolve();
			});
	});
}
/**
 * Create a protoo WebSocketServer to allow WebSocket connections from browsers.
 *
 * The server is attached to the module-level HTTPS server. Each connection
 * request must carry `roomId` and `peerId` in the URL query; valid requests
 * are queued so that rooms are looked up or created one request at a time.
 *
 * @async
 */
async function runProtooWebSocketServer()
{
	logger.info('running protoo WebSocketServer...');

	const webSocketOptions =
	{
		maxReceivedFrameSize     : 960000, // 960 KBytes.
		maxReceivedMessageSize   : 960000,
		fragmentOutgoingMessages : true,
		fragmentationThreshold   : 960000
	};

	// Create the protoo WebSocket server.
	protooWebSocketServer = new protoo.WebSocketServer(httpsServer, webSocketOptions);

	// Handle connections from clients.
	protooWebSocketServer.on('connectionrequest', (info, accept, reject) =>
	{
		// The client indicates the roomId and peerId in the URL query.
		const { roomId, peerId } = url.parse(info.request.url, true).query;

		if (!roomId || !peerId)
		{
			reject(400, 'Connection request without roomId and/or peerId');

			return;
		}

		logger.info(
			'protoo connection request [roomId:%s, peerId:%s, address:%s, origin:%s]',
			roomId, peerId, info.socket.remoteAddress, info.origin);

		// Serialize through the queue so two peers connecting at the same
		// time with the same roomId cannot create two separate rooms. Each
		// request waits its turn to get or create its room.
		queue.push(async () =>
		{
			const targetRoom = await getOrCreateRoom({ roomId });

			// Accept the protoo WebSocket connection.
			const protooWebSocketTransport = accept();

			targetRoom.handleProtooConnection({ peerId, protooWebSocketTransport });
		})
			.catch((failure) =>
			{
				logger.error('room creation or room joining failed:%o', failure);

				reject(failure);
			});
	});
}
/**
 * Get next mediasoup Worker.
 *
 * Returns the Worker at the current index and advances the index, wrapping
 * back to 0 once it reaches the number of Workers.
 *
 * @returns {mediasoup.Worker} The Worker at the current index.
 */
function getMediasoupWorker()
{
	const currentIdx = nextMediasoupWorkerIdx;

	nextMediasoupWorkerIdx += 1;

	if (nextMediasoupWorkerIdx === mediasoupWorkers.length)
	{
		nextMediasoupWorkerIdx = 0;
	}

	return mediasoupWorkers[currentIdx];
}
/**
 * Get a Room instance (or create one if it does not exist).
 *
 * A newly created Room is stored in `rooms` under its roomId and removed
 * from the map again when it emits 'close'.
 *
 * @async
 * @param {Object} options
 * @param {String} options.roomId - Id of the wanted Room.
 * @returns {Promise<Room>} The existing or newly created Room.
 */
async function getOrCreateRoom({ roomId })
{
	const existingRoom = rooms.get(roomId);

	if (existingRoom)
	{
		return existingRoom;
	}

	// The Room does not exist yet, so create it on the next Worker.
	logger.info('creating a new Room [roomId:%s]', roomId);

	const createdRoom = await Room.create(
		{
			mediasoupWorker : getMediasoupWorker(),
			roomId
		});

	rooms.set(roomId, createdRoom);
	createdRoom.on('close', () => rooms.delete(roomId));

	return createdRoom;
}
14-7 剖析roomjs
这些请求,在 Room.js里面的 _handleProtooRequest(peer, request, accept, reject) 函数中处理。
server/lib/Room.js
const EventEmitter = require('events').EventEmitter;
const protoo = require('protoo-server');
const throttle = require('@sitespeed.io/throttle');
const Logger = require('./Logger');
const config = require('../config');
const Bot = require('./Bot');
const logger = new Logger('Room');
/**
* Room class.
*
* This is not a "mediasoup Room" by itself, but a custom class that holds
* a protoo Room (for signaling with WebSocket clients) and a mediasoup Router
* (for sending and receiving media to/from those WebSocket peers).
*/
class Room extends EventEmitter
{
/**
* Factory function that creates and returns Room instance.
*
* @async
*
* @param {mediasoup.Worker} mediasoupWorker - The mediasoup Worker in which a new
* mediasoup Router must be created.
* @param {String} roomId - Id of the Room instance.
*/
static async create({ mediasoupWorker, roomId })
{
logger.info('create() [roomId:%s]', roomId);
// Create a protoo Room instance.
const protooRoom = new protoo.Room();
// Router media codecs.
//获取config.js里面的routerOptions选项设置
const { mediaCodecs } = config.mediasoup.routerOptions;
// Create a mediasoup Router.
const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });
// Create a mediasoup AudioLevelObserver.
const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver(
{
maxEntries : 1,
threshold : -80,
interval : 800
});
const bot = await Bot.create({ mediasoupRouter });
//传入构造函数参数
return new Room(
{
roomId,
protooRoom,
mediasoupRouter,
audioLevelObserver,
bot
});
}
constructor({ roomId, protooRoom, mediasoupRouter, audioLevelObserver, bot })
{
super();
this.setMaxListeners(Infinity);
// Room id.
// @type {String}
this._roomId = roomId;
// Closed flag.
// @type {Boolean}
this._closed = false;
// protoo Room instance.
// @type {protoo.Room}
this._protooRoom = protooRoom;
// Map of broadcasters indexed by id. Each Object has:
// - {String} id
// - {Object} data
// - {String} displayName
// - {Object} device
// - {RTCRtpCapabilities} rtpCapabilities
// - {Map<String, mediasoup.Transport>} transports
// - {Map<String, mediasoup.Producer>} producers
// - {Map<String, mediasoup.Consumers>} consumers
// - {Map<String, mediasoup.DataProducer>} dataProducers
// - {Map<String, mediasoup.DataConsumers>} dataConsumers
// @type {Map<String, Object>}
this._broadcasters = new Map();
// mediasoup Router instance.
// @type {mediasoup.Router}
this._mediasoupRouter = mediasoupRouter;
// mediasoup AudioLevelObserver.
// @type {mediasoup.AudioLevelObserver}
this._audioLevelObserver = audioLevelObserver;
// DataChannel bot.
// @type {Bot}
this._bot = bot;
// Network throttled.
// @type {Boolean}
this._networkThrottled = false;
// Handle audioLevelObserver.
this._handleAudioLevelObserver();
// For debugging.
global.audioLevelObserver = this._audioLevelObserver;
global.bot = this._bot;
}
/**
* Closes the Room instance by closing the protoo Room and the mediasoup Router.
*/
close()
{
logger.debug('close()');
this._closed = true;
// Close the protoo Room.
this._protooRoom.close();
// Close the mediasoup Router.
this._mediasoupRouter.close();
// Close the Bot.
this._bot.close();
// Emit 'close' event.
this.emit('close');
// Stop network throttling.
if (this._networkThrottled)
{
throttle.stop({})
.catch(() => {});
}
}
/**
 * Log the room id together with the current number of protoo Peers and
 * mediasoup Transports.
 */
logStatus()
{
	const roomId = this._roomId;
	const peerCount = this._protooRoom.peers.length;
	// NOTE: Private API.
	const transportCount = this._mediasoupRouter._transports.size;

	logger.info(
		'logStatus() [roomId:%s, protoo Peers:%s, mediasoup Transports:%s]',
		roomId,
		peerCount,
		transportCount);
}
/**
* Called from server.js upon a protoo WebSocket connection request from a
* browser.
*
* @param {String} peerId - The id of the protoo peer to be created.
* @param {Boolean} consume - Whether this peer wants to consume from others.
* @param {protoo.WebSocketTransport} protooWebSocketTransport - The associated
* protoo WebSocket transport.
*/
handleProtooConnection({ peerId, consume, protooWebSocketTransport })
{
const existingPeer = this._protooRoom.getPeer(peerId);
if (existingPeer)
{
logger.warn(
'handleProtooConnection() | there is already a protoo Peer with same peerId, closing it [peerId:%s]',
peerId);
existingPeer.close();
}
let peer;
// Create a new protoo Peer with the given peerId.
try
{
peer = this._protooRoom.createPeer(peerId, protooWebSocketTransport);
}
catch (error)
{
logger.error('protooRoom.createPeer() failed:%o', error);
}
// Use the peer.data object to store mediasoup related objects.
// Not joined after a custom protoo 'join' request is later received.
peer.data.consume = consume;
peer.data.joined = false;
peer.data.displayName = undefined;
peer.data.device = undefined;
peer.data.rtpCapabilities = undefined;
peer.data.sctpCapabilities = undefined;
// Have mediasoup related maps ready even before the Peer joins since we
// allow creating Transports before joining.
peer.data.transports = new Map();
peer.data.producers = new Map();
peer.data.consumers = new Map();
peer.data.dataProducers = new Map();
peer.data.dataConsumers = new Map();
peer.on('request', (request, accept, reject) =>
{
logger.debug(
'protoo Peer "request" event [method:%s, peerId:%s]',
request.method, peer.id);
this._handleProtooRequest(peer, request, accept, reject)
.catch((error) =>
{
logger.error('request failed:%o', error);
reject(error);
});
});
peer.on('close', () =>
{
if (this._closed)
return;
logger.debug('protoo Peer "close" event [peerId:%s]', peer.id);
// If the Peer was joined, notify all Peers.
if (peer.data.joined)
{
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
otherPeer.notify('peerClosed', { peerId: peer.id })
.catch(() => {});
}
}
// Iterate and close all mediasoup Transport associated to this Peer, so all
// its Producers and Consumers will also be closed.
for (const transport of peer.data.transports.values())
{
transport.close();
}
// If this is the latest Peer in the room, close the room.
if (this._protooRoom.peers.length === 0)
{
logger.info(
'last Peer in the room left, closing the room [roomId:%s]',
this._roomId);
this.close();
}
});
}
getRouterRtpCapabilities()
{
return this._mediasoupRouter.rtpCapabilities;
}
/**
* Create a Broadcaster. This is for HTTP API requests (see server.js).
*
* @async
*
* @type {String} id - Broadcaster id.
* @type {String} displayName - Descriptive name.
* @type {Object} [device] - Additional info with name, version and flags fields.
* @type {RTCRtpCapabilities} [rtpCapabilities] - Device RTP capabilities.
*/
async createBroadcaster({ id, displayName, device = {}, rtpCapabilities })
{
if (typeof id !== 'string' || !id)
throw new TypeError('missing body.id');
else if (typeof displayName !== 'string' || !displayName)
throw new TypeError('missing body.displayName');
else if (typeof device.name !== 'string' || !device.name)
throw new TypeError('missing body.device.name');
else if (rtpCapabilities && typeof rtpCapabilities !== 'object')
throw new TypeError('wrong body.rtpCapabilities');
if (this._broadcasters.has(id))
throw new Error(`broadcaster with id "${id}" already exists`);
const broadcaster =
{
id,
data :
{
displayName,
device :
{
flag : 'broadcaster',
name : device.name || 'Unknown device',
version : device.version
},
rtpCapabilities,
transports : new Map(),
producers : new Map(),
consumers : new Map(),
dataProducers : new Map(),
dataConsumers : new Map()
}
};
// Store the Broadcaster into the map.
this._broadcasters.set(broadcaster.id, broadcaster);
// Notify the new Broadcaster to all Peers.
for (const otherPeer of this._getJoinedPeers())
{
otherPeer.notify(
'newPeer',
{
id : broadcaster.id,
displayName : broadcaster.data.displayName,
device : broadcaster.data.device
})
.catch(() => {});
}
// Reply with the list of Peers and their Producers.
const peerInfos = [];
const joinedPeers = this._getJoinedPeers();
// Just fill the list of Peers if the Broadcaster provided its rtpCapabilities.
if (rtpCapabilities)
{
for (const joinedPeer of joinedPeers)
{
const peerInfo =
{
id : joinedPeer.id,
displayName : joinedPeer.data.displayName,
device : joinedPeer.data.device,
producers : []
};
for (const producer of joinedPeer.data.producers.values())
{
// Ignore Producers that the Broadcaster cannot consume.
if (
!this._mediasoupRouter.canConsume(
{
producerId : producer.id,
rtpCapabilities
})
)
{
continue;
}
peerInfo.producers.push(
{
id : producer.id,
kind : producer.kind
});
}
peerInfos.push(peerInfo);
}
}
return { peers: peerInfos };
}
/**
 * Delete a Broadcaster: close all its Transports, remove it from the map and
 * send 'peerClosed' to every joined Peer.
 *
 * @param {String} broadcasterId
 * @throws {Error} If no Broadcaster with that id exists.
 */
deleteBroadcaster({ broadcasterId })
{
	const broadcaster = this._broadcasters.get(broadcasterId);

	if (!broadcaster)
	{
		throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
	}

	broadcaster.data.transports.forEach((transport) =>
	{
		transport.close();
	});

	this._broadcasters.delete(broadcasterId);

	// Tell the remaining Peers that the Broadcaster is gone.
	for (const joinedPeer of this._getJoinedPeers())
	{
		joinedPeer.notify('peerClosed', { peerId: broadcasterId })
			.catch(() => {});
	}
}
/**
* Create a mediasoup Transport associated to a Broadcaster. It can be a
* PlainTransport or a WebRtcTransport.
*
* @async
*
* @type {String} broadcasterId
* @type {String} type - Can be 'plain' (PlainTransport) or 'webrtc'
* (WebRtcTransport).
* @type {Boolean} [rtcpMux=false] - Just for PlainTransport, use RTCP mux.
* @type {Boolean} [comedia=true] - Just for PlainTransport, enable remote IP:port
* autodetection.
* @type {Object} [sctpCapabilities] - SCTP capabilities
*/
async createBroadcasterTransport(
{
broadcasterId,
type,
rtcpMux = false,
comedia = true,
sctpCapabilities
})
{
const broadcaster = this._broadcasters.get(broadcasterId);
if (!broadcaster)
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
switch (type)
{
case 'webrtc':
{
const webRtcTransportOptions =
{
...config.mediasoup.webRtcTransportOptions,
enableSctp : Boolean(sctpCapabilities),
numSctpStreams : (sctpCapabilities || {}).numStreams
};
const transport = await this._mediasoupRouter.createWebRtcTransport(
webRtcTransportOptions);
// Store it.
broadcaster.data.transports.set(transport.id, transport);
return {
id : transport.id,
iceParameters : transport.iceParameters,
iceCandidates : transport.iceCandidates,
dtlsParameters : transport.dtlsParameters,
sctpParameters : transport.sctpParameters
};
}
case 'plain':
{
const plainTransportOptions =
{
...config.mediasoup.plainTransportOptions,
rtcpMux : rtcpMux,
comedia : comedia
};
const transport = await this._mediasoupRouter.createPlainTransport(
plainTransportOptions);
// Store it.
broadcaster.data.transports.set(transport.id, transport);
return {
id : transport.id,
ip : transport.tuple.localIp,
port : transport.tuple.localPort,
rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined
};
}
default:
{
throw new TypeError('invalid type');
}
}
}
/**
* Connect a Broadcaster mediasoup WebRtcTransport.
*
* @async
*
* @type {String} broadcasterId
* @type {String} transportId
* @type {RTCDtlsParameters} dtlsParameters - Remote DTLS parameters.
*/
async connectBroadcasterTransport(
{
broadcasterId,
transportId,
dtlsParameters
}
)
{
const broadcaster = this._broadcasters.get(broadcasterId);
if (!broadcaster)
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
const transport = broadcaster.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" does not exist`);
if (transport.constructor.name !== 'WebRtcTransport')
{
throw new Error(
`transport with id "${transportId}" is not a WebRtcTransport`);
}
await transport.connect({ dtlsParameters });
}
/**
* Create a mediasoup Producer associated to a Broadcaster.
*
* @async
*
* @type {String} broadcasterId
* @type {String} transportId
* @type {String} kind - 'audio' or 'video' kind for the Producer.
* @type {RTCRtpParameters} rtpParameters - RTP parameters for the Producer.
*/
async createBroadcasterProducer(
{
broadcasterId,
transportId,
kind,
rtpParameters
}
)
{
const broadcaster = this._broadcasters.get(broadcasterId);
if (!broadcaster)
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
const transport = broadcaster.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" does not exist`);
const producer =
await transport.produce({ kind, rtpParameters });
// Store it.
broadcaster.data.producers.set(producer.id, producer);
// Set Producer events.
// producer.on('score', (score) =>
// {
// logger.debug(
// 'broadcaster producer "score" event [producerId:%s, score:%o]',
// producer.id, score);
// });
producer.on('videoorientationchange', (videoOrientation) =>
{
logger.debug(
'broadcaster producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',
producer.id, videoOrientation);
});
// Optimization: Create a server-side Consumer for each Peer.
for (const peer of this._getJoinedPeers())
{
this._createConsumer(
{
consumerPeer : peer,
producerPeer : broadcaster,
producer
});
}
// Add into the audioLevelObserver.
if (producer.kind === 'audio')
{
this._audioLevelObserver.addProducer({ producerId: producer.id })
.catch(() => {});
}
return { id: producer.id };
}
/**
* Create a mediasoup Consumer associated to a Broadcaster.
*
* @async
*
* @type {String} broadcasterId
* @type {String} transportId
* @type {String} producerId
*/
async createBroadcasterConsumer(
{
broadcasterId,
transportId,
producerId
}
)
{
const broadcaster = this._broadcasters.get(broadcasterId);
if (!broadcaster)
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
if (!broadcaster.data.rtpCapabilities)
throw new Error('broadcaster does not have rtpCapabilities');
const transport = broadcaster.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" does not exist`);
const consumer = await transport.consume(
{
producerId,
rtpCapabilities : broadcaster.data.rtpCapabilities
});
// Store it.
broadcaster.data.consumers.set(consumer.id, consumer);
// Set Consumer events.
consumer.on('transportclose', () =>
{
// Remove from its map.
broadcaster.data.consumers.delete(consumer.id);
});
consumer.on('producerclose', () =>
{
// Remove from its map.
broadcaster.data.consumers.delete(consumer.id);
});
return {
id : consumer.id,
producerId,
kind : consumer.kind,
rtpParameters : consumer.rtpParameters,
type : consumer.type
};
}
/**
* Create a mediasoup DataConsumer associated to a Broadcaster.
*
* @async
*
* @type {String} broadcasterId
* @type {String} transportId
* @type {String} dataProducerId
*/
async createBroadcasterDataConsumer(
{
broadcasterId,
transportId,
dataProducerId
}
)
{
const broadcaster = this._broadcasters.get(broadcasterId);
if (!broadcaster)
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
if (!broadcaster.data.rtpCapabilities)
throw new Error('broadcaster does not have rtpCapabilities');
const transport = broadcaster.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" does not exist`);
const dataConsumer = await transport.consumeData(
{
dataProducerId
});
// Store it.
broadcaster.data.dataConsumers.set(dataConsumer.id, dataConsumer);
// Set Consumer events.
dataConsumer.on('transportclose', () =>
{
// Remove from its map.
broadcaster.data.dataConsumers.delete(dataConsumer.id);
});
dataConsumer.on('dataproducerclose', () =>
{
// Remove from its map.
broadcaster.data.dataConsumers.delete(dataConsumer.id);
});
return {
id : dataConsumer.id
};
}
/**
* Create a mediasoup DataProducer associated to a Broadcaster.
*
* @async
*
* @type {String} broadcasterId
* @type {String} transportId
*/
async createBroadcasterDataProducer(
{
broadcasterId,
transportId,
label,
protocol,
sctpStreamParameters,
appData
}
)
{
const broadcaster = this._broadcasters.get(broadcasterId);
if (!broadcaster)
throw new Error(`broadcaster with id "${broadcasterId}" does not exist`);
// if (!broadcaster.data.sctpCapabilities)
// throw new Error('broadcaster does not have sctpCapabilities');
const transport = broadcaster.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" does not exist`);
const dataProducer = await transport.produceData(
{
sctpStreamParameters,
label,
protocol,
appData
});
// Store it.
broadcaster.data.dataProducers.set(dataProducer.id, dataProducer);
// Set Consumer events.
dataProducer.on('transportclose', () =>
{
// Remove from its map.
broadcaster.data.dataProducers.delete(dataProducer.id);
});
// // Optimization: Create a server-side Consumer for each Peer.
// for (const peer of this._getJoinedPeers())
// {
// this._createDataConsumer(
// {
// dataConsumerPeer : peer,
// dataProducerPeer : broadcaster,
// dataProducer: dataProducer
// });
// }
return {
id : dataProducer.id
};
}
/**
 * Subscribe to the AudioLevelObserver and forward its events to every joined
 * Peer as 'activeSpeaker' notifications: 'volumes' reports the first entry's
 * peerId and volume, 'silence' reports a null peerId.
 */
_handleAudioLevelObserver()
{
	const observer = this._audioLevelObserver;

	observer.on('volumes', (volumes) =>
	{
		// Only the first reported entry is forwarded.
		const loudest = volumes[0];
		const { producer, volume } = loudest;

		// Notify all Peers.
		for (const joinedPeer of this._getJoinedPeers())
		{
			joinedPeer.notify(
				'activeSpeaker',
				{
					peerId : producer.appData.peerId,
					volume
				})
				.catch(() => {});
		}
	});

	observer.on('silence', () =>
	{
		// Notify all Peers that nobody is speaking.
		for (const joinedPeer of this._getJoinedPeers())
		{
			joinedPeer.notify('activeSpeaker', { peerId: null })
				.catch(() => {});
		}
	});
}
/**
* Handle protoo requests from browsers.
*
* @async
*/
async _handleProtooRequest(peer, request, accept, reject)
{
switch (request.method)
{
case 'getRouterRtpCapabilities':
{
accept(this._mediasoupRouter.rtpCapabilities);
break;
}
case 'join':
{
// Ensure the Peer is not already joined.
if (peer.data.joined)
throw new Error('Peer already joined');
const {
displayName,
device,
rtpCapabilities,
sctpCapabilities
} = request.data;
// Store client data into the protoo Peer data object.
peer.data.joined = true;
peer.data.displayName = displayName;
peer.data.device = device;
peer.data.rtpCapabilities = rtpCapabilities;
peer.data.sctpCapabilities = sctpCapabilities;
// Tell the new Peer about already joined Peers.
// And also create Consumers for existing Producers.
const joinedPeers =
[
...this._getJoinedPeers(),
...this._broadcasters.values()
];
// Reply now the request with the list of joined peers (all but the new one).
const peerInfos = joinedPeers
.filter((joinedPeer) => joinedPeer.id !== peer.id)
.map((joinedPeer) => ({
id : joinedPeer.id,
displayName : joinedPeer.data.displayName,
device : joinedPeer.data.device
}));
accept({ peers: peerInfos });
// Mark the new Peer as joined.
peer.data.joined = true;
for (const joinedPeer of joinedPeers)
{
// Create Consumers for existing Producers.
for (const producer of joinedPeer.data.producers.values())
{
this._createConsumer(
{
consumerPeer : peer,
producerPeer : joinedPeer,
producer
});
}
// Create DataConsumers for existing DataProducers.
for (const dataProducer of joinedPeer.data.dataProducers.values())
{
if (dataProducer.label === 'bot')
continue;
this._createDataConsumer(
{
dataConsumerPeer : peer,
dataProducerPeer : joinedPeer,
dataProducer
});
}
}
// Create DataConsumers for bot DataProducer.
this._createDataConsumer(
{
dataConsumerPeer : peer,
dataProducerPeer : null,
dataProducer : this._bot.dataProducer
});
// Notify the new Peer to all other Peers.
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
otherPeer.notify(
'newPeer',
{
id : peer.id,
displayName : peer.data.displayName,
device : peer.data.device
})
.catch(() => {});
}
break;
}
case 'createWebRtcTransport':
{
// NOTE: Don't require that the Peer is joined here, so the client can
// initiate mediasoup Transports and be ready when he later joins.
const {
forceTcp,
producing,
consuming,
sctpCapabilities
} = request.data;
const webRtcTransportOptions =
{
...config.mediasoup.webRtcTransportOptions,
enableSctp : Boolean(sctpCapabilities),
numSctpStreams : (sctpCapabilities || {}).numStreams,
appData : { producing, consuming }
};
if (forceTcp)
{
webRtcTransportOptions.enableUdp = false;
webRtcTransportOptions.enableTcp = true;
}
const transport = await this._mediasoupRouter.createWebRtcTransport(
webRtcTransportOptions);
transport.on('sctpstatechange', (sctpState) =>
{
logger.debug('WebRtcTransport "sctpstatechange" event [sctpState:%s]', sctpState);
});
transport.on('dtlsstatechange', (dtlsState) =>
{
if (dtlsState === 'failed' || dtlsState === 'closed')
logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState);
});
// NOTE: For testing.
// await transport.enableTraceEvent([ 'probation', 'bwe' ]);
await transport.enableTraceEvent([ 'bwe' ]);
transport.on('trace', (trace) =>
{
logger.debug(
'transport "trace" event [transportId:%s, trace.type:%s, trace:%o]',
transport.id, trace.type, trace);
if (trace.type === 'bwe' && trace.direction === 'out')
{
peer.notify(
'downlinkBwe',
{
desiredBitrate : trace.info.desiredBitrate,
effectiveDesiredBitrate : trace.info.effectiveDesiredBitrate,
availableBitrate : trace.info.availableBitrate
})
.catch(() => {});
}
});
// Store the WebRtcTransport into the protoo Peer data Object.
peer.data.transports.set(transport.id, transport);
accept(
{
id : transport.id,
iceParameters : transport.iceParameters,
iceCandidates : transport.iceCandidates,
dtlsParameters : transport.dtlsParameters,
sctpParameters : transport.sctpParameters
});
const { maxIncomingBitrate } = config.mediasoup.webRtcTransportOptions;
// If set, apply max incoming bitrate limit.
if (maxIncomingBitrate)
{
try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); }
catch (error) {}
}
break;
}
case 'connectWebRtcTransport':
{
const { transportId, dtlsParameters } = request.data;
const transport = peer.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
await transport.connect({ dtlsParameters });
accept();
break;
}
case 'restartIce':
{
const { transportId } = request.data;
const transport = peer.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
const iceParameters = await transport.restartIce();
accept(iceParameters);
break;
}
case 'produce':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const { transportId, kind, rtpParameters } = request.data;
let { appData } = request.data;
const transport = peer.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
// Add peerId into appData to later get the associated Peer during
// the 'loudest' event of the audioLevelObserver.
appData = { ...appData, peerId: peer.id };
const producer = await transport.produce(
{
kind,
rtpParameters,
appData
// keyFrameRequestDelay: 5000
});
// Store the Producer into the protoo Peer data Object.
peer.data.producers.set(producer.id, producer);
// Set Producer events.
producer.on('score', (score) =>
{
// logger.debug(
// 'producer "score" event [producerId:%s, score:%o]',
// producer.id, score);
peer.notify('producerScore', { producerId: producer.id, score })
.catch(() => {});
});
producer.on('videoorientationchange', (videoOrientation) =>
{
logger.debug(
'producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',
producer.id, videoOrientation);
});
// NOTE: For testing.
// await producer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
// await producer.enableTraceEvent([ 'pli', 'fir' ]);
// await producer.enableTraceEvent([ 'keyframe' ]);
producer.on('trace', (trace) =>
{
logger.debug(
'producer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
producer.id, trace.type, trace);
});
accept({ id: producer.id });
// Optimization: Create a server-side Consumer for each Peer.
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
this._createConsumer(
{
consumerPeer : otherPeer,
producerPeer : peer,
producer
});
}
// Add into the audioLevelObserver.
if (producer.kind === 'audio')
{
this._audioLevelObserver.addProducer({ producerId: producer.id })
.catch(() => {});
}
break;
}
case 'closeProducer':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const { producerId } = request.data;
const producer = peer.data.producers.get(producerId);
if (!producer)
throw new Error(`producer with id "${producerId}" not found`);
producer.close();
// Remove from its map.
peer.data.producers.delete(producer.id);
accept();
break;
}
case 'pauseProducer':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const { producerId } = request.data;
const producer = peer.data.producers.get(producerId);
if (!producer)
throw new Error(`producer with id "${producerId}" not found`);
await producer.pause();
accept();
break;
}
case 'resumeProducer':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const { producerId } = request.data;
const producer = peer.data.producers.get(producerId);
if (!producer)
throw new Error(`producer with id "${producerId}" not found`);
await producer.resume();
accept();
break;
}
case 'pauseConsumer':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const { consumerId } = request.data;
const consumer = peer.data.consumers.get(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
await consumer.pause();
accept();
break;
}
case 'resumeConsumer':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const { consumerId } = request.data;
const consumer = peer.data.consumers.get(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
await consumer.resume();
accept();
break;
}
case 'setConsumerPreferredLayers':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const { consumerId, spatialLayer, temporalLayer } = request.data;
const consumer = peer.data.consumers.get(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
await consumer.setPreferredLayers({ spatialLayer, temporalLayer });
accept();
break;
}
case 'setConsumerPriority':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const { consumerId, priority } = request.data;
const consumer = peer.data.consumers.get(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
await consumer.setPriority(priority);
accept();
break;
}
case 'requestConsumerKeyFrame':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const { consumerId } = request.data;
const consumer = peer.data.consumers.get(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
await consumer.requestKeyFrame();
accept();
break;
}
case 'produceData':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const {
transportId,
sctpStreamParameters,
label,
protocol,
appData
} = request.data;
const transport = peer.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
const dataProducer = await transport.produceData(
{
sctpStreamParameters,
label,
protocol,
appData
});
// Store the Producer into the protoo Peer data Object.
peer.data.dataProducers.set(dataProducer.id, dataProducer);
accept({ id: dataProducer.id });
switch (dataProducer.label)
{
case 'chat':
{
// Create a server-side DataConsumer for each Peer.
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
this._createDataConsumer(
{
dataConsumerPeer : otherPeer,
dataProducerPeer : peer,
dataProducer
});
}
break;
}
case 'bot':
{
// Pass it to the bot.
this._bot.handlePeerDataProducer(
{
dataProducerId : dataProducer.id,
peer
});
break;
}
}
break;
}
case 'changeDisplayName':
{
// Ensure the Peer is joined.
if (!peer.data.joined)
throw new Error('Peer not yet joined');
const { displayName } = request.data;
const oldDisplayName = peer.data.displayName;
// Store the display name into the custom data Object of the protoo
// Peer.
peer.data.displayName = displayName;
// Notify other joined Peers.
for (const otherPeer of this._getJoinedPeers({ excludePeer: peer }))
{
otherPeer.notify(
'peerDisplayNameChanged',
{
peerId : peer.id,
displayName,
oldDisplayName
})
.catch(() => {});
}
accept();
break;
}
case 'getTransportStats':
{
const { transportId } = request.data;
const transport = peer.data.transports.get(transportId);
if (!transport)
throw new Error(`transport with id "${transportId}" not found`);
const stats = await transport.getStats();
accept(stats);
break;
}
case 'getProducerStats':
{
const { producerId } = request.data;
const producer = peer.data.producers.get(producerId);
if (!producer)
throw new Error(`producer with id "${producerId}" not found`);
const stats = await producer.getStats();
accept(stats);
break;
}
case 'getConsumerStats':
{
const { consumerId } = request.data;
const consumer = peer.data.consumers.get(consumerId);
if (!consumer)
throw new Error(`consumer with id "${consumerId}" not found`);
const stats = await consumer.getStats();
accept(stats);
break;
}
case 'getDataProducerStats':
{
const { dataProducerId } = request.data;
const dataProducer = peer.data.dataProducers.get(dataProducerId);
if (!dataProducer)
throw new Error(`dataProducer with id "${dataProducerId}" not found`);
const stats = await dataProducer.getStats();
accept(stats);
break;
}
case 'getDataConsumerStats':
{
const { dataConsumerId } = request.data;
const dataConsumer = peer.data.dataConsumers.get(dataConsumerId);
if (!dataConsumer)
throw new Error(`dataConsumer with id "${dataConsumerId}" not found`);
const stats = await dataConsumer.getStats();
accept(stats);
break;
}
case 'applyNetworkThrottle':
{
const DefaultUplink = 1000000;
const DefaultDownlink = 1000000;
const DefaultRtt = 0;
const { uplink, downlink, rtt, secret } = request.data;
if (!secret || secret !== process.env.NETWORK_THROTTLE_SECRET)
{
reject(403, 'operation NOT allowed, modda fuckaa');
return;
}
try
{
await throttle.start(
{
up : uplink || DefaultUplink,
down : downlink || DefaultDownlink,
rtt : rtt || DefaultRtt
});
logger.warn(
'network throttle set [uplink:%s, downlink:%s, rtt:%s]',
uplink || DefaultUplink,
downlink || DefaultDownlink,
rtt || DefaultRtt);
accept();
}
catch (error)
{
logger.error('network throttle apply failed: %o', error);
reject(500, error.toString());
}
break;
}
case 'resetNetworkThrottle':
{
const { secret } = request.data;
if (!secret || secret !== process.env.NETWORK_THROTTLE_SECRET)
{
reject(403, 'operation NOT allowed, modda fuckaa');
return;
}
try
{
await throttle.stop({});
logger.warn('network throttle stopped');
accept();
}
catch (error)
{
logger.error('network throttle stop failed: %o', error);
reject(500, error.toString());
}
break;
}
default:
{
logger.error('unknown request.method "%s"', request.method);
reject(500, `unknown request.method "${request.method}"`);
}
}
}
/**
* Helper to get the list of joined protoo peers.
*/
_getJoinedPeers({ excludePeer = undefined } = {})
{
return this._protooRoom.peers
.filter((peer) => peer.data.joined && peer !== excludePeer);
}
/**
* Creates a mediasoup Consumer for the given mediasoup Producer.
*
* @async
*/
async _createConsumer({ consumerPeer, producerPeer, producer })
{
// Optimization:
// - Create the server-side Consumer in paused mode.
// - Tell its Peer about it and wait for its response.
// - Upon receipt of the response, resume the server-side Consumer.
// - If video, this will mean a single key frame requested by the
// server-side Consumer (when resuming it).
// - If audio (or video), it will avoid that RTP packets are received by the
// remote endpoint *before* the Consumer is locally created in the endpoint
// (and before the local SDP O/A procedure ends). If that happens (RTP
// packets are received before the SDP O/A is done) the PeerConnection may
// fail to associate the RTP stream.
// NOTE: Don't create the Consumer if the remote Peer cannot consume it.
if (
!consumerPeer.data.rtpCapabilities ||
!this._mediasoupRouter.canConsume(
{
producerId : producer.id,
rtpCapabilities : consumerPeer.data.rtpCapabilities
})
)
{
return;
}
// Must take the Transport the remote Peer is using for consuming.
const transport = Array.from(consumerPeer.data.transports.values())
.find((t) => t.appData.consuming);
// This should not happen.
if (!transport)
{
logger.warn('_createConsumer() | Transport for consuming not found');
return;
}
// Create the Consumer in paused mode.
let consumer;
try
{
consumer = await transport.consume(
{
producerId : producer.id,
rtpCapabilities : consumerPeer.data.rtpCapabilities,
paused : true
});
}
catch (error)
{
logger.warn('_createConsumer() | transport.consume():%o', error);
return;
}
// Store the Consumer into the protoo consumerPeer data Object.
consumerPeer.data.consumers.set(consumer.id, consumer);
// Set Consumer events.
consumer.on('transportclose', () =>
{
// Remove from its map.
consumerPeer.data.consumers.delete(consumer.id);
});
consumer.on('producerclose', () =>
{
// Remove from its map.
consumerPeer.data.consumers.delete(consumer.id);
consumerPeer.notify('consumerClosed', { consumerId: consumer.id })
.catch(() => {});
});
consumer.on('producerpause', () =>
{
consumerPeer.notify('consumerPaused', { consumerId: consumer.id })
.catch(() => {});
});
consumer.on('producerresume', () =>
{
consumerPeer.notify('consumerResumed', { consumerId: consumer.id })
.catch(() => {});
});
consumer.on('score', (score) =>
{
// logger.debug(
// 'consumer "score" event [consumerId:%s, score:%o]',
// consumer.id, score);
consumerPeer.notify('consumerScore', { consumerId: consumer.id, score })
.catch(() => {});
});
consumer.on('layerschange', (layers) =>
{
consumerPeer.notify(
'consumerLayersChanged',
{
consumerId : consumer.id,
spatialLayer : layers ? layers.spatialLayer : null,
temporalLayer : layers ? layers.temporalLayer : null
})
.catch(() => {});
});
// NOTE: For testing.
// await consumer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
// await consumer.enableTraceEvent([ 'pli', 'fir' ]);
// await consumer.enableTraceEvent([ 'keyframe' ]);
consumer.on('trace', (trace) =>
{
logger.debug(
'consumer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
consumer.id, trace.type, trace);
});
// Send a protoo request to the remote Peer with Consumer parameters.
try
{
await consumerPeer.request(
'newConsumer',
{
peerId : producerPeer.id,
producerId : producer.id,
id : consumer.id,
kind : consumer.kind,
rtpParameters : consumer.rtpParameters,
type : consumer.type,
appData : producer.appData,
producerPaused : consumer.producerPaused
});
// Now that we got the positive response from the remote endpoint, resume
// the Consumer so the remote endpoint will receive the a first RTP packet
// of this new stream once its PeerConnection is already ready to process
// and associate it.
await consumer.resume();
consumerPeer.notify(
'consumerScore',
{
consumerId : consumer.id,
score : consumer.score
})
.catch(() => {});
}
catch (error)
{
logger.warn('_createConsumer() | failed:%o', error);
}
}
/**
* Creates a mediasoup DataConsumer for the given mediasoup DataProducer.
*
* @async
*/
async _createDataConsumer(
{
dataConsumerPeer,
dataProducerPeer = null, // This is null for the bot DataProducer.
dataProducer
})
{
// NOTE: Don't create the DataConsumer if the remote Peer cannot consume it.
if (!dataConsumerPeer.data.sctpCapabilities)
return;
// Must take the Transport the remote Peer is using for consuming.
const transport = Array.from(dataConsumerPeer.data.transports.values())
.find((t) => t.appData.consuming);
// This should not happen.
if (!transport)
{
logger.warn('_createDataConsumer() | Transport for consuming not found');
return;
}
// Create the DataConsumer.
let dataConsumer;
try
{
dataConsumer = await transport.consumeData(
{
dataProducerId : dataProducer.id
});
}
catch (error)
{
logger.warn('_createDataConsumer() | transport.consumeData():%o', error);
return;
}
// Store the DataConsumer into the protoo dataConsumerPeer data Object.
dataConsumerPeer.data.dataConsumers.set(dataConsumer.id, dataConsumer);
// Set DataConsumer events.
dataConsumer.on('transportclose', () =>
{
// Remove from its map.
dataConsumerPeer.data.dataConsumers.delete(dataConsumer.id);
});
dataConsumer.on('dataproducerclose', () =>
{
// Remove from its map.
dataConsumerPeer.data.dataConsumers.delete(dataConsumer.id);
dataConsumerPeer.notify(
'dataConsumerClosed', { dataConsumerId: dataConsumer.id })
.catch(() => {});
});
// Send a protoo request to the remote Peer with Consumer parameters.
try
{
await dataConsumerPeer.request(
'newDataConsumer',
{
// This is null for bot DataProducer.
peerId : dataProducerPeer ? dataProducerPeer.id : null,
dataProducerId : dataProducer.id,
id : dataConsumer.id,
sctpStreamParameters : dataConsumer.sctpStreamParameters,
label : dataConsumer.label,
protocol : dataConsumer.protocol,
appData : dataProducer.appData
});
}
catch (error)
{
logger.warn('_createDataConsumer() | failed:%o', error);
}
}
}
module.exports = Room;
14-8 如何调试MediasoupDemo
本课程讲的是本机调试,也就是服务器和Chrome是在同一台PC上。
node --inspect-brk server.js
chrome://inspect
14-9 运行时查看Mediasoup的核心信息
进行调试时,使用方法1.
查看线上运行状况,使用方法2.
方法1:
cd server
export INTERACTIVE=1;node server.js
此时会进入命令行模式并显示帮助信息。从帮助信息中可以知道:输入usage,可以查看进程占用CPU的情况;
输入t后,会进入terminal界面,再按Ctrl+C则回到cmd命令界面。
方法2
先运行server.js
cd server
node server.js &
如果是后台运行,则新建会话,再次进入server目录,否则直接运行node connect.js
cd server
node connect.js
此时也会像方法1一样进入到cmd命令行模式。
可以启动客户端,然后加入两个用户,然后再在cmd命令行输入dw、dr等命令查看信息。