协议
// Column: a single column record. It is used as ColumnResponse.column and as
// the element type of ColumnResponse.recommendColumns.
// NOTE(review): the business meaning of each field is only implied by its
// name; confirm with the data owner before relying on it.
message Column {
// Signed 32-bit id; ColumnRequest.columnid is also int32, while Article.id is uint32.
required int32 id = 1;
required string column_cover = 2;
required string column_title = 3;
required string column_subtitle = 4;
required string author_name = 5;
required string author_intro = 6;
required string column_intro = 7;
required string column_unit = 8;
required uint32 sub_count = 9;
required string update_frequency = 10;
// NOTE(review): the currency unit of the price fields is not stated in this schema — confirm.
required uint32 column_price = 11;
// The only optional scalar field in this message.
optional uint32 column_price_market = 12;
// Zero or more Article entries.
repeated Article articles = 13;
}
// Article: element type of Column.articles. All three fields are required.
message Article {
// Unsigned 32-bit id (Column.id, by contrast, is a signed int32).
required uint32 id = 1;
required bool is_video_preview = 2;
required string article_title = 3;
}
// ColumnResponse: reply message of the column RPC. It carries exactly one
// Column plus a list of further Column records.
message ColumnResponse {
required Column column = 1;
// May be empty; presumably the columns recommended alongside `column`.
repeated Column recommendColumns = 2;
}
// ColumnRequest: request message of the column RPC; it carries a single
// required column id.
message ColumnRequest {
required int32 columnid = 1;
}
bff部分
const mount = require('koa-mount');
const static = require('koa-static')
const app = new (require('koa'));
const rpcClient = require('./client');
const template = require('./template');
const detailTemplate = template(__dirname + '/template/index.html');
app.use(mount('/static', static(`${__dirname}/source/static/`)))
app.use(async (ctx) => {
if (!ctx.query.columnid) {
ctx.status = 400;
ctx.body = 'invalid columnid';
return
}
const result = await new Promise((resolve, reject) => {
rpcClient.write({
columnid: ctx.query.columnid
}, function (err, data) {
err ? reject(err) : resolve(data)
})
})
ctx.status = 200;
ctx.body = detailTemplate(result);
})
app.listen(3000)
// module.exports = app;
client.js
const EasySock = require('easy_sock');
const protobuf = require('protocol-buffers')
const fs = require('fs');
const schemas = protobuf(fs.readFileSync(`${__dirname}/detail.proto`));
// EasySock: a socket utility published by Tencent.
// https://www.npmjs.com/package/easy_sock
// This instance is the RPC client; it targets 127.0.0.1:4000.
const easySock = new EasySock({
ip: '127.0.0.1',
port: 4000,
timeout: 500, // NOTE(review): presumably milliseconds — confirm against the easy_sock docs
keepAlive: true
})
/**
 * Builds one request packet.
 *
 * Layout: bytes 0-3 hold `seq`, bytes 4-7 hold the body length (both written
 * as big-endian Int32), followed by the ColumnRequest-encoded body.
 *
 * @param {object} data - Value handed to schemas.ColumnRequest.encode.
 * @param {number} seq - Sequence number stored in the packet header.
 * @returns {Buffer} Header and body joined into a single buffer.
 */
easySock.encode = function (data, seq) {
    const payload = schemas.ColumnRequest.encode(data);

    const header = Buffer.alloc(8);
    header.writeInt32BE(seq, 0);
    header.writeInt32BE(payload.length, 4);

    const parts = [header, payload];
    return Buffer.concat(parts);
};
/**
 * Splits one response packet into its sequence number and decoded body.
 *
 * The sequence number is the big-endian Int32 at offset 0; everything from
 * byte 8 onwards is handed to schemas.ColumnResponse.decode.
 *
 * @param {Buffer} buffer - One complete response packet.
 * @returns {{result: *, seq: number}} Decoded body and header sequence number.
 */
