gRPC 基本概念介绍:《微服务通信:gRPC 介绍》

gRPC 的使用通常包括如下几个步骤:

  • 通过 protobuf 来定义接口和数据类型
  • 编写 gRPC server 端代码
  • 编写 gRPC client 端代码

本文通过一个实例来详细讲解上述的三步。

运行环境:Node version >= 8.13.0

1. Quick start 示例

1.1 初始化示例项目

  1. $ npm init --yes
  2. # 向 package.json 文件添加 grpc 依赖
  3. "dependencies": {
  4. "@grpc/proto-loader": "^0.5.0",
  5. "async": "^1.5.2",
  6. "google-protobuf": "^3.0.0",
  7. "@grpc/grpc-js": "^1.1.0",
  8. "lodash": "^4.6.1",
  9. "minimist": "^1.2.0"
  10. }
  11. $ npm inistall

1.2 编写 helloworld.proto 文件来定义接口和数据类型

  1. syntax = "proto3";
  2. option java_multiple_files = true;
  3. option java_package = "io.grpc.examples.helloworld";
  4. option java_outer_classname = "HelloWorldProto";
  5. option objc_class_prefix = "HLW";
  6. package helloworld;
  7. // The greeting service definition.
  8. service Greeter {
  9. // Sends a greeting
  10. rpc SayHello (HelloRequest) returns (HelloReply) {}
  11. // Sends another greeting
  12. rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
  13. }
  14. // The request message containing the user's name.
  15. message HelloRequest {
  16. string name = 1;
  17. }
  18. // The response message containing the greetings
  19. message HelloReply {
  20. string message = 1;
  21. }

1.3 编写 server 端代码 greeter_server.js

  1. var PROTO_PATH = __dirname + '/helloworld.proto';
  2. var grpc = require('@grpc/grpc-js');
  3. var protoLoader = require('@grpc/proto-loader');
  4. var packageDefinition = protoLoader.loadSync(
  5. PROTO_PATH,
  6. {keepCase: true,
  7. longs: String,
  8. enums: String,
  9. defaults: true,
  10. oneofs: true
  11. });
  12. var hello_proto = grpc.loadPackageDefinition(packageDefinition).helloworld;
  13. /**
  14. * Implements the SayHello RPC method.
  15. */
  16. function sayHello(call, callback) {
  17. callback(null, {message: 'Hello ' + call.request.name});
  18. }
  19. function sayHelloAgain(call, callback) {
  20. callback(null, {message: 'Hello again, ' + call.request.name});
  21. }
  22. /**
  23. * Starts an RPC server that receives requests for the Greeter service at the
  24. * sample server port
  25. */
  26. function main() {
  27. var server = new grpc.Server();
  28. server.addService(hello_proto.Greeter.service,
  29. {sayHello: sayHello, sayHelloAgain: sayHelloAgain});
  30. server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
  31. server.start();
  32. });
  33. }
  34. main();

1.4 编写 client 端代码 greeter_client.js

  1. var PROTO_PATH = __dirname + '/helloworld.proto';
  2. var parseArgs = require('minimist');
  3. var grpc = require('@grpc/grpc-js');
  4. var protoLoader = require('@grpc/proto-loader');
  5. var packageDefinition = protoLoader.loadSync(
  6. PROTO_PATH,
  7. {keepCase: true,
  8. longs: String,
  9. enums: String,
  10. defaults: true,
  11. oneofs: true
  12. });
  13. var hello_proto = grpc.loadPackageDefinition(packageDefinition).helloworld;
  14. function main() {
  15. var argv = parseArgs(process.argv.slice(2), {
  16. string: 'target'
  17. });
  18. var target;
  19. if (argv.target) {
  20. target = argv.target;
  21. } else {
  22. target = 'localhost:50051';
  23. }
  24. var client = new hello_proto.Greeter(target,
  25. grpc.credentials.createInsecure());
  26. var user;
  27. if (argv._.length > 0) {
  28. user = argv._[0];
  29. } else {
  30. user = 'world';
  31. }
  32. client.sayHello({name: user}, function(err, response) {
  33. console.log('Greeting:', response.message);
  34. });
  35. client.sayHelloAgain({name: 'you'}, function(err, response) {
  36. console.log('Greeting:', response.message);
  37. });
  38. }
  39. main();

