详情页模拟课:从 BFF 到 server 的 RPC 请求。

协议

  1. message Column { // One column record; used for both `column` and `recommendColumns` in ColumnResponse
  2. required int32 id = 1; // signed here, whereas Article.id is unsigned
  3. required string column_cover = 2;
  4. required string column_title = 3;
  5. required string column_subtitle = 4;
  6. required string author_name = 5;
  7. required string author_intro = 6;
  8. required string column_intro = 7;
  9. required string column_unit = 8;
  10. required uint32 sub_count = 9;
  11. required string update_frequency = 10;
  12. required uint32 column_price = 11; // NOTE(review): currency/unit is not visible in this file — confirm
  13. optional uint32 column_price_market = 12; // the only optional field in this message
  14. repeated Article articles = 13; // zero or more Article entries (message defined below)
  15. }
  16. message Article { // One entry of Column.articles
  17. required uint32 id = 1;
  18. required bool is_video_preview = 2;
  19. required string article_title = 3;
  20. }
  21. message ColumnResponse { // Reply payload: encoded by the RPC server, decoded by the BFF client
  22. required Column column = 1; // the requested column
  23. repeated Column recommendColumns = 2; // camelCase unlike the other fields; the mock server fills it under this exact name
  24. }
  25. message ColumnRequest { // Request payload: encoded by the BFF client, decoded by the RPC server
  26. required int32 columnid = 1; // the BFF takes this value from the `columnid` URL query parameter
  27. }

bff部分

  const mount = require('koa-mount');
  // Bound to `serveStatic` rather than `static`, which is a reserved word in strict-mode code.
  const serveStatic = require('koa-static');
  const Koa = require('koa');
  const rpcClient = require('./client');
  const template = require('./template');

  const app = new Koa();
  const detailTemplate = template(`${__dirname}/template/index.html`);

  // ColumnRequest.columnid is declared as int32, so only integers in that range are valid.
  const COLUMN_ID_PATTERN = /^-?\d+$/;
  const INT32_MIN = -(2 ** 31);
  const INT32_MAX = 2 ** 31 - 1;

  /**
   * Converts the raw `columnid` query value into an int32.
   *
   * @param {unknown} raw - value read from `ctx.query.columnid`
   * @returns {number|null} the parsed id, or null when the value is missing,
   *   not a plain integer string, or outside the int32 range
   */
  function parseColumnId(raw) {
    // A repeated query parameter arrives as an array; it fails the string check too.
    if (typeof raw !== 'string' || !COLUMN_ID_PATTERN.test(raw)) {
      return null;
    }
    const columnid = Number(raw);
    return columnid >= INT32_MIN && columnid <= INT32_MAX ? columnid : null;
  }

  /**
   * Sends one ColumnRequest through the RPC client.
   *
   * @param {number} columnid - id to put into the request
   * @returns {Promise<*>} resolves with the data the client calls back with,
   *   rejects with the error it reports
   */
  function fetchColumn(columnid) {
    return new Promise((resolve, reject) => {
      rpcClient.write({ columnid }, (err, data) => {
        if (err) {
          reject(err);
          return;
        }
        resolve(data);
      });
    });
  }

  // Static assets are served under /static.
  app.use(mount('/static', serveStatic(`${__dirname}/source/static/`)));

  // Detail page: validate the id, fetch the column over RPC, render the template.
  app.use(async (ctx) => {
    const columnid = parseColumnId(ctx.query.columnid);
    if (columnid === null) {
      ctx.status = 400;
      ctx.body = 'invalid columnid';
      return;
    }

    // A rejected RPC call is left to propagate to Koa, exactly as before.
    const result = await fetchColumn(columnid);
    ctx.status = 200;
    ctx.body = detailTemplate(result);
  });

  app.listen(3000);
  // module.exports = app;

client.js

  const EasySock = require('easy_sock');
  const protobuf = require('protocol-buffers');
  const fs = require('fs');

  const schemas = protobuf(fs.readFileSync(`${__dirname}/detail.proto`));

  // Every packet starts with an 8-byte header:
  // bytes 0-3 = sequence number, bytes 4-7 = body length (both big-endian).
  const HEADER_LENGTH = 8;

  // EasySock
  // https://www.npmjs.com/package/easy_sock
  // A socket utility from Tencent
  const easySock = new EasySock({
    ip: '127.0.0.1',
    port: 4000,
    timeout: 500,
    keepAlive: true,
  });

  /**
   * Builds a request packet: header followed by the protobuf-encoded ColumnRequest.
   * The header is written as unsigned 32-bit integers, matching how the RPC server
   * reads it.
   *
   * @param {object} data - ColumnRequest fields
   * @param {number} seq - sequence number to place in the header
   * @returns {Buffer} the complete packet
   */
  easySock.encode = function (data, seq) {
    const body = schemas.ColumnRequest.encode(data);
    const head = Buffer.alloc(HEADER_LENGTH);
    head.writeUInt32BE(seq, 0);
    head.writeUInt32BE(body.length, 4);
    return Buffer.concat([head, body]);
  };

  /**
   * Splits a complete response packet into its sequence number and decoded body.
   *
   * @param {Buffer} buffer - one complete packet
   * @returns {{result: object, seq: number}} decoded ColumnResponse and the header's sequence number
   */
  easySock.decode = function (buffer) {
    const seq = buffer.readUInt32BE(0);
    const body = schemas.ColumnResponse.decode(buffer.slice(HEADER_LENGTH));
    return {
      result: body,
      seq,
    };
  };

  /**
   * Reports whether the received bytes contain at least one complete packet.
   *
   * @param {Buffer} buffer - bytes received so far
   * @returns {number} length of the first complete packet, or 0 when more data is needed
   */
  easySock.isReceiveComplete = function (buffer) {
    if (buffer.length < HEADER_LENGTH) {
      return 0;
    }
    const packetLength = HEADER_LENGTH + buffer.readUInt32BE(4);
    return buffer.length >= packetLength ? packetLength : 0;
  };

  module.exports = easySock;

