协议
// Wire schema for the column detail RPC: the request and response messages
// and the records they carry.

// Request: identifies the column to fetch.
message ColumnRequest {
  required int32 columnid = 1;
}

// Response: the requested column plus a list of recommended columns.
message ColumnResponse {
  required Column column = 1;
  repeated Column recommendColumns = 2;
}

// One column record together with its articles.
message Column {
  required int32 id = 1;

  // Presentation fields.
  required string column_cover = 2;
  required string column_title = 3;
  required string column_subtitle = 4;
  required string author_name = 5;
  required string author_intro = 6;
  required string column_intro = 7;
  required string column_unit = 8;

  required uint32 sub_count = 9;
  required string update_frequency = 10;

  // Pricing; the market price is the only optional field of this message.
  required uint32 column_price = 11;
  optional uint32 column_price_market = 12;

  repeated Article articles = 13;
}

// One article entry listed under a column.
message Article {
  required uint32 id = 1;
  required bool is_video_preview = 2;
  required string article_title = 3;
}
bff部分
const mount = require('koa-mount');
// Bound as `serveStatic` because `static` is a reserved word in strict mode.
const serveStatic = require('koa-static');
const Koa = require('koa');
const rpcClient = require('./client');
const template = require('./template');

const app = new Koa();
const detailTemplate = template(`${__dirname}/template/index.html`);

// Bounds of the protobuf int32 type that ColumnRequest.columnid is declared with.
const INT32_MIN = -2147483648;
const INT32_MAX = 2147483647;

/**
 * Parse the raw `columnid` query value into an int32.
 *
 * @param {unknown} raw - Value of `ctx.query.columnid`; a string when the
 *   parameter appears once, an array when it is repeated, undefined when absent.
 * @returns {number|null} The parsed id, or null when the value is missing,
 *   not a plain decimal integer, or outside the int32 range.
 */
function parseColumnId(raw) {
  if (typeof raw !== 'string' || !/^-?\d+$/.test(raw)) {
    return null;
  }
  const id = Number.parseInt(raw, 10);
  return id >= INT32_MIN && id <= INT32_MAX ? id : null;
}

/**
 * Promise adapter around the callback-style `rpcClient.write`.
 *
 * @param {number} columnid - Id sent as the `columnid` field of the request.
 * @returns {Promise<*>} Resolves with the data passed to the RPC callback,
 *   rejects with the error passed to it.
 */
function fetchColumnDetail(columnid) {
  return new Promise((resolve, reject) => {
    rpcClient.write({ columnid }, (err, data) => {
      if (err) {
        reject(err);
      } else {
        resolve(data);
      }
    });
  });
}

// Serve static assets under /static.
app.use(mount('/static', serveStatic(`${__dirname}/source/static/`)));

// Detail page: validate the id, fetch the data over RPC, render the template.
app.use(async (ctx) => {
  // The query value must be turned into a real integer before it reaches the
  // protobuf encoder; anything else is answered with 400 instead of being
  // forwarded as a string or array.
  const columnid = parseColumnId(ctx.query.columnid);
  if (columnid === null) {
    ctx.status = 400;
    ctx.body = 'invalid columnid';
    return;
  }

  // A rejected RPC call propagates out of this middleware unchanged.
  const result = await fetchColumnDetail(columnid);

  ctx.status = 200;
  ctx.body = detailTemplate(result);
});

app.listen(3000);
// module.exports = app;
client.js
const EasySock = require('easy_sock');
const protobuf = require('protocol-buffers');
const fs = require('fs');

const schemas = protobuf(fs.readFileSync(`${__dirname}/detail.proto`));

// Packet layout used by the codec below:
//   bytes 0-3  sequence number (int32, big-endian)
//   bytes 4-7  body length in bytes (int32, big-endian)
//   bytes 8-   protobuf-encoded body
const HEADER_SIZE = 8;

// EasySock (https://www.npmjs.com/package/easy_sock) is a socket utility from Tencent.
const easySock = new EasySock({
  ip: '127.0.0.1',
  port: 4000,
  timeout: 500,
  keepAlive: true,
});

/**
 * Build one request packet: 8-byte header followed by the encoded ColumnRequest.
 *
 * @param {object} data - Request message to encode with ColumnRequest.
 * @param {number} seq - Sequence number written into the header.
 * @returns {Buffer} Header and body concatenated.
 */