1.5 运行测试

  1. $ node greeter_server.js
  2. # 为 client 新建一个终端,运行以下命令
  3. $ node greeter_client.js
  4. # client 端输出结果
  5. Greeting: Hello world
  6. Greeting: Hello again, you

2. Basics tutorial 示例

学习内容:

  • 在一个 .proto 文件内定义服务。
  • 用 protocol buffer 编译器生成服务器和客户端代码。
  • 使用 gRPC 的 Node.js API 为你的服务实现一个简单的客户端和服务器。

2.1 编写 route_guide.proto 服务定义

  • 要定义一个服务,你必须在你的 .proto 文件中指定 service:

    1. service RouteGuide {
    2. ...
    3. }
  • 然后在你的服务中定义 rpc 方法,指定请求的和响应类型。gRPC 允许你定义4种类型的 service 方法,在 RouteGuide 服务中都有使用:

    • 一个 简单 RPC , 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。

      1. // Obtains the feature at a given position.
      2. rpc GetFeature(Point) returns (Feature) {}
    • 一个 服务器端流式 RPC , 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。

      1. // Obtains the Features available within the given Rectangle. Results are
      2. // streamed rather than returned at once (e.g. in a response message with a
      3. // repeated field), as the rectangle may cover a large area and contain a
      4. // huge number of features.
      5. rpc ListFeatures(Rectangle) returns (stream Feature) {}
    • 一个 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。

      1. // Accepts a stream of Points on a route being traversed, returning a
      2. // RouteSummary when traversal is completed.
      3. rpc RecordRoute(stream Point) returns (RouteSummary) {}
    • 一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。

      1. // Accepts a stream of RouteNotes sent while a route is being traversed,
      2. // while receiving other RouteNotes (e.g. from other users).
      3. rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
  • 我们的 .proto 文件也包含了所有请求的 protocol buffer 消息类型定义以及在服务方法中使用的响应类型——比如,下面的 Point 消息类型:

    1. // Points are represented as latitude-longitude pairs in the E7 representation
    2. // (degrees multiplied by 10**7 and rounded to the nearest integer).
    3. // Latitudes should be in the range +/- 90 degrees and longitude should be in
    4. // the range +/- 180 degrees (inclusive).
    5. message Point {
    6. int32 latitude = 1;
    7. int32 longitude = 2;
    8. }

2.2 从 .proto 文件加载服务描述符

Node.js 的类库在运行时加载 .proto 中的客户端存根并动态生成服务描述符。

要加载一个 .proto 文件,只需要 require gRPC proto loader 类库,然后使用它的 loadSync() 方法。最后将加载的内容传递给 gRPC 类库的 loadPackageDefinition() 方法:

  1. var PROTO_PATH = __dirname + '/route_guide.proto';
  2. var grpc = require('@grpc/grpc-js');
  3. var protoLoader = require('@grpc/proto-loader');
  4. // Suggested options for similarity to existing grpc.load behavior
  5. var packageDefinition = protoLoader.loadSync(
  6. PROTO_PATH,
  7. {keepCase: true,
  8. longs: String,
  9. enums: String,
  10. defaults: true,
  11. oneofs: true
  12. });
  13. var protoDescriptor = grpc.loadPackageDefinition(packageDefinition);
  14. // The protoDescriptor object has the full package hierarchy
  15. var routeguide = protoDescriptor.routeguide;