template 的代码这里就省略了。

server部分

下面是模拟的 server,直接返回假数据。

  const fs = require('fs');
  const protobuf = require('protocol-buffers');

  const schemas = protobuf(
    fs.readFileSync(`${__dirname}/detail.proto`)
  );

  // Mock data
  const columnData = require('./mockdata/column');

  /**
   * Server-side packet encoding/decoding logic
   */
  const server = require('./lib/geeknode-rpc-server')(schemas.ColumnRequest, schemas.ColumnResponse);

  server
    .createServer((request, response) => {
      // `request.body` is the decoded ColumnRequest, so the requested id is
      // `request.body.columnid`. Because everything here is mock data the id is
      // not used; a real project would query the database with it.

      // Return the mock data directly.
      response.end({
        column: columnData[0],
        recommendColumns: [columnData[1], columnData[2]],
      });
    })
    .listen(4000, () => {
      console.log('rpc server listened: 4000');
    });

rpc-server.js

  1. // 'use strict';
  2. // const debug = require("debug")('easysock-server');
  3. const net = require("net");
  4. module.exports = class RPC {
  5. constructor({ encodeResponse, decodeRequest, isCompleteRequest }) {
  6. this.encodeResponse = encodeResponse;
  7. this.decodeRequest = decodeRequest;
  8. this.isCompleteRequest = isCompleteRequest;
  9. }
  10. createServer(callback) {
  11. let buffer = null;
  12. const tcpServer = net.createServer((socket) => {
  13. socket.on('data', (data) => {
  14. buffer = (buffer && buffer.length > 0) ?
  15. Buffer.concat([buffer, data]) : // 有遗留数据才做拼接操作
  16. data;
  17. let checkLength = null;
  18. while (buffer && (checkLength = this.isCompleteRequest(buffer))) {
  19. let requestBuffer = null;
  20. if (checkLength == buffer.length) {
  21. requestBuffer = buffer;
  22. buffer = null;
  23. } else {
  24. requestBuffer = buffer.slice(0, checkLength);
  25. buffer = buffer.slice(checkLength);
  26. }
  27. const request = this.decodeRequest(requestBuffer);
  28. callback(
  29. { // request
  30. body: request.result,
  31. socket
  32. },
  33. { // response
  34. end: (data) => {
  35. const buffer = this.encodeResponse(data, request.seq)
  36. socket.write(buffer);
  37. }
  38. }
  39. );
  40. }
  41. })
  42. });
  43. return {
  44. listen() {
  45. tcpServer.listen.apply(tcpServer, arguments)
  46. }
  47. }
  48. }
  49. }

geeknode-rpc-server.js

  1. const RPC = require('./rpc-server');
  2. /**
  3. * 因为所有服务用的包头格式都一样,不一样的只有protobuf协议,所以这里可以将这段逻辑封成一个模块
  4. *
  5. * 日常做项目的时候一定要注意把重复代码做封装
  6. */
  7. module.exports = function (protobufRequestSchema, protobufResponseSchema) {
  8. return new RPC({
  9. // 解码请求包
  10. decodeRequest(buffer) {
  11. const seq = buffer.readUInt32BE();
  12. return {
  13. seq: seq,
  14. result: protobufRequestSchema.decode(buffer.slice(8))
  15. }
  16. },
  17. // 判断请求包是不是接收完成
  18. isCompleteRequest(buffer) {
  19. const bodyLength = buffer.readUInt32BE(4);
  20. return 8 + bodyLength
  21. },
  22. // 编码返回包
  23. encodeResponse(data, seq) {
  24. const body = protobufResponseSchema.encode(data);
  25. const head = Buffer.alloc(8);
  26. head.writeUInt32BE(seq);
  27. head.writeUInt32BE(body.length, 4);
  28. return Buffer.concat([head, body]);
  29. }
  30. })
  31. }