项目背景
由于武汉疫情的爆发,导致丁香人才服务的医药企业、医院等B端客户在招聘上遇到了困难。为了解决招聘难题,提供无接触性云上招聘,所以团队紧急立项了一个关于远程视频面试的服务。
在即时通信类项目中,不可避免的需要用到服务端推送的功能,允许服务端主动给客户端推送数据。
比如我们有面试房间的场景:
- 面试过程中,可能会有新的面试官进入房间,这时候就需要通知房间内的每一个人;
- 房间中的求职者面试状态改为已准备,此时也需要通知到房间内的面试官,求职者已经准备好了,可以开始面试。
WS 简介
在有 websocket 之前,客户端想要知道服务端的处理进度,大致有两种方式:
- 使用 ajax 轮询,每隔一段时间发起一个请求,确认进度,但这种方式会造成同步不精准;
- 第二种是使用长连接(long poll),就是客户端发起连接后,直到服务端返回 Response 后才会断开,这就造成连接阶段的阻塞情况,资源开销比较大。
接下来,我们可以打开 Chrome 控制台来查看一下 ws:
如果有 websocket 连接存在则会显示:
**
那 websocket 和我们日常接触的 HTTP 请求有什么不同呢?两者皆是基于传输层 TCP 协议,但其实两者几乎无关系。
WebSocket 是一种网络传输协议,可在单个 TCP 连接上进行全双工通信,位于 OSI模型的应用层。WebSocket 协议在2011年由 IETF标准化为RFC 6455,后由 RFC 7936 补充规范。Web IDL中的WebSocket API由W3C标准化。 WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。
Websocket | HTTP |
---|---|
持久化协议(只需建立一次连接) | 非持久化协议(无连接、无状态) |
全双工通信 | 单向通信(客户端->服务端) |
C-S连接需要进行HTTP协议进行3次握手、4次挥手 | 无连接 |
基于这种全双工通信的特点,客户端和服务端只需要建立一次连接,后续两端就都可以主动地向对方传输数据了。
demo 案例
https://github.com/sockjs/sockjs-node/tree/v0.3.19/examples/koa
可以借助 socket-client 和 socket-node 两个库来实现(当然 websokcet 是浏览器原生支持的)。
打开控制台:
我们可以看到第一次连接成功,服务端会下行一个o(open)连接成功。当客户端上行发送一个“hello”时,服务端自动下行一个同样的消息。这样,两端就可以自由的长久双向通信了。
WS 连接请求(如何识别)
前面说了,websocket 只需要客户端和服务端建立一次连接。其实这一次连接就是一次 HTTP 连接,就如上图。上图中的 Request Headers 中向我们展示了一些信息:
- Connection: Upgrade(表示我需要连接升级)
- Upgrade: websocket(我发起的是 websocket 协议)
- Sec-**Websocket-Version: 13(表示 WebSocket 的版本)**
- Sec-Websocket-Key: 浏览器随机生成的 Base64 Encode 值,防止恶意连接
项目实践
丁香人才远程面试项目中客户端使用 socket-client 和 stompjs。
SockJS是一个浏览器JavaScript库,提供类似于WebSocket的对象。SockJS为您提供了一个一致的,跨浏览器的Javascript API,该API在浏览器和Web服务器之间创建了低延迟,全双工,跨域的通信通道。
对于STOMP协议来说, 客户端会扮演下列两种角色的任意一种:
- 作为生产者,通过SEND帧发送消息到指定的地址
- 作为消费者,通过发送SUBSCRIBE帧到已知地址来进行消息订阅,而当生产者发送消息到这个订阅地址后,订阅该地址的其他消费者会受到通过MESSAGE帧收到该消息
实际上,WebSocket结合STOMP相当于构建了一个消息分发队列,客户端可以在上述两个角色间转换,订阅机制保证了一个客户端消息可以通过服务器广播到多个其他客户端,作为生产者,又可以通过服务器来发送点对点消息。
这边只做客户端的代码实现,创建一个 socket 类方法。
**socket.js**
**
import SockJS from 'sockjs-client/dist/sockjs.min'
import { Stomp } from '@stomp/stompjs'
/**
* 客户端 websocket 类方法
*/
export default class Socket {
constructor(options) {
this.host = 'http://0.0.0.0:9999/'
}
// 连接
connect () {
let self = this
self.stompClient = Stomp.over(() => {
return new SockJS(self.host)
})
self.stompClient.connect(
{},
function(frame) {
// connect success
},
)
}
// 断开连接
disconnect(res) {
this.stompClient.disconnect()
console.log('Disconnected, 断开链接', res)
}
}
调用
import Socket from '@/utils/socket'
mySocket = new Socket({})
// 建立连接
mySocket.connect()
消息订阅与成功回调**socket.js**
**
import SockJS from 'sockjs-client/dist/sockjs.min'
import { Stomp } from '@stomp/stompjs'
/**
* 客户端 websocket 类方法
*/
export default class Socket {
constructor(options) {
this.onJoin = options.onJoin
this.host = 'http://0.0.0.0:9999/'
}
// 连接
connect () {
let self = this
self.stompClient = Stomp.over(() => {
return new SockJS(self.host)
})
self.stompClient.connect(
{},
function(frame) {
// connect success
self.initSubscribe()
},
)
}
// 断开连接
disconnect(res) {
this.stompClient.disconnect()
console.log('Disconnected, 断开链接', res)
}
// 消息订阅(与后端约定订阅地址和返回type)
initSubscribe () {
const self = this
// 测试
this.stompClient.subscribe('/topic', function(output) {
// this.chatOutput(JSON.parse(output.body))
console.log('*******' + output + '*******')
// 有人加入
if (output.msgType === 0) {
self.onJoin && self.onJoin(parsedOutput)
console.log('*******' + 有人加入 + '*******')
}
})
}
}
调用**
import Socket from '@/utils/socket'
mySocket = new Socket({
// 有人加入
onJoin: async function(parsedOutput) {
console.log('onJoin', parsedOutput)
// do somethings
},
})
// 建立连接
mySocket.connect()
客户端发送、通知消息**socket.js**
import SockJS from 'sockjs-client/dist/sockjs.min'
import { Stomp } from '@stomp/stompjs'
/**
* 客户端 websocket 类方法
*/
export default class Socket {
constructor(options) {
this.onJoin = options.onJoin
this.host = 'http://0.0.0.0:9999/'
this.commonParameterGather = {
token: '',
userId: ''
}
}
// 连接
connect () {
let self = this
self.stompClient = Stomp.over(() => {
return new SockJS(self.host)
})
self.stompClient.connect(
{},
function(frame) {
// connect success
self.initSubscribe()
},
)
}
// 断开连接
disconnect(res) {
this.stompClient.disconnect()
console.log('Disconnected, 断开链接', res)
}
// 消息订阅(与后端约定订阅地址和返回type)
initSubscribe () {
const self = this
// 测试
this.stompClient.subscribe('/topic', function(output) {
// this.chatOutput(JSON.parse(output.body))
console.log('*******' + output + '*******')
// 有人加入
if (output.msgType === 0) {
self.onJoin && self.onJoin(parsedOutput)
console.log('*******' + 有人加入 + '*******')
}
})
}
/**
* 发送、通知信息
* @param {string} path - 发送地址(与后端约定)
* @param {object} extraParameter - 额外传参数
*/
sendMssage(path, extraParameter = {}) {
// 面试者已准备
if (path === '/ws/userInRoom/ready') {
this.sendEvent(path, {
...extraParameter
})
}
}
/**
* 通用信息发送方法
* @param {string} path - 请求路径
* @param {object} extraParameter - 除了通用 token、userId,额外需要传递的参数
*/
sendEvent(path, extraParameter = {}) {
this.stompClient.send(
path,
{},
JSON.stringify(
Object.keys(extraParameter).length
? Object.assign({}, this.commonParameterGather, {
...extraParameter
})
: this.commonParameterGather
)
)
}
}
调用**
import Socket from '@/utils/socket'
mySocket = new Socket({
// 有人加入
onJoin: async function(parsedOutput) {
console.log('onJoin', parsedOutput)
// do somethings
},
})
// 建立连接
mySocket.connect()
// 发送面试者已准备
mySocket.sendMssage('/ws/userInRoom/ready', {
status: 1
})
异常处理与重连机制**socket.js**
import SockJS from 'sockjs-client/dist/sockjs.min'
import { Stomp } from '@stomp/stompjs'
/**
* 客户端 websocket 类方法
*/
export default class Socket {
constructor(options) {
this.onJoin = options.onJoin
this.host = 'http://0.0.0.0:9999/'
this.commonParameterGather = {
token: '',
userId: ''
}
}
// 异常处理
exceptionHandling() {
return new Promise((resolve, reject) => {
const errMsg = {}
if (!this.roomId) {
errMsg.roomIdError = '房间 id 错误!'
}
if (!this.userId) {
errMsg.userIdError = '用户 id 错误!'
}
if (!this.token) {
errMsg.tokenError = 'token 错误!'
}
if (Object.keys(errMsg).length > 0) {
resolve({
success: false,
errMsg: errMsg
})
} else {
resolve({
success: true
})
}
})
}
// 连接
connect () {
let self = this
return new Promise(resolve => {
this.exceptionHandling().then(res => {
if (res.success) {
self.connectAndReconnect(() => {
resolve()
})
} else {
throw new Error(
Object.keys(res.errMsg).reduce((acc, cur) => {
return acc + res.errMsg[cur]
}, '')
)
}
})
})
}
connectAndReconnect(successCallback) {
const self = this
self.stompClient = Stomp.over(() => {
return new SockJS(self.host)
})
self.stompClient.connect(
'',
'',
frame => {
console.log('******* Connected: *******')
self.initSubscribe()
successCallback()
},
res => {
setTimeout(() => {
console.log('websocket链接失败', res)
self.connectAndReconnect(successCallback)
}, 2000)
},
close => {
// 重连
}
)
}
// 断开连接
disconnect(res) {
this.stompClient.disconnect()
console.log('Disconnected, 断开链接', res)
}
// 消息订阅(与后端约定订阅地址和返回type)
initSubscribe () {
const self = this
// 测试
this.stompClient.subscribe('/topic', function(output) {
// this.chatOutput(JSON.parse(output.body))
console.log('*******' + output + '*******')
// 有人加入
if (output.msgType === 0) {
self.onJoin && self.onJoin(parsedOutput)
console.log('*******' + 有人加入 + '*******')
}
})
}
/**
* 发送、通知信息
* @param {string} path - 发送地址(与后端约定)
* @param {object} extraParameter - 额外传参数
*/
sendMssage(path, extraParameter = {}) {
// 面试者已准备
if (path === '/ws/userInRoom/ready') {
this.sendEvent(path, {
...extraParameter
})
}
}
/**
* 通用信息发送方法
* @param {string} path - 请求路径
* @param {object} extraParameter - 除了通用 token、userId,额外需要传递的参数
*/
sendEvent(path, extraParameter = {}) {
this.stompClient.send(
path,
{},
JSON.stringify(
Object.keys(extraParameter).length
? Object.assign({}, this.commonParameterGather, {
...extraParameter
})
: this.commonParameterGather
)
)
}
}