一旦你完成这个,存根构造函数是在 routeguide 命名空间(protoDescriptor.routeguide.RouteGuide)中而服务描述符(用来创建服务器)是存根(protoDescriptor.routeguide.RouteGuide.service)的一个属性。

  • 完整的 route_guide.proto 文件 ```protobuf syntax = “proto3”;

option java_multiple_files = true; option java_package = “io.grpc.examples.routeguide”; option java_outer_classname = “RouteGuideProto”; option objc_class_prefix = “RTG”;

package routeguide;

// Interface exported by the server. service RouteGuide { // A simple RPC. // // Obtains the feature at a given position. // // A feature with an empty name is returned if there’s no feature at the given // position. rpc GetFeature(Point) returns (Feature) {}

// A server-to-client streaming RPC. // // Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}

// A client-to-server streaming RPC. // // Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}

// A Bidirectional streaming RPC. // // Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} }

// Points are represented as latitude-longitude pairs in the E7 representation // (degrees multiplied by 10**7 and rounded to the nearest integer). // Latitudes should be in the range +/- 90 degrees and longitude should be in // the range +/- 180 degrees (inclusive). message Point { int32 latitude = 1; int32 longitude = 2; }

// A latitude-longitude rectangle, represented as two diagonally opposite // points “lo” and “hi”. message Rectangle { // One corner of the rectangle. Point lo = 1;

// The other corner of the rectangle. Point hi = 2; }

// A feature names something at a given point. // // If a feature could not be named, the name is empty. message Feature { // The name of the feature. string name = 1;

// The point where the feature is detected. Point location = 2; }

// A RouteNote is a message sent while at a given point. message RouteNote { // The location from which the message is sent. Point location = 1;

// The message to be sent. string message = 2; }

// A RouteSummary is received in response to a RecordRoute rpc. // // It contains the number of individual points received, the number of // detected features, and the total distance covered as the cumulative sum of // the distance between each point. message RouteSummary { // The number of points received. int32 point_count = 1;

// The number of known features passed while traversing the route. int32 feature_count = 2;

// The distance covered in metres. int32 distance = 3;

// The duration of the traversal in seconds. int32 elapsed_time = 4; }

  1. <a name="4PFZR"></a>
  2. ## 2.3 创建服务器
  3. 首先来看看我们如何创建一个 RouteGuide 服务器。
  4. 让 RouteGuide 服务运作起来需要有两个部分支持:
  5. - 实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”。
  6. - 运行一个 gRPC 服务器,监听来自客户端的请求并返回服务的响应。
  7. - 完整的 **route_guide_server.js**,看看它是如何工作的。
  8. ```javascript
  9. var PROTO_PATH = __dirname + '/../../../protos/route_guide.proto';
  10. var fs = require('fs');
  11. var parseArgs = require('minimist');
  12. var path = require('path');
  13. var _ = require('lodash');
  14. var grpc = require('@grpc/grpc-js');
  15. var protoLoader = require('@grpc/proto-loader');
  16. var packageDefinition = protoLoader.loadSync(
  17. PROTO_PATH,
  18. {keepCase: true,
  19. longs: String,
  20. enums: String,
  21. defaults: true,
  22. oneofs: true
  23. });
  24. var routeguide = grpc.loadPackageDefinition(packageDefinition).routeguide;
  25. var COORD_FACTOR = 1e7;
  26. /**
  27. * For simplicity, a point is a record type that looks like
  28. * {latitude: number, longitude: number}, and a feature is a record type that
  29. * looks like {name: string, location: point}. feature objects with name===''
  30. * are points with no feature.
  31. */
  32. /**
  33. * List of feature objects at points that have been requested so far.
  34. */
  35. var feature_list = [];
  36. /**
  37. * Get a feature object at the given point, or creates one if it does not exist.
  38. * @param {point} point The point to check
  39. * @return {feature} The feature object at the point. Note that an empty name
  40. * indicates no feature
  41. */
  42. function checkFeature(point) {
  43. var feature;
  44. // Check if there is already a feature object for the given point
  45. for (var i = 0; i < feature_list.length; i++) {
  46. feature = feature_list[i];
  47. if (feature.location.latitude === point.latitude &&
  48. feature.location.longitude === point.longitude) {
  49. return feature;
  50. }
  51. }
  52. var name = '';
  53. feature = {
  54. name: name,
  55. location: point
  56. };
  57. return feature;
  58. }
  59. /**
  60. * getFeature request handler. Gets a request with a point, and responds with a
  61. * feature object indicating whether there is a feature at that point.
  62. * @param {EventEmitter} call Call object for the handler to process
  63. * @param {function(Error, feature)} callback Response callback
  64. */
  65. function getFeature(call, callback) {
  66. callback(null, checkFeature(call.request));
  67. }
  68. /**
  69. * listFeatures request handler. Gets a request with two points, and responds
  70. * with a stream of all features in the bounding box defined by those points.
  71. * @param {Writable} call Writable stream for responses with an additional
  72. * request property for the request value.
  73. */
  74. function listFeatures(call) {
  75. var lo = call.request.lo;
  76. var hi = call.request.hi;
  77. var left = _.min([lo.longitude, hi.longitude]);
  78. var right = _.max([lo.longitude, hi.longitude]);
  79. var top = _.max([lo.latitude, hi.latitude]);
  80. var bottom = _.min([lo.latitude, hi.latitude]);
  81. // For each feature, check if it is in the given bounding box
  82. _.each(feature_list, function(feature) {
  83. if (feature.name === '') {
  84. return;
  85. }
  86. if (feature.location.longitude >= left &&
  87. feature.location.longitude <= right &&
  88. feature.location.latitude >= bottom &&
  89. feature.location.latitude <= top) {
  90. call.write(feature);
  91. }
  92. });
  93. call.end();
  94. }
  95. /**
  96. * Calculate the distance between two points using the "haversine" formula.
  97. * The formula is based on http://mathforum.org/library/drmath/view/51879.html.
  98. * @param start The starting point
  99. * @param end The end point
  100. * @return The distance between the points in meters
  101. */
  102. function getDistance(start, end) {
  103. function toRadians(num) {
  104. return num * Math.PI / 180;
  105. }
  106. var R = 6371000; // earth radius in metres
  107. var lat1 = toRadians(start.latitude / COORD_FACTOR);
  108. var lat2 = toRadians(end.latitude / COORD_FACTOR);
  109. var lon1 = toRadians(start.longitude / COORD_FACTOR);
  110. var lon2 = toRadians(end.longitude / COORD_FACTOR);
  111. var deltalat = lat2-lat1;
  112. var deltalon = lon2-lon1;
  113. var a = Math.sin(deltalat/2) * Math.sin(deltalat/2) +
  114. Math.cos(lat1) * Math.cos(lat2) *
  115. Math.sin(deltalon/2) * Math.sin(deltalon/2);
  116. var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
  117. return R * c;
  118. }
  119. /**
  120. * recordRoute handler. Gets a stream of points, and responds with statistics
  121. * about the "trip": number of points, number of known features visited, total
  122. * distance traveled, and total time spent.
  123. * @param {Readable} call The request point stream.
  124. * @param {function(Error, routeSummary)} callback The callback to pass the
  125. * response to
  126. */
  127. function recordRoute(call, callback) {
  128. var point_count = 0;
  129. var feature_count = 0;
  130. var distance = 0;
  131. var previous = null;
  132. // Start a timer
  133. var start_time = process.hrtime();
  134. call.on('data', function(point) {
  135. point_count += 1;
  136. if (checkFeature(point).name !== '') {
  137. feature_count += 1;
  138. }
  139. /* For each point after the first, add the incremental distance from the
  140. * previous point to the total distance value */
  141. if (previous != null) {
  142. distance += getDistance(previous, point);
  143. }
  144. previous = point;
  145. });
  146. call.on('end', function() {
  147. callback(null, {
  148. point_count: point_count,
  149. feature_count: feature_count,
  150. // Cast the distance to an integer
  151. distance: distance|0,
  152. // End the timer
  153. elapsed_time: process.hrtime(start_time)[0]
  154. });
  155. });
  156. }
  157. var route_notes = {};
  158. /**
  159. * Turn the point into a dictionary key.
  160. * @param {point} point The point to use
  161. * @return {string} The key for an object
  162. */
  163. function pointKey(point) {
  164. return point.latitude + ' ' + point.longitude;
  165. }
  166. /**
  167. * routeChat handler. Receives a stream of message/location pairs, and responds
  168. * with a stream of all previous messages at each of those locations.
  169. * @param {Duplex} call The stream for incoming and outgoing messages
  170. */
  171. function routeChat(call) {
  172. call.on('data', function(note) {
  173. var key = pointKey(note.location);
  174. /* For each note sent, respond with all previous notes that correspond to
  175. * the same point */
  176. if (route_notes.hasOwnProperty(key)) {
  177. _.each(route_notes[key], function(note) {
  178. call.write(note);
  179. });
  180. } else {
  181. route_notes[key] = [];
  182. }
  183. // Then add the new note to the list
  184. route_notes[key].push(JSON.parse(JSON.stringify(note)));
  185. });
  186. call.on('end', function() {
  187. call.end();
  188. });
  189. }
  190. /**
  191. * Get a new server with the handler functions in this file bound to the methods
  192. * it serves.
  193. * @return {Server} The new server object
  194. */
  195. function getServer() {
  196. var server = new grpc.Server();
  197. server.addService(routeguide.RouteGuide.service, {
  198. getFeature: getFeature,
  199. listFeatures: listFeatures,
  200. recordRoute: recordRoute,
  201. routeChat: routeChat
  202. });
  203. return server;
  204. }
  205. if (require.main === module) {
  206. // If this is run as a script, start a server on an unused port
  207. var routeServer = getServer();
  208. routeServer.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
  209. var argv = parseArgs(process.argv, {
  210. string: 'db_path'
  211. });
  212. fs.readFile(path.resolve(argv.db_path), function(err, data) {
  213. if (err) throw err;
  214. feature_list = JSON.parse(data);
  215. routeServer.start();
  216. });
  217. });
  218. }
  219. exports.getServer = getServer;

