gRPC 基本概念介绍:《微服务通信:gRPC 介绍》
gRPC 的使用通常包括如下几个步骤:
- 通过 protobuf 来定义接口和数据类型
- 编写 gRPC server 端代码
- 编写 gRPC client 端代码
本文通过一个实例来详细讲解上述的三步。
1. Quick start 示例
1.1 初始化示例项目
$ npm init --yes# 向 package.json 文件添加 grpc 依赖"dependencies": {"@grpc/proto-loader": "^0.5.0","async": "^1.5.2","google-protobuf": "^3.0.0","@grpc/grpc-js": "^1.1.0","lodash": "^4.6.1","minimist": "^1.2.0"}$ npm inistall
1.2 编写 helloworld.proto 文件来定义接口和数据类型
syntax = "proto3";option java_multiple_files = true;option java_package = "io.grpc.examples.helloworld";option java_outer_classname = "HelloWorldProto";option objc_class_prefix = "HLW";package helloworld;// The greeting service definition.service Greeter {// Sends a greetingrpc SayHello (HelloRequest) returns (HelloReply) {}// Sends another greetingrpc SayHelloAgain (HelloRequest) returns (HelloReply) {}}// The request message containing the user's name.message HelloRequest {string name = 1;}// The response message containing the greetingsmessage HelloReply {string message = 1;}
1.3 编写 server 端代码 greeter_server.js
var PROTO_PATH = __dirname + '/helloworld.proto';var grpc = require('@grpc/grpc-js');var protoLoader = require('@grpc/proto-loader');var packageDefinition = protoLoader.loadSync(PROTO_PATH,{keepCase: true,longs: String,enums: String,defaults: true,oneofs: true});var hello_proto = grpc.loadPackageDefinition(packageDefinition).helloworld;/*** Implements the SayHello RPC method.*/function sayHello(call, callback) {callback(null, {message: 'Hello ' + call.request.name});}function sayHelloAgain(call, callback) {callback(null, {message: 'Hello again, ' + call.request.name});}/*** Starts an RPC server that receives requests for the Greeter service at the* sample server port*/function main() {var server = new grpc.Server();server.addService(hello_proto.Greeter.service,{sayHello: sayHello, sayHelloAgain: sayHelloAgain});server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {server.start();});}main();
1.4 编写 client 端代码 greeter_client.js
var PROTO_PATH = __dirname + '/helloworld.proto';var parseArgs = require('minimist');var grpc = require('@grpc/grpc-js');var protoLoader = require('@grpc/proto-loader');var packageDefinition = protoLoader.loadSync(PROTO_PATH,{keepCase: true,longs: String,enums: String,defaults: true,oneofs: true});var hello_proto = grpc.loadPackageDefinition(packageDefinition).helloworld;function main() {var argv = parseArgs(process.argv.slice(2), {string: 'target'});var target;if (argv.target) {target = argv.target;} else {target = 'localhost:50051';}var client = new hello_proto.Greeter(target,grpc.credentials.createInsecure());var user;if (argv._.length > 0) {user = argv._[0];} else {user = 'world';}client.sayHello({name: user}, function(err, response) {console.log('Greeting:', response.message);});client.sayHelloAgain({name: 'you'}, function(err, response) {console.log('Greeting:', response.message);});}main();
1.5 运行测试
$ node greeter_server.js# 为 client 新建一个终端,运行以下命令$ node greeter_client.js# client 端输出结果Greeting: Hello worldGreeting: Hello again, you
2. Basics tutorial 示例
学习内容:
- 在一个 .proto 文件内定义服务。
- 用 protocol buffer 编译器生成服务器和客户端代码。
- 使用 gRPC 的 Node.js API 为你的服务实现一个简单的客户端和服务器。
2.1 编写 route_guide.proto 服务定义
要定义一个服务,你必须在你的 .proto 文件中指定 service:
service RouteGuide {...}
然后在你的服务中定义 rpc 方法,指定请求的和响应类型。gRPC 允许你定义4种类型的 service 方法,在 RouteGuide 服务中都有使用:
一个 简单 RPC , 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。
// Obtains the feature at a given position.rpc GetFeature(Point) returns (Feature) {}
一个 服务器端流式 RPC , 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。
// Obtains the Features available within the given Rectangle. Results are// streamed rather than returned at once (e.g. in a response message with a// repeated field), as the rectangle may cover a large area and contain a// huge number of features.rpc ListFeatures(Rectangle) returns (stream Feature) {}
一个 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
// Accepts a stream of Points on a route being traversed, returning a// RouteSummary when traversal is completed.rpc RecordRoute(stream Point) returns (RouteSummary) {}
一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。
// Accepts a stream of RouteNotes sent while a route is being traversed,// while receiving other RouteNotes (e.g. from other users).rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
我们的 .proto 文件也包含了所有请求的 protocol buffer 消息类型定义以及在服务方法中使用的响应类型——比如,下面的 Point 消息类型:
// Points are represented as latitude-longitude pairs in the E7 representation// (degrees multiplied by 10**7 and rounded to the nearest integer).// Latitudes should be in the range +/- 90 degrees and longitude should be in// the range +/- 180 degrees (inclusive).message Point {int32 latitude = 1;int32 longitude = 2;}
2.2 从 .proto 文件加载服务描述符
Node.js 的类库在运行时加载 .proto 中的客户端存根并动态生成服务描述符。
要加载一个 .proto 文件,只需要 require gRPC proto loader 类库,然后使用它的 loadSync() 方法。最后将加载的内容传递给 gRPC 类库的 loadPackageDefinition() 方法:
var PROTO_PATH = __dirname + '/route_guide.proto';var grpc = require('@grpc/grpc-js');var protoLoader = require('@grpc/proto-loader');// Suggested options for similarity to existing grpc.load behaviorvar packageDefinition = protoLoader.loadSync(PROTO_PATH,{keepCase: true,longs: String,enums: String,defaults: true,oneofs: true});var protoDescriptor = grpc.loadPackageDefinition(packageDefinition);// The protoDescriptor object has the full package hierarchyvar routeguide = protoDescriptor.routeguide;
一旦你完成这个,存根构造函数是在 routeguide 命名空间(protoDescriptor.routeguide.RouteGuide)中而服务描述符(用来创建服务器)是存根(protoDescriptor.routeguide.RouteGuide.service)的一个属性。
- 完整的 route_guide.proto 文件 ```protobuf syntax = “proto3”;
option java_multiple_files = true; option java_package = “io.grpc.examples.routeguide”; option java_outer_classname = “RouteGuideProto”; option objc_class_prefix = “RTG”;
package routeguide;
// Interface exported by the server. service RouteGuide { // A simple RPC. // // Obtains the feature at a given position. // // A feature with an empty name is returned if there’s no feature at the given // position. rpc GetFeature(Point) returns (Feature) {}
// A server-to-client streaming RPC. // // Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}
// A client-to-server streaming RPC. // // Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}
// A Bidirectional streaming RPC. // // Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} }
// Points are represented as latitude-longitude pairs in the E7 representation // (degrees multiplied by 10**7 and rounded to the nearest integer). // Latitudes should be in the range +/- 90 degrees and longitude should be in // the range +/- 180 degrees (inclusive). message Point { int32 latitude = 1; int32 longitude = 2; }
// A latitude-longitude rectangle, represented as two diagonally opposite // points “lo” and “hi”. message Rectangle { // One corner of the rectangle. Point lo = 1;
// The other corner of the rectangle. Point hi = 2; }
// A feature names something at a given point. // // If a feature could not be named, the name is empty. message Feature { // The name of the feature. string name = 1;
// The point where the feature is detected. Point location = 2; }
// A RouteNote is a message sent while at a given point. message RouteNote { // The location from which the message is sent. Point location = 1;
// The message to be sent. string message = 2; }
// A RouteSummary is received in response to a RecordRoute rpc. // // It contains the number of individual points received, the number of // detected features, and the total distance covered as the cumulative sum of // the distance between each point. message RouteSummary { // The number of points received. int32 point_count = 1;
// The number of known features passed while traversing the route. int32 feature_count = 2;
// The distance covered in metres. int32 distance = 3;
// The duration of the traversal in seconds. int32 elapsed_time = 4; }
<a name="4PFZR"></a>## 2.3 创建服务器首先来看看我们如何创建一个 RouteGuide 服务器。让 RouteGuide 服务运作起来需要有两个部分支持:- 实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”。- 运行一个 gRPC 服务器,监听来自客户端的请求并返回服务的响应。- 完整的 **route_guide_server.js**,看看它是如何工作的。```javascriptvar PROTO_PATH = __dirname + '/../../../protos/route_guide.proto';var fs = require('fs');var parseArgs = require('minimist');var path = require('path');var _ = require('lodash');var grpc = require('@grpc/grpc-js');var protoLoader = require('@grpc/proto-loader');var packageDefinition = protoLoader.loadSync(PROTO_PATH,{keepCase: true,longs: String,enums: String,defaults: true,oneofs: true});var routeguide = grpc.loadPackageDefinition(packageDefinition).routeguide;var COORD_FACTOR = 1e7;/*** For simplicity, a point is a record type that looks like* {latitude: number, longitude: number}, and a feature is a record type that* looks like {name: string, location: point}. feature objects with name===''* are points with no feature.*//*** List of feature objects at points that have been requested so far.*/var feature_list = [];/*** Get a feature object at the given point, or creates one if it does not exist.* @param {point} point The point to check* @return {feature} The feature object at the point. Note that an empty name* indicates no feature*/function checkFeature(point) {var feature;// Check if there is already a feature object for the given pointfor (var i = 0; i < feature_list.length; i++) {feature = feature_list[i];if (feature.location.latitude === point.latitude &&feature.location.longitude === point.longitude) {return feature;}}var name = '';feature = {name: name,location: point};return feature;}/*** getFeature request handler. Gets a request with a point, and responds with a* feature object indicating whether there is a feature at that point.* @param {EventEmitter} call Call object for the handler to process* @param {function(Error, feature)} callback Response callback*/function getFeature(call, callback) {callback(null, checkFeature(call.request));}/*** listFeatures request handler. Gets a request with two points, and responds* with a stream of all features in the bounding box defined by those points.* @param {Writable} call Writable stream for responses with an additional* request property for the request value.*/function listFeatures(call) {var lo = call.request.lo;var hi = call.request.hi;var left = _.min([lo.longitude, hi.longitude]);var right = _.max([lo.longitude, hi.longitude]);var top = _.max([lo.latitude, hi.latitude]);var bottom = _.min([lo.latitude, hi.latitude]);// For each feature, check if it is in the given bounding box_.each(feature_list, function(feature) {if (feature.name === '') {return;}if (feature.location.longitude >= left &&feature.location.longitude <= right &&feature.location.latitude >= bottom &&feature.location.latitude <= top) {call.write(feature);}});call.end();}/*** Calculate the distance between two points using the "haversine" formula.* The formula is based on http://mathforum.org/library/drmath/view/51879.html.* @param start The starting point* @param end The end point* @return The distance between the points in meters*/function getDistance(start, end) {function toRadians(num) {return num * Math.PI / 180;}var R = 6371000; // earth radius in metresvar lat1 = toRadians(start.latitude / COORD_FACTOR);var lat2 = toRadians(end.latitude / COORD_FACTOR);var lon1 = toRadians(start.longitude / COORD_FACTOR);var lon2 = toRadians(end.longitude / COORD_FACTOR);var deltalat = lat2-lat1;var deltalon = lon2-lon1;var a = Math.sin(deltalat/2) * Math.sin(deltalat/2) +Math.cos(lat1) * Math.cos(lat2) *Math.sin(deltalon/2) * Math.sin(deltalon/2);var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));return R * c;}/*** recordRoute handler. Gets a stream of points, and responds with statistics* about the "trip": number of points, number of known features visited, total* distance traveled, and total time spent.* @param {Readable} call The request point stream.* @param {function(Error, routeSummary)} callback The callback to pass the* response to*/function recordRoute(call, callback) {var point_count = 0;var feature_count = 0;var distance = 0;var previous = null;// Start a timervar start_time = process.hrtime();call.on('data', function(point) {point_count += 1;if (checkFeature(point).name !== '') {feature_count += 1;}/* For each point after the first, add the incremental distance from the* previous point to the total distance value */if (previous != null) {distance += getDistance(previous, point);}previous = point;});call.on('end', function() {callback(null, {point_count: point_count,feature_count: feature_count,// Cast the distance to an integerdistance: distance|0,// End the timerelapsed_time: process.hrtime(start_time)[0]});});}var route_notes = {};/*** Turn the point into a dictionary key.* @param {point} point The point to use* @return {string} The key for an object*/function pointKey(point) {return point.latitude + ' ' + point.longitude;}/*** routeChat handler. Receives a stream of message/location pairs, and responds* with a stream of all previous messages at each of those locations.* @param {Duplex} call The stream for incoming and outgoing messages*/function routeChat(call) {call.on('data', function(note) {var key = pointKey(note.location);/* For each note sent, respond with all previous notes that correspond to* the same point */if (route_notes.hasOwnProperty(key)) {_.each(route_notes[key], function(note) {call.write(note);});} else {route_notes[key] = [];}// Then add the new note to the listroute_notes[key].push(JSON.parse(JSON.stringify(note)));});call.on('end', function() {call.end();});}/*** Get a new server with the handler functions in this file bound to the methods* it serves.* @return {Server} The new server object*/function getServer() {var server = new grpc.Server();server.addService(routeguide.RouteGuide.service, {getFeature: getFeature,listFeatures: listFeatures,recordRoute: recordRoute,routeChat: routeChat});return server;}if (require.main === module) {// If this is run as a script, start a server on an unused portvar routeServer = getServer();routeServer.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {var argv = parseArgs(process.argv, {string: 'db_path'});fs.readFile(path.resolve(argv.db_path), function(err, data) {if (err) throw err;feature_list = JSON.parse(data);routeServer.start();});});}exports.getServer = getServer;
2.3.1 实现 RouteGuide
可以看出,我们的服务器有一个从 RouteGuide.service 描述符对象生成的 Server 构造函数:
var Server = new grpc.Server();
在这个场景下,我们实现了 异步 版本的 RouteGuide,它提供了 gRPC 缺省的行为。
route_guide_server.js 中的函数实现了所有的服务方法。
最简单的类型 RPC:getFeature
首先让我们看看最简单的类型 getFeature,它从客户端拿到一个 Point 对象,然后返回包含从数据库拿到的 feature 信息的 Feature。
function checkFeature(point) {var feature;// Check if there is already a feature object for the given pointfor (var i = 0; i < feature_list.length; i++) {feature = feature_list[i];if (feature.location.latitude === point.latitude &&feature.location.longitude === point.longitude) {return feature;}}var name = '';feature = {name: name,location: point};return feature;}function getFeature(call, callback) {callback(null, checkFeature(call.request));}
getFeature 方法传入一个 RPC 的 call 对象,call 对象包含一个 Point 属性。而且还有一个 callback 函数作为参数,它会返回 Feature 对象。
在 checkFeature 方法中我们根据给出的 Point 去对应的填充 Feature,并将其传给 callback 函数,其中 callback 函数第一个参数为 null,表示没有错误。
服务器端流式 RPC:listFeatures
现在让我们看看稍微复杂点的东西 —— 流式 RPC。 listFeatures 是一个服务器端流式 RPC,所以我们需要发回多个 Feature 给客户端。
function listFeatures(call) {var lo = call.request.lo;var hi = call.request.hi;var left = _.min([lo.longitude, hi.longitude]);var right = _.max([lo.longitude, hi.longitude]);var top = _.max([lo.latitude, hi.latitude]);var bottom = _.min([lo.latitude, hi.latitude]);// For each feature, check if it is in the given bounding box_.each(feature_list, function(feature) {if (feature.name === '') {return;}if (feature.location.longitude >= left &&feature.location.longitude <= right &&feature.location.latitude >= bottom &&feature.location.latitude <= top) {call.write(feature);}});call.end();}
这次,不再是传入 call 对象和 callback 函数作为参数,而是只传入一个 call 对象,它实现了 Writable 接口。
在 listFeatures 方法中,我们遍历 feature_list 数组,根据客户端传入的坐标范围,返回满足条件的 feature,将 feature 使用 write() 方法写入 call 中。最后,我们调用 call.end() 表示我们已经完成了所有消息的发送。
客户端流式 RPC:recordRoute
如果你看过客户端流方法 recordRoute,你会发现它很类似,除了这次 call 参数实现了 Reader 的接口。 每次有新数据的时候,call 的 data 事件被触发,每次数据读取完成时,end 事件被触发。和一元的场景一样,我们通过调用 callback 函数来应答:
call.on('data', function(point) {// Process user data});call.on('end', function() {callback(null, result);});
双向流式 RPC:routeChat
function routeChat(call) {call.on('data', function(note) {var key = pointKey(note.location);/* For each note sent, respond with all previous notes that correspond to* the same point */if (route_notes.hasOwnProperty(key)) {_.each(route_notes[key], function(note) {call.write(note);});} else {route_notes[key] = [];}// Then add the new note to the listroute_notes[key].push(JSON.parse(JSON.stringify(note)));});call.on('end', function() {call.end();});}
这次我们得到的是一个实现了 Duplex 的 call 对象,可以用来读 和 写消息。这里读写的语法和我们客户端流以及服务器流方法是一样的。虽然每一端获取对方信息的顺序和写入的顺序一致,客户端和服务器都可以以任意顺序读写——流的操作是完全独立的。
2.3.2 启动服务器
一旦我们实现了所有的方法,我们还需要启动一个gRPC服务器,这样客户端才可以使用服务。下面这段代码展示了在我们 RouteGuide 服务中实现的过程:
function getServer() {var server = new grpc.Server();server.addService(routeguide.RouteGuide.service, {getFeature: getFeature,listFeatures: listFeatures,recordRoute: recordRoute,routeChat: routeChat});return server;}var routeServer = getServer();routeServer.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {routeServer.start();});
如你所见,我们通过下面的步骤去构建和启动服务器:
- 通过 RouteGuide 服务描述符创建一个 Server 构造函数。
- 实现服务的方法。
- 通过调用 Server 的构造函数以及方法实现去创建一个服务器的实例。
- 用实例的 bind() 方法指定地址以及我们期望客户端请求监听的端口。
- 调用实例的 listen() 方法启动一个RPC服务器。
2.4 创建客户端
在这部分,我们将尝试为 RouteGuide 服务创建一个 Node.js 的客户端。
- 完整的 route_guide_client.js
```javascript
/
- Copyright 2015 gRPC authors. *
- Licensed under the Apache License, Version 2.0 (the “License”);
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at *
- http://www.apache.org/licenses/LICENSE-2.0 *
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an “AS IS” BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. /
var PROTO_PATH = __dirname + ‘/../../../protos/route_guide.proto’;
var async = require(‘async’); var fs = require(‘fs’); var parseArgs = require(‘minimist’); var path = require(‘path’); var _ = require(‘lodash’); var grpc = require(‘@grpc/grpc-js’); var protoLoader = require(‘@grpc/proto-loader’); var packageDefinition = protoLoader.loadSync( PROTO_PATH, {keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); var routeguide = grpc.loadPackageDefinition(packageDefinition).routeguide; var client = new routeguide.RouteGuide(‘localhost:50051’, grpc.credentials.createInsecure());
var COORD_FACTOR = 1e7;
/**
- Run the getFeature demo. Calls getFeature with a point known to have a
- feature and a point known not to have a feature.
- @param {function} callback Called when this demo is complete
*/
function runGetFeature(callback) {
var next = _.after(2, callback);
function featureCallback(error, feature) {
if (error) {
callback(error);
return;
}
if (feature.name === ‘’) {
console.log(‘Found no feature at ‘ +
} else { console.log(‘Found feature called “‘ + feature.name + ‘“ at ‘ +feature.location.latitude/COORD_FACTOR + ', ' +feature.location.longitude/COORD_FACTOR);
} next(); } var point1 = { latitude: 409146138, longitude: -746188906 }; var point2 = { latitude: 0, longitude: 0 }; client.getFeature(point1, featureCallback); client.getFeature(point2, featureCallback); }feature.location.latitude/COORD_FACTOR + ', ' +feature.location.longitude/COORD_FACTOR);
/**
- Run the listFeatures demo. Calls listFeatures with a rectangle containing all
- of the features in the pre-generated database. Prints each response as it
- comes in.
- @param {function} callback Called when this demo is complete
*/
function runListFeatures(callback) {
var rectangle = {
lo: {
latitude: 400000000,
longitude: -750000000
},
hi: {
latitude: 420000000,
longitude: -730000000
}
};
console.log(‘Looking for features between 40, -75 and 42, -73’);
var call = client.listFeatures(rectangle);
call.on(‘data’, function(feature) {
console.log(‘Found feature called “‘ + feature.name + ‘“ at ‘ +
}); call.on(‘end’, callback); }feature.location.latitude/COORD_FACTOR + ', ' +feature.location.longitude/COORD_FACTOR);
/**
- Run the recordRoute demo. Sends several randomly chosen points from the
- pre-generated feature database with a variable delay in between. Prints the
- statistics when they are sent from the server.
@param {function} callback Called when this demo is complete */ function runRecordRoute(callback) { var argv = parseArgs(process.argv, { string: ‘db_path’ }); fs.readFile(path.resolve(argv.db_path), function(err, data) { if (err) { callback(err); return; } var feature_list = JSON.parse(data);
var num_points = 10; var call = client.recordRoute(function(error, stats) { if (error) {
callback(error);return;
} console.log(‘Finished trip with’, stats.point_count, ‘points’); console.log(‘Passed’, stats.feature_count, ‘features’); console.log(‘Travelled’, stats.distance, ‘meters’); console.log(‘It took’, stats.elapsed_time, ‘seconds’); callback(); }); /**
- Constructs a function that asynchronously sends the given point and then
- delays sending its callback
- @param {number} lat The latitude to send
- @param {number} lng The longitude to send
- @return {function(function)} The function that sends the point
/
function pointSender(lat, lng) {
/*
- Sends the point, then calls the callback after a delay
- @param {function} callback Called when complete
*/
return function(callback) {
console.log(‘Visiting point ‘ + lat/COORDFACTOR + ‘, ‘ +
lng/COORD_FACTOR);
call.write({
latitude: lat,
longitude: lng
});
.delay(callback, .random(500, 1500));
};
}
var point_senders = [];
for (var i = 0; i < num_points; i++) {
var rand_point = feature_list[.random(0, feature_list.length - 1)];
point_senders[i] = pointSender(rand_point.location.latitude,
} async.series(point_senders, function() { call.end(); }); }); }rand_point.location.longitude);
/**
- Run the routeChat demo. Send some chat messages, and print any chat messages
- that are sent from the server.
@param {function} callback Called when the demo is complete */ function runRouteChat(callback) { var call = client.routeChat(); call.on(‘data’, function(note) { console.log(‘Got message “‘ + note.message + ‘“ at ‘ +
note.location.latitude + ', ' + note.location.longitude);
});
call.on(‘end’, callback);
var notes = [{ location: { latitude: 0, longitude: 0 }, message: ‘First message’ }, { location: { latitude: 0, longitude: 1 }, message: ‘Second message’ }, { location: { latitude: 1, longitude: 0 }, message: ‘Third message’ }, { location: { latitude: 0, longitude: 0 }, message: ‘Fourth message’ }]; for (var i = 0; i < notes.length; i++) { var note = notes[i]; console.log(‘Sending message “‘ + note.message + ‘“ at ‘ +
note.location.latitude + ', ' + note.location.longitude);
call.write(note); } call.end(); }
/**
- Run all of the demos in order */ function main() { async.series([ runGetFeature, runListFeatures, runRecordRoute, runRouteChat ]); }
if (require.main === module) { main(); }
exports.runGetFeature = runGetFeature;
exports.runListFeatures = runListFeatures;
exports.runRecordRoute = runRecordRoute;
exports.runRouteChat = runRouteChat;
<a name="ufexo"></a>### 2.4.1 创建存根为了能调用服务的方法,我们得先创建一个 存根。要做到这点,我们只需要调用 RouteGuide 的存根构造函数,指定服务器地址和端口。```javascriptnew routeguide.RouteGuide('localhost:50051', grpc.credentials.createInsecure());
2.4.2 调用服务的方法
现在我们来看看如何调用服务的方法。注意这些方法都是异步的:他们使用事件或者 callback 去获得结果。
简单类型 RPC:getFeature
调用简单 RPC 的 getFeature 几乎是和调用一个本地的异步方法一样简单。
var point = {latitude: 409146138, longitude: -746188906};stub.getFeature(point, function(err, feature) {if (err) {// process error} else {// process feature}});
如你所见,我们创建并且填充了一个请求对象。最后我们调用了存根上的方法,传入请求和回调函数。如果没有错误,就可以从我们的服务器从应答对象读取应答信息。
console.log('Found feature called "' + feature.name + '" at ' +feature.location.latitude/COORD_FACTOR + ', ' +feature.location.longitude/COORD_FACTOR);
流式 RPC
现在来看看我们的流方法。如果你已经读过【创建服务器】,本节的一些内容看上去很熟悉——流式 RPC 是在客户端和服务器两端以一种类似的方式实现的。
下面就是我们称作是服务器端的流方法 listFeatures,它会返回地理的 Feature:
var call = client.listFeatures(rectangle);call.on('data', function(feature) {console.log('Found feature called "' + feature.name + '" at ' +feature.location.latitude/COORD_FACTOR + ', ' +feature.location.longitude/COORD_FACTOR);});call.on('end', function() {// The server has finished sending});call.on('error', function(e) {// An error has occurred and the stream has been closed.});call.on('status', function(status) {// process status});
我们传给它一个请求并拿回一个 Readable 流对象,而不是给方法传入请求和回调函数。客户端可以使用 Readable 的 ‘data’ 事件去读取服务器的应答。这个事件由每个 Feature 消息对象触发,知道没有更多的消息:’end’ 事件揭示调用已经结束。最后,当服务器发送状态时,触发状态事件。
客户端的流方法 RecordRoute 的使用很相似,除了我们将一个回调函数传给方法,拿到一个 Writable 返回。
var call = client.recordRoute(function(error, stats) {if (error) {callback(error);}console.log('Finished trip with', stats.point_count, 'points');console.log('Passed', stats.feature_count, 'features');console.log('Travelled', stats.distance, 'meters');console.log('It took', stats.elapsed_time, 'seconds');});function pointSender(lat, lng) {return function(callback) {console.log('Visiting point ' + lat/COORD_FACTOR + ', ' +lng/COORD_FACTOR);call.write({latitude: lat,longitude: lng});_.delay(callback, _.random(500, 1500));};}var point_senders = [];for (var i = 0; i < num_points; i++) {var rand_point = feature_list[_.random(0, feature_list.length - 1)];point_senders[i] = pointSender(rand_point.location.latitude,rand_point.location.longitude);}async.series(point_senders, function() {call.end();});
一旦我们用 write() 将客户端请求写入到流的动作完成,我们需要在流上调用 end() 通知 gRPC 我们已经完成写。如果状态是 OK,stats 对象会跟着服务器的响应被填充。
最后,让我们看看双向流式 RPC routeChat()。在这种场景下,我们将上下文传给一个方法,拿到一个可以用来读写消息的 Duplex 流对象的返回。
var call = client.routeChat();
这里读写的语法和我们客户端流以及服务器端流方法没有任何区别。虽然每一方都能按照写入时的顺序拿到另一方的消息,客户端和服务器端都可以以任意顺序读写——流操作起来是完全独立的。
2.5 运行测试
$ npm install# 运行 server$ node ./dynamic_codegen/route_guide/route_guide_server.js --db_path=./dynamic_codegen/route_guide/route_guide_db.json# 在新的终端中,运行 client$ node ./dynamic_codegen/route_guide/route_guide_client.js --db_path=./dynamic_codegen/route_guide/route_guide_db.json# client 输出结果:...Found feature called "261 Van Sickle Road, Goshen, NY 10924, USA" at 41.3069058, -74.4597778...Visiting point 40.1012643, -74.4035134Finished trip with 10 pointsPassed 6 featuresTravelled 638865 metersIt took 10 secondsSending message "First message" at 0, 0Sending message "Second message" at 0, 1Sending message "Third message" at 1, 0Sending message "Fourth message" at 0, 0Got message "First message" at 0, 0