easySock.decode = function (buffer) {
    const sequence = buffer.readInt32BE(0);
    const payload = buffer.slice(8);
    const decoded = schemas.ColumnResponse.decode(payload);

    return { result: decoded, seq: sequence };
};
/**
 * Reports whether `buffer` starts with a complete packet.
 *
 * A packet is an 8-byte header followed by a body whose length is the
 * big-endian Int32 stored at offset 4.
 *
 * @param {Buffer} buffer - Bytes received so far.
 * @returns {number} Total length of the first packet (header plus body), or
 *   0 when the header or the body has not fully arrived yet.
 */
easySock.isReceiveComplete = function (buffer) {
    const HEADER_BYTES = 8;

    if (buffer.length < HEADER_BYTES) {
        return 0;
    }

    const packetBytes = buffer.readInt32BE(4) + HEADER_BYTES;
    return buffer.length < packetBytes ? 0 : packetBytes;
};
module.exports = easySock;
server部分
模拟的
const fs = require('fs')
const protobuf = require('protocol-buffers');
const schemas = protobuf(
fs.readFileSync(`${__dirname}/detail.proto`)
);
// 假数据
const columnData = require('./mockdata/column')
/**
 * Mock column RPC server.
 *
 * The packet encode/decode logic lives in ./lib/geeknode-rpc-server; this file
 * only supplies the protobuf request/response schemas and the request handler.
 */
const server = require('./lib/geeknode-rpc-server')(schemas.ColumnRequest, schemas.ColumnResponse);

server
    .createServer((request, response) => {
        // request.body is the decoded ColumnRequest. This mock always returns
        // the same fake data and therefore ignores it; a real service would
        // use request.body.columnid to query the database.
        response.end({
            column: columnData[0],
            recommendColumns: [columnData[1], columnData[2]],
        });
    })
    .listen(4000, () => {
        console.log('rpc server listened: 4000');
    });
rpc-server.js
// 'use strict';
// const debug = require("debug")('easysock-server');
const net = require("net");
module.exports = class RPC {
constructor({ encodeResponse, decodeRequest, isCompleteRequest }) {
this.encodeResponse = encodeResponse;
this.decodeRequest = decodeRequest;
this.isCompleteRequest = isCompleteRequest;
}
createServer(callback) {
let buffer = null;
const tcpServer = net.createServer((socket) => {
socket.on('data', (data) => {
buffer = (buffer && buffer.length > 0) ?
Buffer.concat([buffer, data]) : // 有遗留数据才做拼接操作
data;
let checkLength = null;
while (buffer && (checkLength = this.isCompleteRequest(buffer))) {
let requestBuffer = null;
if (checkLength == buffer.length) {
requestBuffer = buffer;
buffer = null;
} else {
requestBuffer = buffer.slice(0, checkLength);
buffer = buffer.slice(checkLength);
}
const request = this.decodeRequest(requestBuffer);
callback(
{ // request
body: request.result,
socket
},
{ // response
end: (data) => {
const buffer = this.encodeResponse(data, request.seq)
socket.write(buffer);
}
}
);
}
})
});
return {
listen() {
tcpServer.listen.apply(tcpServer, arguments)
}
}
}
}
geeknode-rpc-server.js
const RPC = require('./rpc-server');
/**
* 因为所有服务用的包头格式都一样,不一样的只有protobuf协议,所以这里可以将这段逻辑封成一个模块
*
* 日常做项目的时候一定要注意把重复代码做封装
*/
module.exports = function (protobufRequestSchema, protobufResponseSchema) {
return new RPC({
// 解码请求包
decodeRequest(buffer) {
const seq = buffer.readUInt32BE();
return {
seq: seq,
result: protobufRequestSchema.decode(buffer.slice(8))
}
},
// 判断请求包是不是接收完成
isCompleteRequest(buffer) {
const bodyLength = buffer.readUInt32BE(4);
return 8 + bodyLength
},
// 编码返回包
encodeResponse(data, seq) {
const body = protobufResponseSchema.encode(data);
const head = Buffer.alloc(8);
head.writeUInt32BE(seq);
head.writeUInt32BE(body.length, 4);
return Buffer.concat([head, body]);
}
})
}