2.3.1 实现 RouteGuide

可以看出,我们的服务器有一个从 RouteGuide.service 描述符对象生成的 Server 构造函数:

  1. var Server = new grpc.Server();

在这个场景下,我们实现了 异步 版本的 RouteGuide,它提供了 gRPC 缺省的行为。

route_guide_server.js 中的函数实现了所有的服务方法。

最简单的类型 RPC:getFeature

首先让我们看看最简单的类型 getFeature,它从客户端拿到一个 Point 对象,然后返回包含从数据库拿到的 feature 信息的 Feature。

  1. function checkFeature(point) {
  2. var feature;
  3. // Check if there is already a feature object for the given point
  4. for (var i = 0; i < feature_list.length; i++) {
  5. feature = feature_list[i];
  6. if (feature.location.latitude === point.latitude &&
  7. feature.location.longitude === point.longitude) {
  8. return feature;
  9. }
  10. }
  11. var name = '';
  12. feature = {
  13. name: name,
  14. location: point
  15. };
  16. return feature;
  17. }
  18. function getFeature(call, callback) {
  19. callback(null, checkFeature(call.request));
  20. }

getFeature 方法传入一个 RPC 的 call 对象,call 对象包含一个 Point 属性。而且还有一个 callback 函数作为参数,它会返回 Feature 对象。