easySock.encode = (data, seq) => {
  const payload = schemas.ColumnRequest.encode(data);
  const header = Buffer.alloc(HEADER_SIZE);
  header.writeInt32BE(seq, 0);
  header.writeInt32BE(payload.length, 4);
  return Buffer.concat([header, payload]);
};

/**
 * Split one complete response packet into its sequence number and decoded body.
 *
 * @param {Buffer} buffer - A whole packet (header + body).
 * @returns {{result: object, seq: number}} Decoded ColumnResponse and the header's seq.
 */
easySock.decode = (buffer) => {
  const seq = buffer.readInt32BE(0);
  const result = schemas.ColumnResponse.decode(buffer.slice(HEADER_SIZE));
  return { result, seq };
};

/**
 * Report whether the buffered bytes contain at least one whole packet.
 *
 * @param {Buffer} buffer - Bytes received so far.
 * @returns {number} Length of the first complete packet, or 0 when the header
 *   or the body has not fully arrived yet.
 */
easySock.isReceiveComplete = (buffer) => {
  if (buffer.length < HEADER_SIZE) {
    return 0;
  }
  const packetLength = buffer.readInt32BE(4) + HEADER_SIZE;
  return buffer.length >= packetLength ? packetLength : 0;
};

module.exports = easySock;
server部分
模拟的
const fs = require('fs');
const protobuf = require('protocol-buffers');

const schemas = protobuf(fs.readFileSync(`${__dirname}/detail.proto`));

// Mock data.
const columnData = require('./mockdata/column');

/**
 * Server-side packet encode/decode logic, bound to the request/response schemas.
 */
const server = require('./lib/geeknode-rpc-server')(schemas.ColumnRequest, schemas.ColumnResponse);

server
  .createServer((request, response) => {
    // Everything here is mock data, so the requested id is not consulted.
    // `request.body` is the decoded ColumnRequest message; a real project would
    // take `request.body.columnid` and use it to query the database.

    // Reply with the mock data directly.
    response.end({
      column: columnData[0],
      recommendColumns: [columnData[1], columnData[2]],
    });
  })
  .listen(4000, () => {
    console.log('rpc server listened: 4000');
  });
rpc-server.js
// 'use strict';// const debug = require("debug")('easysock-server');const net = require("net");module.exports = class RPC {constructor({ encodeResponse, decodeRequest, isCompleteRequest }) {this.encodeResponse = encodeResponse;this.decodeRequest = decodeRequest;this.isCompleteRequest = isCompleteRequest;}createServer(callback) {let buffer = null;const tcpServer = net.createServer((socket) => {socket.on('data', (data) => {buffer = (buffer && buffer.length > 0) ?Buffer.concat([buffer, data]) : // 有遗留数据才做拼接操作data;let checkLength = null;while (buffer && (checkLength = this.isCompleteRequest(buffer))) {let requestBuffer = null;if (checkLength == buffer.length) {requestBuffer = buffer;buffer = null;} else {requestBuffer = buffer.slice(0, checkLength);buffer = buffer.slice(checkLength);}const request = this.decodeRequest(requestBuffer);callback({ // requestbody: request.result,socket},{ // responseend: (data) => {const buffer = this.encodeResponse(data, request.seq)socket.write(buffer);}});}})});return {listen() {tcpServer.listen.apply(tcpServer, arguments)}}}}
geeknode-rpc-server.js
const RPC = require('./rpc-server');/*** 因为所有服务用的包头格式都一样,不一样的只有protobuf协议,所以这里可以将这段逻辑封成一个模块** 日常做项目的时候一定要注意把重复代码做封装*/module.exports = function (protobufRequestSchema, protobufResponseSchema) {return new RPC({// 解码请求包decodeRequest(buffer) {const seq = buffer.readUInt32BE();return {seq: seq,result: protobufRequestSchema.decode(buffer.slice(8))}},// 判断请求包是不是接收完成isCompleteRequest(buffer) {const bodyLength = buffer.readUInt32BE(4);return 8 + bodyLength},// 编码返回包encodeResponse(data, seq) {const body = protobufResponseSchema.encode(data);const head = Buffer.alloc(8);head.writeUInt32BE(seq);head.writeUInt32BE(body.length, 4);return Buffer.concat([head, body]);}})}
