gRPC 基本概念介绍:《微服务通信:gRPC 介绍》
gRPC 的使用通常包括如下几个步骤:
- 通过 protobuf 来定义接口和数据类型
- 编写 gRPC server 端代码
- 编写 gRPC client 端代码
本文通过一个实例来详细讲解上述的三步。
1. Quick start 示例
1.1 初始化示例项目
$ npm init --yes
# 向 package.json 文件添加 grpc 依赖
"dependencies": {
"@grpc/proto-loader": "^0.5.0",
"async": "^1.5.2",
"google-protobuf": "^3.0.0",
"@grpc/grpc-js": "^1.1.0",
"lodash": "^4.6.1",
"minimist": "^1.2.0"
}
$ npm inistall
1.2 编写 helloworld.proto 文件来定义接口和数据类型
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
// Sends another greeting
rpc SayHelloAgain (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
1.3 编写 server 端代码 greeter_server.js
var PROTO_PATH = __dirname + '/helloworld.proto';
var grpc = require('@grpc/grpc-js');
var protoLoader = require('@grpc/proto-loader');
var packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
var hello_proto = grpc.loadPackageDefinition(packageDefinition).helloworld;
/**
* Implements the SayHello RPC method.
*/
function sayHello(call, callback) {
callback(null, {message: 'Hello ' + call.request.name});
}
function sayHelloAgain(call, callback) {
callback(null, {message: 'Hello again, ' + call.request.name});
}
/**
* Starts an RPC server that receives requests for the Greeter service at the
* sample server port
*/
function main() {
var server = new grpc.Server();
server.addService(hello_proto.Greeter.service,
{sayHello: sayHello, sayHelloAgain: sayHelloAgain});
server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
server.start();
});
}
main();
1.4 编写 client 端代码 greeter_client.js
var PROTO_PATH = __dirname + '/helloworld.proto';
var parseArgs = require('minimist');
var grpc = require('@grpc/grpc-js');
var protoLoader = require('@grpc/proto-loader');
var packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
var hello_proto = grpc.loadPackageDefinition(packageDefinition).helloworld;
function main() {
var argv = parseArgs(process.argv.slice(2), {
string: 'target'
});
var target;
if (argv.target) {
target = argv.target;
} else {
target = 'localhost:50051';
}
var client = new hello_proto.Greeter(target,
grpc.credentials.createInsecure());
var user;
if (argv._.length > 0) {
user = argv._[0];
} else {
user = 'world';
}
client.sayHello({name: user}, function(err, response) {
console.log('Greeting:', response.message);
});
client.sayHelloAgain({name: 'you'}, function(err, response) {
console.log('Greeting:', response.message);
});
}
main();
1.5 运行测试
$ node greeter_server.js
# 为 client 新建一个终端,运行以下命令
$ node greeter_client.js
# client 端输出结果
Greeting: Hello world
Greeting: Hello again, you
2. Basics tutorial 示例
学习内容:
- 在一个 .proto 文件内定义服务。
- 用 protocol buffer 编译器生成服务器和客户端代码。
- 使用 gRPC 的 Node.js API 为你的服务实现一个简单的客户端和服务器。
2.1 编写 route_guide.proto 服务定义
要定义一个服务,你必须在你的 .proto 文件中指定 service:
service RouteGuide {
...
}
然后在你的服务中定义 rpc 方法,指定请求的和响应类型。gRPC 允许你定义4种类型的 service 方法,在 RouteGuide 服务中都有使用:
一个 简单 RPC , 客户端使用存根发送请求到服务器并等待响应返回,就像平常的函数调用一样。
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
一个 服务器端流式 RPC , 客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息。从例子中可以看出,通过在 响应 类型前插入 stream 关键字,可以指定一个服务器端的流方法。
// Obtains the Features available within the given Rectangle. Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
一个 客户端流式 RPC , 客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦客户端完成写入消息,它等待服务器完成读取返回它的响应。通过在 请求 类型前指定 stream 关键字来指定一个客户端的流方法。
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
一个 双向流式 RPC 是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留。你可以通过在请求和响应前加 stream 关键字去制定方法的类型。
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
我们的 .proto 文件也包含了所有请求的 protocol buffer 消息类型定义以及在服务方法中使用的响应类型——比如,下面的 Point 消息类型:
// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
int32 latitude = 1;
int32 longitude = 2;
}
2.2 从 .proto 文件加载服务描述符
Node.js 的类库在运行时加载 .proto 中的客户端存根并动态生成服务描述符。
要加载一个 .proto 文件,只需要 require gRPC proto loader 类库,然后使用它的 loadSync() 方法。最后将加载的内容传递给 gRPC 类库的 loadPackageDefinition() 方法:
var PROTO_PATH = __dirname + '/route_guide.proto';
var grpc = require('@grpc/grpc-js');
var protoLoader = require('@grpc/proto-loader');
// Suggested options for similarity to existing grpc.load behavior
var packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
var protoDescriptor = grpc.loadPackageDefinition(packageDefinition);
// The protoDescriptor object has the full package hierarchy
var routeguide = protoDescriptor.routeguide;
一旦你完成这个,存根构造函数是在 routeguide 命名空间(protoDescriptor.routeguide.RouteGuide)中而服务描述符(用来创建服务器)是存根(protoDescriptor.routeguide.RouteGuide.service)的一个属性。
- 完整的 route_guide.proto 文件 ```protobuf syntax = “proto3”;
option java_multiple_files = true; option java_package = “io.grpc.examples.routeguide”; option java_outer_classname = “RouteGuideProto”; option objc_class_prefix = “RTG”;
package routeguide;
// Interface exported by the server. service RouteGuide { // A simple RPC. // // Obtains the feature at a given position. // // A feature with an empty name is returned if there’s no feature at the given // position. rpc GetFeature(Point) returns (Feature) {}
// A server-to-client streaming RPC. // // Obtains the Features available within the given Rectangle. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field), as the rectangle may cover a large area and contain a // huge number of features. rpc ListFeatures(Rectangle) returns (stream Feature) {}
// A client-to-server streaming RPC. // // Accepts a stream of Points on a route being traversed, returning a // RouteSummary when traversal is completed. rpc RecordRoute(stream Point) returns (RouteSummary) {}
// A Bidirectional streaming RPC. // // Accepts a stream of RouteNotes sent while a route is being traversed, // while receiving other RouteNotes (e.g. from other users). rpc RouteChat(stream RouteNote) returns (stream RouteNote) {} }
// Points are represented as latitude-longitude pairs in the E7 representation // (degrees multiplied by 10**7 and rounded to the nearest integer). // Latitudes should be in the range +/- 90 degrees and longitude should be in // the range +/- 180 degrees (inclusive). message Point { int32 latitude = 1; int32 longitude = 2; }
// A latitude-longitude rectangle, represented as two diagonally opposite // points “lo” and “hi”. message Rectangle { // One corner of the rectangle. Point lo = 1;
// The other corner of the rectangle. Point hi = 2; }
// A feature names something at a given point. // // If a feature could not be named, the name is empty. message Feature { // The name of the feature. string name = 1;
// The point where the feature is detected. Point location = 2; }
// A RouteNote is a message sent while at a given point. message RouteNote { // The location from which the message is sent. Point location = 1;
// The message to be sent. string message = 2; }
// A RouteSummary is received in response to a RecordRoute rpc. // // It contains the number of individual points received, the number of // detected features, and the total distance covered as the cumulative sum of // the distance between each point. message RouteSummary { // The number of points received. int32 point_count = 1;
// The number of known features passed while traversing the route. int32 feature_count = 2;
// The distance covered in metres. int32 distance = 3;
// The duration of the traversal in seconds. int32 elapsed_time = 4; }
<a name="4PFZR"></a>
## 2.3 创建服务器
首先来看看我们如何创建一个 RouteGuide 服务器。
让 RouteGuide 服务运作起来需要有两个部分支持:
- 实现我们服务定义的生成的服务接口:做我们的服务的实际的“工作”。
- 运行一个 gRPC 服务器,监听来自客户端的请求并返回服务的响应。
- 完整的 **route_guide_server.js**,看看它是如何工作的。
```javascript
var PROTO_PATH = __dirname + '/../../../protos/route_guide.proto';
var fs = require('fs');
var parseArgs = require('minimist');
var path = require('path');
var _ = require('lodash');
var grpc = require('@grpc/grpc-js');
var protoLoader = require('@grpc/proto-loader');
var packageDefinition = protoLoader.loadSync(
PROTO_PATH,
{keepCase: true,
longs: String,
enums: String,
defaults: true,
oneofs: true
});
var routeguide = grpc.loadPackageDefinition(packageDefinition).routeguide;
var COORD_FACTOR = 1e7;
/**
* For simplicity, a point is a record type that looks like
* {latitude: number, longitude: number}, and a feature is a record type that
* looks like {name: string, location: point}. feature objects with name===''
* are points with no feature.
*/
/**
* List of feature objects at points that have been requested so far.
*/
var feature_list = [];
/**
* Get a feature object at the given point, or creates one if it does not exist.
* @param {point} point The point to check
* @return {feature} The feature object at the point. Note that an empty name
* indicates no feature
*/
function checkFeature(point) {
var feature;
// Check if there is already a feature object for the given point
for (var i = 0; i < feature_list.length; i++) {
feature = feature_list[i];
if (feature.location.latitude === point.latitude &&
feature.location.longitude === point.longitude) {
return feature;
}
}
var name = '';
feature = {
name: name,
location: point
};
return feature;
}
/**
* getFeature request handler. Gets a request with a point, and responds with a
* feature object indicating whether there is a feature at that point.
* @param {EventEmitter} call Call object for the handler to process
* @param {function(Error, feature)} callback Response callback
*/
function getFeature(call, callback) {
callback(null, checkFeature(call.request));
}
/**
* listFeatures request handler. Gets a request with two points, and responds
* with a stream of all features in the bounding box defined by those points.
* @param {Writable} call Writable stream for responses with an additional
* request property for the request value.
*/
function listFeatures(call) {
var lo = call.request.lo;
var hi = call.request.hi;
var left = _.min([lo.longitude, hi.longitude]);
var right = _.max([lo.longitude, hi.longitude]);
var top = _.max([lo.latitude, hi.latitude]);
var bottom = _.min([lo.latitude, hi.latitude]);
// For each feature, check if it is in the given bounding box
_.each(feature_list, function(feature) {
if (feature.name === '') {
return;
}
if (feature.location.longitude >= left &&
feature.location.longitude <= right &&
feature.location.latitude >= bottom &&
feature.location.latitude <= top) {
call.write(feature);
}
});
call.end();
}
/**
* Calculate the distance between two points using the "haversine" formula.
* The formula is based on http://mathforum.org/library/drmath/view/51879.html.
* @param start The starting point
* @param end The end point
* @return The distance between the points in meters
*/
function getDistance(start, end) {
function toRadians(num) {
return num * Math.PI / 180;
}
var R = 6371000; // earth radius in metres
var lat1 = toRadians(start.latitude / COORD_FACTOR);
var lat2 = toRadians(end.latitude / COORD_FACTOR);
var lon1 = toRadians(start.longitude / COORD_FACTOR);
var lon2 = toRadians(end.longitude / COORD_FACTOR);
var deltalat = lat2-lat1;
var deltalon = lon2-lon1;
var a = Math.sin(deltalat/2) * Math.sin(deltalat/2) +
Math.cos(lat1) * Math.cos(lat2) *
Math.sin(deltalon/2) * Math.sin(deltalon/2);
var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));
return R * c;
}
/**
* recordRoute handler. Gets a stream of points, and responds with statistics
* about the "trip": number of points, number of known features visited, total
* distance traveled, and total time spent.
* @param {Readable} call The request point stream.
* @param {function(Error, routeSummary)} callback The callback to pass the
* response to
*/
function recordRoute(call, callback) {
var point_count = 0;
var feature_count = 0;
var distance = 0;
var previous = null;
// Start a timer
var start_time = process.hrtime();
call.on('data', function(point) {
point_count += 1;
if (checkFeature(point).name !== '') {
feature_count += 1;
}
/* For each point after the first, add the incremental distance from the
* previous point to the total distance value */
if (previous != null) {
distance += getDistance(previous, point);
}
previous = point;
});
call.on('end', function() {
callback(null, {
point_count: point_count,
feature_count: feature_count,
// Cast the distance to an integer
distance: distance|0,
// End the timer
elapsed_time: process.hrtime(start_time)[0]
});
});
}
var route_notes = {};
/**
* Turn the point into a dictionary key.
* @param {point} point The point to use
* @return {string} The key for an object
*/
function pointKey(point) {
return point.latitude + ' ' + point.longitude;
}
/**
* routeChat handler. Receives a stream of message/location pairs, and responds
* with a stream of all previous messages at each of those locations.
* @param {Duplex} call The stream for incoming and outgoing messages
*/
function routeChat(call) {
call.on('data', function(note) {
var key = pointKey(note.location);
/* For each note sent, respond with all previous notes that correspond to
* the same point */
if (route_notes.hasOwnProperty(key)) {
_.each(route_notes[key], function(note) {
call.write(note);
});
} else {
route_notes[key] = [];
}
// Then add the new note to the list
route_notes[key].push(JSON.parse(JSON.stringify(note)));
});
call.on('end', function() {
call.end();
});
}
/**
* Get a new server with the handler functions in this file bound to the methods
* it serves.
* @return {Server} The new server object
*/
function getServer() {
var server = new grpc.Server();
server.addService(routeguide.RouteGuide.service, {
getFeature: getFeature,
listFeatures: listFeatures,
recordRoute: recordRoute,
routeChat: routeChat
});
return server;
}
if (require.main === module) {
// If this is run as a script, start a server on an unused port
var routeServer = getServer();
routeServer.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
var argv = parseArgs(process.argv, {
string: 'db_path'
});
fs.readFile(path.resolve(argv.db_path), function(err, data) {
if (err) throw err;
feature_list = JSON.parse(data);
routeServer.start();
});
});
}
exports.getServer = getServer;
2.3.1 实现 RouteGuide
可以看出,我们的服务器有一个从 RouteGuide.service 描述符对象生成的 Server 构造函数:
var Server = new grpc.Server();
在这个场景下,我们实现了 异步 版本的 RouteGuide,它提供了 gRPC 缺省的行为。
route_guide_server.js 中的函数实现了所有的服务方法。
最简单的类型 RPC:getFeature
首先让我们看看最简单的类型 getFeature,它从客户端拿到一个 Point 对象,然后返回包含从数据库拿到的 feature 信息的 Feature。
function checkFeature(point) {
var feature;
// Check if there is already a feature object for the given point
for (var i = 0; i < feature_list.length; i++) {
feature = feature_list[i];
if (feature.location.latitude === point.latitude &&
feature.location.longitude === point.longitude) {
return feature;
}
}
var name = '';
feature = {
name: name,
location: point
};
return feature;
}
function getFeature(call, callback) {
callback(null, checkFeature(call.request));
}
getFeature 方法传入一个 RPC 的 call 对象,call 对象包含一个 Point 属性。而且还有一个 callback 函数作为参数,它会返回 Feature 对象。
在 checkFeature 方法中我们根据给出的 Point 去对应的填充 Feature,并将其传给 callback 函数,其中 callback 函数第一个参数为 null,表示没有错误。
服务器端流式 RPC:listFeatures
现在让我们看看稍微复杂点的东西 —— 流式 RPC。 listFeatures 是一个服务器端流式 RPC,所以我们需要发回多个 Feature 给客户端。
function listFeatures(call) {
var lo = call.request.lo;
var hi = call.request.hi;
var left = _.min([lo.longitude, hi.longitude]);
var right = _.max([lo.longitude, hi.longitude]);
var top = _.max([lo.latitude, hi.latitude]);
var bottom = _.min([lo.latitude, hi.latitude]);
// For each feature, check if it is in the given bounding box
_.each(feature_list, function(feature) {
if (feature.name === '') {
return;
}
if (feature.location.longitude >= left &&
feature.location.longitude <= right &&
feature.location.latitude >= bottom &&
feature.location.latitude <= top) {
call.write(feature);
}
});
call.end();
}
这次,不再是传入 call 对象和 callback 函数作为参数,而是只传入一个 call 对象,它实现了 Writable 接口。
在 listFeatures 方法中,我们遍历 feature_list 数组,根据客户端传入的坐标范围,返回满足条件的 feature,将 feature 使用 write() 方法写入 call 中。最后,我们调用 call.end() 表示我们已经完成了所有消息的发送。
客户端流式 RPC:recordRoute
如果你看过客户端流方法 recordRoute,你会发现它很类似,除了这次 call 参数实现了 Reader 的接口。 每次有新数据的时候,call 的 data 事件被触发,每次数据读取完成时,end 事件被触发。和一元的场景一样,我们通过调用 callback 函数来应答:
call.on('data', function(point) {
// Process user data
});
call.on('end', function() {
callback(null, result);
});
双向流式 RPC:routeChat
function routeChat(call) {
call.on('data', function(note) {
var key = pointKey(note.location);
/* For each note sent, respond with all previous notes that correspond to
* the same point */
if (route_notes.hasOwnProperty(key)) {
_.each(route_notes[key], function(note) {
call.write(note);
});
} else {
route_notes[key] = [];
}
// Then add the new note to the list
route_notes[key].push(JSON.parse(JSON.stringify(note)));
});
call.on('end', function() {
call.end();
});
}
这次我们得到的是一个实现了 Duplex 的 call 对象,可以用来读 和 写消息。这里读写的语法和我们客户端流以及服务器流方法是一样的。虽然每一端获取对方信息的顺序和写入的顺序一致,客户端和服务器都可以以任意顺序读写——流的操作是完全独立的。
2.3.2 启动服务器
一旦我们实现了所有的方法,我们还需要启动一个gRPC服务器,这样客户端才可以使用服务。下面这段代码展示了在我们 RouteGuide 服务中实现的过程:
function getServer() {
var server = new grpc.Server();
server.addService(routeguide.RouteGuide.service, {
getFeature: getFeature,
listFeatures: listFeatures,
recordRoute: recordRoute,
routeChat: routeChat
});
return server;
}
var routeServer = getServer();
routeServer.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
routeServer.start();
});
如你所见,我们通过下面的步骤去构建和启动服务器:
- 通过 RouteGuide 服务描述符创建一个 Server 构造函数。
- 实现服务的方法。
- 通过调用 Server 的构造函数以及方法实现去创建一个服务器的实例。
- 用实例的 bind() 方法指定地址以及我们期望客户端请求监听的端口。
- 调用实例的 listen() 方法启动一个RPC服务器。
2.4 创建客户端
在这部分,我们将尝试为 RouteGuide 服务创建一个 Node.js 的客户端。
- 完整的 route_guide_client.js
```javascript
/
- Copyright 2015 gRPC authors. *
- Licensed under the Apache License, Version 2.0 (the “License”);
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at *
- http://www.apache.org/licenses/LICENSE-2.0 *
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an “AS IS” BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. /
var PROTO_PATH = __dirname + ‘/../../../protos/route_guide.proto’;
var async = require(‘async’); var fs = require(‘fs’); var parseArgs = require(‘minimist’); var path = require(‘path’); var _ = require(‘lodash’); var grpc = require(‘@grpc/grpc-js’); var protoLoader = require(‘@grpc/proto-loader’); var packageDefinition = protoLoader.loadSync( PROTO_PATH, {keepCase: true, longs: String, enums: String, defaults: true, oneofs: true }); var routeguide = grpc.loadPackageDefinition(packageDefinition).routeguide; var client = new routeguide.RouteGuide(‘localhost:50051’, grpc.credentials.createInsecure());
var COORD_FACTOR = 1e7;
/**
- Run the getFeature demo. Calls getFeature with a point known to have a
- feature and a point known not to have a feature.
- @param {function} callback Called when this demo is complete
*/
function runGetFeature(callback) {
var next = _.after(2, callback);
function featureCallback(error, feature) {
if (error) {
callback(error);
return;
}
if (feature.name === ‘’) {
console.log(‘Found no feature at ‘ +
} else { console.log(‘Found feature called “‘ + feature.name + ‘“ at ‘ +feature.location.latitude/COORD_FACTOR + ', ' +
feature.location.longitude/COORD_FACTOR);
} next(); } var point1 = { latitude: 409146138, longitude: -746188906 }; var point2 = { latitude: 0, longitude: 0 }; client.getFeature(point1, featureCallback); client.getFeature(point2, featureCallback); }feature.location.latitude/COORD_FACTOR + ', ' +
feature.location.longitude/COORD_FACTOR);
/**
- Run the listFeatures demo. Calls listFeatures with a rectangle containing all
- of the features in the pre-generated database. Prints each response as it
- comes in.
- @param {function} callback Called when this demo is complete
*/
function runListFeatures(callback) {
var rectangle = {
lo: {
latitude: 400000000,
longitude: -750000000
},
hi: {
latitude: 420000000,
longitude: -730000000
}
};
console.log(‘Looking for features between 40, -75 and 42, -73’);
var call = client.listFeatures(rectangle);
call.on(‘data’, function(feature) {
console.log(‘Found feature called “‘ + feature.name + ‘“ at ‘ +
}); call.on(‘end’, callback); }feature.location.latitude/COORD_FACTOR + ', ' +
feature.location.longitude/COORD_FACTOR);
/**
- Run the recordRoute demo. Sends several randomly chosen points from the
- pre-generated feature database with a variable delay in between. Prints the
- statistics when they are sent from the server.
@param {function} callback Called when this demo is complete */ function runRecordRoute(callback) { var argv = parseArgs(process.argv, { string: ‘db_path’ }); fs.readFile(path.resolve(argv.db_path), function(err, data) { if (err) { callback(err); return; } var feature_list = JSON.parse(data);
var num_points = 10; var call = client.recordRoute(function(error, stats) { if (error) {
callback(error);
return;
} console.log(‘Finished trip with’, stats.point_count, ‘points’); console.log(‘Passed’, stats.feature_count, ‘features’); console.log(‘Travelled’, stats.distance, ‘meters’); console.log(‘It took’, stats.elapsed_time, ‘seconds’); callback(); }); /**
- Constructs a function that asynchronously sends the given point and then
- delays sending its callback
- @param {number} lat The latitude to send
- @param {number} lng The longitude to send
- @return {function(function)} The function that sends the point
/
function pointSender(lat, lng) {
/*
- Sends the point, then calls the callback after a delay
- @param {function} callback Called when complete
*/
return function(callback) {
console.log(‘Visiting point ‘ + lat/COORDFACTOR + ‘, ‘ +
lng/COORD_FACTOR);
call.write({
latitude: lat,
longitude: lng
});
.delay(callback, .random(500, 1500));
};
}
var point_senders = [];
for (var i = 0; i < num_points; i++) {
var rand_point = feature_list[.random(0, feature_list.length - 1)];
point_senders[i] = pointSender(rand_point.location.latitude,
} async.series(point_senders, function() { call.end(); }); }); }rand_point.location.longitude);
/**
- Run the routeChat demo. Send some chat messages, and print any chat messages
- that are sent from the server.
@param {function} callback Called when the demo is complete */ function runRouteChat(callback) { var call = client.routeChat(); call.on(‘data’, function(note) { console.log(‘Got message “‘ + note.message + ‘“ at ‘ +
note.location.latitude + ', ' + note.location.longitude);
});
call.on(‘end’, callback);
var notes = [{ location: { latitude: 0, longitude: 0 }, message: ‘First message’ }, { location: { latitude: 0, longitude: 1 }, message: ‘Second message’ }, { location: { latitude: 1, longitude: 0 }, message: ‘Third message’ }, { location: { latitude: 0, longitude: 0 }, message: ‘Fourth message’ }]; for (var i = 0; i < notes.length; i++) { var note = notes[i]; console.log(‘Sending message “‘ + note.message + ‘“ at ‘ +
note.location.latitude + ', ' + note.location.longitude);
call.write(note); } call.end(); }
/**
- Run all of the demos in order */ function main() { async.series([ runGetFeature, runListFeatures, runRecordRoute, runRouteChat ]); }
if (require.main === module) { main(); }
exports.runGetFeature = runGetFeature;
exports.runListFeatures = runListFeatures;
exports.runRecordRoute = runRecordRoute;
exports.runRouteChat = runRouteChat;
<a name="ufexo"></a>
### 2.4.1 创建存根
为了能调用服务的方法,我们得先创建一个 存根。要做到这点,我们只需要调用 RouteGuide 的存根构造函数,指定服务器地址和端口。
```javascript
new routeguide.RouteGuide('localhost:50051', grpc.credentials.createInsecure());
2.4.2 调用服务的方法
现在我们来看看如何调用服务的方法。注意这些方法都是异步的:他们使用事件或者 callback 去获得结果。
简单类型 RPC:getFeature
调用简单 RPC 的 getFeature 几乎是和调用一个本地的异步方法一样简单。
var point = {latitude: 409146138, longitude: -746188906};
stub.getFeature(point, function(err, feature) {
if (err) {
// process error
} else {
// process feature
}
});
如你所见,我们创建并且填充了一个请求对象。最后我们调用了存根上的方法,传入请求和回调函数。如果没有错误,就可以从我们的服务器从应答对象读取应答信息。
console.log('Found feature called "' + feature.name + '" at ' +
feature.location.latitude/COORD_FACTOR + ', ' +
feature.location.longitude/COORD_FACTOR);
流式 RPC
现在来看看我们的流方法。如果你已经读过【创建服务器】,本节的一些内容看上去很熟悉——流式 RPC 是在客户端和服务器两端以一种类似的方式实现的。
下面就是我们称作是服务器端的流方法 listFeatures,它会返回地理的 Feature:
var call = client.listFeatures(rectangle);
call.on('data', function(feature) {
console.log('Found feature called "' + feature.name + '" at ' +
feature.location.latitude/COORD_FACTOR + ', ' +
feature.location.longitude/COORD_FACTOR);
});
call.on('end', function() {
// The server has finished sending
});
call.on('error', function(e) {
// An error has occurred and the stream has been closed.
});
call.on('status', function(status) {
// process status
});
我们传给它一个请求并拿回一个 Readable 流对象,而不是给方法传入请求和回调函数。客户端可以使用 Readable 的 ‘data’ 事件去读取服务器的应答。这个事件由每个 Feature 消息对象触发,知道没有更多的消息:’end’ 事件揭示调用已经结束。最后,当服务器发送状态时,触发状态事件。
客户端的流方法 RecordRoute 的使用很相似,除了我们将一个回调函数传给方法,拿到一个 Writable 返回。
var call = client.recordRoute(function(error, stats) {
if (error) {
callback(error);
}
console.log('Finished trip with', stats.point_count, 'points');
console.log('Passed', stats.feature_count, 'features');
console.log('Travelled', stats.distance, 'meters');
console.log('It took', stats.elapsed_time, 'seconds');
});
function pointSender(lat, lng) {
return function(callback) {
console.log('Visiting point ' + lat/COORD_FACTOR + ', ' +
lng/COORD_FACTOR);
call.write({
latitude: lat,
longitude: lng
});
_.delay(callback, _.random(500, 1500));
};
}
var point_senders = [];
for (var i = 0; i < num_points; i++) {
var rand_point = feature_list[_.random(0, feature_list.length - 1)];
point_senders[i] = pointSender(rand_point.location.latitude,
rand_point.location.longitude);
}
async.series(point_senders, function() {
call.end();
});
一旦我们用 write() 将客户端请求写入到流的动作完成,我们需要在流上调用 end() 通知 gRPC 我们已经完成写。如果状态是 OK,stats 对象会跟着服务器的响应被填充。
最后,让我们看看双向流式 RPC routeChat()。在这种场景下,我们将上下文传给一个方法,拿到一个可以用来读写消息的 Duplex 流对象的返回。
var call = client.routeChat();
这里读写的语法和我们客户端流以及服务器端流方法没有任何区别。虽然每一方都能按照写入时的顺序拿到另一方的消息,客户端和服务器端都可以以任意顺序读写——流操作起来是完全独立的。
2.5 运行测试
$ npm install
# 运行 server
$ node ./dynamic_codegen/route_guide/route_guide_server.js --db_path=./dynamic_codegen/route_guide/route_guide_db.json
# 在新的终端中,运行 client
$ node ./dynamic_codegen/route_guide/route_guide_client.js --db_path=./dynamic_codegen/route_guide/route_guide_db.json
# client 输出结果:
...
Found feature called "261 Van Sickle Road, Goshen, NY 10924, USA" at 41.3069058, -74.4597778
...
Visiting point 40.1012643, -74.4035134
Finished trip with 10 points
Passed 6 features
Travelled 638865 meters
It took 10 seconds
Sending message "First message" at 0, 0
Sending message "Second message" at 0, 1
Sending message "Third message" at 1, 0
Sending message "Fourth message" at 0, 0
Got message "First message" at 0, 0