在 checkFeature 方法中我们根据给出的 Point 去对应的填充 Feature,并将其传给 callback 函数,其中 callback 函数第一个参数为 null,表示没有错误。

服务器端流式 RPC:listFeatures

现在让我们看看稍微复杂点的东西 —— 流式 RPC。 listFeatures 是一个服务器端流式 RPC,所以我们需要发回多个 Feature 给客户端。

  1. function listFeatures(call) {
  2. var lo = call.request.lo;
  3. var hi = call.request.hi;
  4. var left = _.min([lo.longitude, hi.longitude]);
  5. var right = _.max([lo.longitude, hi.longitude]);
  6. var top = _.max([lo.latitude, hi.latitude]);
  7. var bottom = _.min([lo.latitude, hi.latitude]);
  8. // For each feature, check if it is in the given bounding box
  9. _.each(feature_list, function(feature) {
  10. if (feature.name === '') {
  11. return;
  12. }
  13. if (feature.location.longitude >= left &&
  14. feature.location.longitude <= right &&
  15. feature.location.latitude >= bottom &&
  16. feature.location.latitude <= top) {
  17. call.write(feature);
  18. }
  19. });
  20. call.end();
  21. }

这次,不再是传入 call 对象和 callback 函数作为参数,而是只传入一个 call 对象,它实现了 Writable 接口。
在 listFeatures 方法中,我们遍历 feature_list 数组,根据客户端传入的坐标范围,返回满足条件的 feature,将 feature 使用 write() 方法写入 call 中。最后,我们调用 call.end() 表示我们已经完成了所有消息的发送。

客户端流式 RPC:recordRoute

如果你看过客户端流方法 recordRoute,你会发现它很类似,除了这次 call 参数实现了 Reader 的接口。 每次有新数据的时候,call 的 data 事件被触发,每次数据读取完成时,end 事件被触发。和一元的场景一样,我们通过调用 callback 函数来应答:

  1. call.on('data', function(point) {
  2. // Process user data
  3. });
  4. call.on('end', function() {
  5. callback(null, result);
  6. });

双向流式 RPC:routeChat

  1. function routeChat(call) {
  2. call.on('data', function(note) {
  3. var key = pointKey(note.location);
  4. /* For each note sent, respond with all previous notes that correspond to
  5. * the same point */
  6. if (route_notes.hasOwnProperty(key)) {
  7. _.each(route_notes[key], function(note) {
  8. call.write(note);
  9. });
  10. } else {
  11. route_notes[key] = [];
  12. }
  13. // Then add the new note to the list
  14. route_notes[key].push(JSON.parse(JSON.stringify(note)));
  15. });
  16. call.on('end', function() {
  17. call.end();
  18. });
  19. }

这次我们得到的是一个实现了 Duplex 的 call 对象,可以用来读 和 写消息。这里读写的语法和我们客户端流以及服务器流方法是一样的。虽然每一端获取对方信息的顺序和写入的顺序一致,客户端和服务器都可以以任意顺序读写——流的操作是完全独立的。

2.3.2 启动服务器

一旦我们实现了所有的方法,我们还需要启动一个gRPC服务器,这样客户端才可以使用服务。下面这段代码展示了在我们 RouteGuide 服务中实现的过程:

  1. function getServer() {
  2. var server = new grpc.Server();
  3. server.addService(routeguide.RouteGuide.service, {
  4. getFeature: getFeature,
  5. listFeatures: listFeatures,
  6. recordRoute: recordRoute,
  7. routeChat: routeChat
  8. });
  9. return server;
  10. }
  11. var routeServer = getServer();
  12. routeServer.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
  13. routeServer.start();
  14. });

如你所见,我们通过下面的步骤去构建和启动服务器:

  • 通过 RouteGuide 服务描述符创建一个 Server 构造函数。
  • 实现服务的方法。
  • 通过调用 Server 的构造函数以及方法实现去创建一个服务器的实例。
  • 用实例的 bind() 方法指定地址以及我们期望客户端请求监听的端口。
  • 调用实例的 listen() 方法启动一个RPC服务器。

2.4 创建客户端

在这部分,我们将尝试为 RouteGuide 服务创建一个 Node.js 的客户端。

  • 完整的 route_guide_client.js ```javascript /
    • Copyright 2015 gRPC authors. *
    • Licensed under the Apache License, Version 2.0 (the “License”);
    • you may not use this file except in compliance with the License.
    • You may obtain a copy of the License at *
    • http://www.apache.org/licenses/LICENSE-2.0 *
    • Unless required by applicable law or agreed to in writing, software
    • distributed under the License is distributed on an “AS IS” BASIS,
    • WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    • See the License for the specific language governing permissions and
    • limitations under the License. /

var PROTO_PATH = __dirname + ‘/../../../protos/route_guide.proto’;

var async = require(‘async’); var fs = require(‘fs’); var parseArgs = require(‘minimist’); var path = require(‘path’); var _ = require(‘lodash’); var grpc = require(‘@grpc/grpc-js’); var protoLoader = require(‘@grpc/proto-loader’); var packageDefinition = protoLoader.loadSync( PROTO_PATH, {keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); var routeguide = grpc.loadPackageDefinition(packageDefinition).routeguide; var client = new routeguide.RouteGuide(‘localhost:50051’, grpc.credentials.createInsecure());

var COORD_FACTOR = 1e7;

/**

  • Run the getFeature demo. Calls getFeature with a point known to have a
  • feature and a point known not to have a feature.
  • @param {function} callback Called when this demo is complete */ function runGetFeature(callback) { var next = _.after(2, callback); function featureCallback(error, feature) { if (error) { callback(error); return; } if (feature.name === ‘’) { console.log(‘Found no feature at ‘ +
    1. feature.location.latitude/COORD_FACTOR + ', ' +
    2. feature.location.longitude/COORD_FACTOR);
    } else { console.log(‘Found feature called “‘ + feature.name + ‘“ at ‘ +
    1. feature.location.latitude/COORD_FACTOR + ', ' +
    2. feature.location.longitude/COORD_FACTOR);
    } next(); } var point1 = { latitude: 409146138, longitude: -746188906 }; var point2 = { latitude: 0, longitude: 0 }; client.getFeature(point1, featureCallback); client.getFeature(point2, featureCallback); }

/**

  • Run the listFeatures demo. Calls listFeatures with a rectangle containing all
  • of the features in the pre-generated database. Prints each response as it
  • comes in.
  • @param {function} callback Called when this demo is complete */ function runListFeatures(callback) { var rectangle = { lo: { latitude: 400000000, longitude: -750000000 }, hi: { latitude: 420000000, longitude: -730000000 } }; console.log(‘Looking for features between 40, -75 and 42, -73’); var call = client.listFeatures(rectangle); call.on(‘data’, function(feature) { console.log(‘Found feature called “‘ + feature.name + ‘“ at ‘ +
    1. feature.location.latitude/COORD_FACTOR + ', ' +
    2. feature.location.longitude/COORD_FACTOR);
    }); call.on(‘end’, callback); }

/**

  • Run the recordRoute demo. Sends several randomly chosen points from the
  • pre-generated feature database with a variable delay in between. Prints the
  • statistics when they are sent from the server.
  • @param {function} callback Called when this demo is complete */ function runRecordRoute(callback) { var argv = parseArgs(process.argv, { string: ‘db_path’ }); fs.readFile(path.resolve(argv.db_path), function(err, data) { if (err) { callback(err); return; } var feature_list = JSON.parse(data);

    var num_points = 10; var call = client.recordRoute(function(error, stats) { if (error) {

    1. callback(error);
    2. return;

    } console.log(‘Finished trip with’, stats.point_count, ‘points’); console.log(‘Passed’, stats.feature_count, ‘features’); console.log(‘Travelled’, stats.distance, ‘meters’); console.log(‘It took’, stats.elapsed_time, ‘seconds’); callback(); }); /**

    • Constructs a function that asynchronously sends the given point and then
    • delays sending its callback
    • @param {number} lat The latitude to send
    • @param {number} lng The longitude to send
    • @return {function(function)} The function that sends the point / function pointSender(lat, lng) { /*
      • Sends the point, then calls the callback after a delay
      • @param {function} callback Called when complete */ return function(callback) { console.log(‘Visiting point ‘ + lat/COORDFACTOR + ‘, ‘ + lng/COORD_FACTOR); call.write({ latitude: lat, longitude: lng }); .delay(callback, .random(500, 1500)); }; } var point_senders = []; for (var i = 0; i < num_points; i++) { var rand_point = feature_list[.random(0, feature_list.length - 1)]; point_senders[i] = pointSender(rand_point.location.latitude,
        1. rand_point.location.longitude);
        } async.series(point_senders, function() { call.end(); }); }); }

/**

  • Run the routeChat demo. Send some chat messages, and print any chat messages
  • that are sent from the server.
  • @param {function} callback Called when the demo is complete */ function runRouteChat(callback) { var call = client.routeChat(); call.on(‘data’, function(note) { console.log(‘Got message “‘ + note.message + ‘“ at ‘ +

    1. note.location.latitude + ', ' + note.location.longitude);

    });

    call.on(‘end’, callback);

    var notes = [{ location: { latitude: 0, longitude: 0 }, message: ‘First message’ }, { location: { latitude: 0, longitude: 1 }, message: ‘Second message’ }, { location: { latitude: 1, longitude: 0 }, message: ‘Third message’ }, { location: { latitude: 0, longitude: 0 }, message: ‘Fourth message’ }]; for (var i = 0; i < notes.length; i++) { var note = notes[i]; console.log(‘Sending message “‘ + note.message + ‘“ at ‘ +

    1. note.location.latitude + ', ' + note.location.longitude);

    call.write(note); } call.end(); }

/**

  • Run all of the demos in order */ function main() { async.series([ runGetFeature, runListFeatures, runRecordRoute, runRouteChat ]); }

if (require.main === module) { main(); }

exports.runGetFeature = runGetFeature;

exports.runListFeatures = runListFeatures;

exports.runRecordRoute = runRecordRoute;

exports.runRouteChat = runRouteChat;

  1. <a name="ufexo"></a>
  2. ### 2.4.1 创建存根
  3. 为了能调用服务的方法,我们得先创建一个 存根。要做到这点,我们只需要调用 RouteGuide 的存根构造函数,指定服务器地址和端口。
  4. ```javascript
  5. new routeguide.RouteGuide('localhost:50051', grpc.credentials.createInsecure());

2.4.2 调用服务的方法

现在我们来看看如何调用服务的方法。注意这些方法都是异步的:他们使用事件或者 callback 去获得结果。

简单类型 RPC:getFeature

调用简单 RPC 的 getFeature 几乎是和调用一个本地的异步方法一样简单。

  1. var point = {latitude: 409146138, longitude: -746188906};
  2. stub.getFeature(point, function(err, feature) {
  3. if (err) {
  4. // process error
  5. } else {
  6. // process feature
  7. }
  8. });

如你所见,我们创建并且填充了一个请求对象。最后我们调用了存根上的方法,传入请求和回调函数。如果没有错误,就可以从我们的服务器从应答对象读取应答信息。

  1. console.log('Found feature called "' + feature.name + '" at ' +
  2. feature.location.latitude/COORD_FACTOR + ', ' +
  3. feature.location.longitude/COORD_FACTOR);

流式 RPC

现在来看看我们的流方法。如果你已经读过【创建服务器】,本节的一些内容看上去很熟悉——流式 RPC 是在客户端和服务器两端以一种类似的方式实现的。

  • 下面就是我们称作是服务器端的流方法 listFeatures,它会返回地理的 Feature:

    1. var call = client.listFeatures(rectangle);
    2. call.on('data', function(feature) {
    3. console.log('Found feature called "' + feature.name + '" at ' +
    4. feature.location.latitude/COORD_FACTOR + ', ' +
    5. feature.location.longitude/COORD_FACTOR);
    6. });
    7. call.on('end', function() {
    8. // The server has finished sending
    9. });
    10. call.on('error', function(e) {
    11. // An error has occurred and the stream has been closed.
    12. });
    13. call.on('status', function(status) {
    14. // process status
    15. });

    我们传给它一个请求并拿回一个 Readable 流对象,而不是给方法传入请求和回调函数。客户端可以使用 Readable 的 ‘data’ 事件去读取服务器的应答。这个事件由每个 Feature 消息对象触发,知道没有更多的消息:’end’ 事件揭示调用已经结束。最后,当服务器发送状态时,触发状态事件。

  • 客户端的流方法 RecordRoute 的使用很相似,除了我们将一个回调函数传给方法,拿到一个 Writable 返回。

    1. var call = client.recordRoute(function(error, stats) {
    2. if (error) {
    3. callback(error);
    4. }
    5. console.log('Finished trip with', stats.point_count, 'points');
    6. console.log('Passed', stats.feature_count, 'features');
    7. console.log('Travelled', stats.distance, 'meters');
    8. console.log('It took', stats.elapsed_time, 'seconds');
    9. });
    10. function pointSender(lat, lng) {
    11. return function(callback) {
    12. console.log('Visiting point ' + lat/COORD_FACTOR + ', ' +
    13. lng/COORD_FACTOR);
    14. call.write({
    15. latitude: lat,
    16. longitude: lng
    17. });
    18. _.delay(callback, _.random(500, 1500));
    19. };
    20. }
    21. var point_senders = [];
    22. for (var i = 0; i < num_points; i++) {
    23. var rand_point = feature_list[_.random(0, feature_list.length - 1)];
    24. point_senders[i] = pointSender(rand_point.location.latitude,
    25. rand_point.location.longitude);
    26. }
    27. async.series(point_senders, function() {
    28. call.end();
    29. });

    一旦我们用 write() 将客户端请求写入到流的动作完成,我们需要在流上调用 end() 通知 gRPC 我们已经完成写。如果状态是 OK,stats 对象会跟着服务器的响应被填充。

  • 最后,让我们看看双向流式 RPC routeChat()。在这种场景下,我们将上下文传给一个方法,拿到一个可以用来读写消息的 Duplex 流对象的返回。

    1. var call = client.routeChat();

    这里读写的语法和我们客户端流以及服务器端流方法没有任何区别。虽然每一方都能按照写入时的顺序拿到另一方的消息,客户端和服务器端都可以以任意顺序读写——流操作起来是完全独立的。

2.5 运行测试

  1. $ npm install
  2. # 运行 server
  3. $ node ./dynamic_codegen/route_guide/route_guide_server.js --db_path=./dynamic_codegen/route_guide/route_guide_db.json
  4. # 在新的终端中,运行 client
  5. $ node ./dynamic_codegen/route_guide/route_guide_client.js --db_path=./dynamic_codegen/route_guide/route_guide_db.json
  6. # client 输出结果:
  7. ...
  8. Found feature called "261 Van Sickle Road, Goshen, NY 10924, USA" at 41.3069058, -74.4597778
  9. ...
  10. Visiting point 40.1012643, -74.4035134
  11. Finished trip with 10 points
  12. Passed 6 features
  13. Travelled 638865 meters
  14. It took 10 seconds
  15. Sending message "First message" at 0, 0
  16. Sending message "Second message" at 0, 1
  17. Sending message "Third message" at 1, 0
  18. Sending message "Fourth message" at 0, 0
  19. Got message "First message" at 0, 0