HTTP模块实现了HTTP服务器和客户端的功能,是Node.js的核心模块,也是我们使用得最多的模块。本章我们来分析HTTP模块,从中我们可以学习到一个HTTP服务器和客户端是怎么实现的,以及HTTP协议本身的一些原理和优化。

18.1 HTTP解析器

HTTP解析器是HTTP模块的核心,不管是作为服务器处理请求还是客户端处理响应都需要使用HTTP解析器解析HTTP协议。新版Node.js使用了新的HTTP解析器llhttp。根据官方说明llhttp比旧版的http_parser在性能上有了非常大的提高。本节我们分析分析llhttp的基础原理和使用。HTTP解析器是一个非常复杂的状态机,在解析数据的过程中,会不断触发钩子函数。下面是llhttp支持的钩子函数。如果用户定义了对应的钩子,在解析的过程中就会被回调。

  1. // 开始解析HTTP协议
  2. int llhttp__on_message_begin(llhttp_t* s, const char* p, const char* endp) {
  3. int err;
  4. CALLBACK_MAYBE(s, on_message_begin, s);
  5. return err;
  6. }
  7. // 解析出请求url时的回调,最后拿到一个url
  8. int llhttp__on_url(llhttp_t* s, const char* p, const char* endp) {
  9. int err;
  10. CALLBACK_MAYBE(s, on_url, s, p, endp - p);
  11. return err;
  12. }
  13. // 解析出HTTP响应状态的回调
  14. int llhttp__on_status(llhttp_t* s, const char* p, const char* endp) {
  15. int err;
  16. CALLBACK_MAYBE(s, on_status, s, p, endp - p);
  17. return err;
  18. }
  19. // 解析出头部键时的回调
  20. int llhttp__on_header_field(llhttp_t* s, const char* p, const char* endp) {
  21. int err;
  22. CALLBACK_MAYBE(s, on_header_field, s, p, endp - p);
  23. return err;
  24. }
  25. // 解析出头部值时的回调
  26. int llhttp__on_header_value(llhttp_t* s, const char* p, const char* endp) {
  27. int err;
  28. CALLBACK_MAYBE(s, on_header_value, s, p, endp - p);
  29. return err;
  30. }
  31. // 解析HTTP头完成时的回调
  32. int llhttp__on_headers_complete(llhttp_t* s, const char* p, const char* endp) {
  33. int err;
  34. CALLBACK_MAYBE(s, on_headers_complete, s);
  35. return err;
  36. }
  37. // 解析完body的回调
  38. int llhttp__on_message_complete(llhttp_t* s, const char* p, const char* endp) {
  39. int err;
  40. CALLBACK_MAYBE(s, on_message_complete, s);
  41. return err;
  42. }
  43. // 解析body时的回调
  44. int llhttp__on_body(llhttp_t* s, const char* p, const char* endp) {
  45. int err;
  46. CALLBACK_MAYBE(s, on_body, s, p, endp - p);
  47. return err;
  48. }
  49. // 解析到一个chunk结构头时的回调
  50. int llhttp__on_chunk_header(llhttp_t* s, const char* p, const char* endp) {
  51. int err;
  52. CALLBACK_MAYBE(s, on_chunk_header, s);
  53. return err;
  54. }
  55. // 解析完一个chunk时的回调
  56. int llhttp__on_chunk_complete(llhttp_t* s, const char* p, const char* endp) {
  57. int err;
  58. CALLBACK_MAYBE(s, on_chunk_complete, s);
  59. return err;
  60. }

Node.js在node_http_parser.cc中对llhttp进行了封装。该模块导出了一个HTTPParser。

  1. Local<FunctionTemplate> t=env->NewFunctionTemplate(Parser::New);
  2. t->InstanceTemplate()->SetInternalFieldCount(1);
  3. t->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(),
  4. "HTTPParser"));
  5. target->Set(env->context(),
  6. FIXED_ONE_BYTE_STRING(env->isolate(), "HTTPParser"),
  7. t->GetFunction(env->context()).ToLocalChecked()).Check();

在Node.js中我们通过以下方式使用HTTPParser。

  1. const parser = new HTTPParser();
  2. cleanParser(parser);
  3. parser.onIncoming = null;
  4. parser[kOnHeaders] = parserOnHeaders;
  5. parser[kOnHeadersComplete] = parserOnHeadersComplete;
  6. parser[kOnBody] = parserOnBody;
  7. parser[kOnMessageComplete] = parserOnMessageComplete;
  8. // 初始化HTTP解析器处理的报文类型,这里是响应报文
  9. parser.initialize(HTTPParser.RESPONSE,
  10. new HTTPClientAsyncResource('HTTPINCOMINGMESSAGE', req),
  11. req.maxHeaderSize || 0,
  12. req.insecureHTTPParser === undefined ?
  13. isLenient() : req.insecureHTTPParser);
  14. // 收到数据后传给解析器处理
  15. const ret = parser.execute(data);
  16. }

我们看一下initialize和execute的代码。Initialize函数用于初始化llhttp。

  1. static void Initialize(const FunctionCallbackInfo<Value>& args) {
  2. Environment* env = Environment::GetCurrent(args);
  3. bool lenient = args[3]->IsTrue();
  4. uint64_t max_http_header_size = 0;
  5. // 头部的最大大小
  6. if (args.Length() > 2) {
  7. max_http_header_size = args[2].As<Number>()->Value();
  8. }
  9. // 没有设置则取Node.js的默认值
  10. if (max_http_header_size == 0) {
  11. max_http_header_size=env->options()->max_http_header_size;
  12. }
  13. // 解析的报文类型
  14. llhttp_type_t type =
  15. static_cast<llhttp_type_t>(args[0].As<Int32>()->Value());
  16. CHECK(type == HTTP_REQUEST || type == HTTP_RESPONSE);
  17. Parser* parser;
  18. ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
  19. parser->Init(type, max_http_header_size, lenient);
  20. }

Initialize做了一些预处理后调用Init。

  1. void Init(llhttp_type_t type, uint64_t max_http_header_size, bool lenient) {
  2. // 初始化llhttp
  3. llhttp_init(&parser_, type, &settings);
  4. llhttp_set_lenient(&parser_, lenient);
  5. header_nread_ = 0;
  6. url_.Reset();
  7. status_message_.Reset();
  8. num_fields_ = 0;
  9. num_values_ = 0;
  10. have_flushed_ = false;
  11. got_exception_ = false;
  12. max_http_header_size_ = max_http_header_size;
  13. }

Init做了一些字段的初始化,最重要的是调用了llhttp_init对llhttp进行了初始化,另外kOn开头的属性是钩子函数,由node_http_parser.cc中的回调,而node_http_parser.cc也会定义钩子函数,由llhttp回调,我们看一下node_http_parser.cc钩子函数的定义和实现。

  1. const llhttp_settings_t Parser::settings = {
  2. Proxy<Call, &Parser::on_message_begin>::Raw,
  3. Proxy<DataCall, &Parser::on_url>::Raw,
  4. Proxy<DataCall, &Parser::on_status>::Raw,
  5. Proxy<DataCall, &Parser::on_header_field>::Raw,
  6. Proxy<DataCall, &Parser::on_header_value>::Raw,
  7. Proxy<Call, &Parser::on_headers_complete>::Raw,
  8. Proxy<DataCall, &Parser::on_body>::Raw,
  9. Proxy<Call, &Parser::on_message_complete>::Raw,
  10. Proxy<Call, &Parser::on_chunk_header>::Raw,
  11. Proxy<Call, &Parser::on_chunk_complete>::Raw,
  12. };

1 开始解析报文的回调

  1. // 开始解析报文,一个TCP连接可能会有多个报文
  2. int on_message_begin() {
  3. num_fields_ = num_values_ = 0;
  4. url_.Reset();
  5. status_message_.Reset();
  6. return 0;
  7. }

2 解析url时的回调

  1. int on_url(const char* at, size_t length) {
  2. int rv = TrackHeader(length);
  3. if (rv != 0) {
  4. return rv;
  5. }
  6. url_.Update(at, length);
  7. return 0;
  8. }

3解析HTTP响应时的回调

  1. int on_status(const char* at, size_t length) {
  2. int rv = TrackHeader(length);
  3. if (rv != 0) {
  4. return rv;
  5. }
  6. status_message_.Update(at, length);
  7. return 0;
  8. }

4解析到HTTP头的键时回调

  1. int on_header_field(const char* at, size_t length) {
  2. int rv = TrackHeader(length);
  3. if (rv != 0) {
  4. return rv;
  5. }
  6. // 相等说明键对值的解析是一一对应的
  7. if (num_fields_ == num_values_) {
  8. // start of new field name
  9. // 键的数加一
  10. num_fields_++;
  11. // 超过阈值则先回调js消费掉
  12. if (num_fields_ == kMaxHeaderFieldsCount) {
  13. // ran out of space - flush to javascript land
  14. Flush();
  15. // 重新开始
  16. num_fields_ = 1;
  17. num_values_ = 0;
  18. }
  19. // 初始化
  20. fields_[num_fields_ - 1].Reset();
  21. }
  22. // 保存键
  23. fields_[num_fields_ - 1].Update(at, length);
  24. return 0;
  25. }

当解析的头部个数达到阈值时,Node.js会先通过Flush函数回调JS层保存当前的一些数据。

  1. void Flush() {
  2. HandleScope scope(env()->isolate());
  3. Local<Object> obj = object();
  4. // JS层的钩子
  5. Local<Value> cb = obj->Get(env()->context(), kOnHeaders).ToLocalChecked();
  6. if (!cb->IsFunction())
  7. return;
  8. Local<Value> argv[2] = {
  9. CreateHeaders(),
  10. url_.ToString(env())
  11. };
  12. MaybeLocal<Value> r = MakeCallback(cb.As<Function>(),
  13. arraysize(argv),
  14. argv);
  15. url_.Reset();
  16. have_flushed_ = true;
  17. }
  18. Local<Array> CreateHeaders() {
  19. // HTTP头的个数乘以2,因为一个头由键和值组成
  20. Local<Value> headers_v[kMaxHeaderFieldsCount * 2];
  21. // 保存键和值到HTTP头
  22. for (size_t i = 0; i < num_values_; ++i) {
  23. headers_v[i * 2] = fields_[i].ToString(env());
  24. headers_v[i * 2 + 1] = values_[i].ToString(env());
  25. }
  26. return Array::New(env()->isolate(), headers_v, num_values_ * 2);
  27. }

Flush会调用JS层的kOnHeaders钩子函数。

5解析到HTTP头的值时回调

  1. int on_header_value(const char* at, size_t length) {
  2. int rv = TrackHeader(length);
  3. if (rv != 0) {
  4. return rv;
  5. }
  6. /*
  7. 值的个数不等于键的个数说明正解析到键对应的值,即一一对应。
  8. 否则说明一个键存在多个值,则不更新值的个数,多个值累加到一个slot
  9. */
  10. if (num_values_ != num_fields_) {
  11. // start of new header value
  12. num_values_++;
  13. values_[num_values_ - 1].Reset();
  14. }
  15. CHECK_LT(num_values_, arraysize(values_));
  16. CHECK_EQ(num_values_, num_fields_);
  17. values_[num_values_ - 1].Update(at, length);
  18. return 0;
  19. }

6解析完HTTP头后的回调

  1. int on_headers_complete() {
  2. header_nread_ = 0;
  3. enum on_headers_complete_arg_index {
  4. A_VERSION_MAJOR = 0,
  5. A_VERSION_MINOR,
  6. A_HEADERS,
  7. A_METHOD,
  8. A_URL,
  9. A_STATUS_CODE,
  10. A_STATUS_MESSAGE,
  11. A_UPGRADE,
  12. A_SHOULD_KEEP_ALIVE,
  13. A_MAX
  14. };
  15. Local<Value> argv[A_MAX];
  16. Local<Object> obj = object();
  17. Local<Value> cb = obj->Get(env()->context(),
  18. kOnHeadersComplete).ToLocalChecked();
  19. Local<Value> undefined = Undefined(env()->isolate());
  20. for (size_t i = 0; i < arraysize(argv); i++)
  21. argv[i] = undefined;
  22. // 之前flush过,则继续flush到JS层,否则返回全部头给js
  23. if (have_flushed_) {
  24. // Slow case, flush remaining headers.
  25. Flush();
  26. } else {
  27. // Fast case, pass headers and URL to JS land.
  28. argv[A_HEADERS] = CreateHeaders();
  29. if (parser_.type == HTTP_REQUEST)
  30. argv[A_URL] = url_.ToString(env());
  31. }
  32. num_fields_ = 0;
  33. num_values_ = 0;
  34. // METHOD
  35. if (parser_.type == HTTP_REQUEST) {
  36. argv[A_METHOD] =
  37. Uint32::NewFromUnsigned(env()->isolate(), parser_.method);
  38. }
  39. // STATUS
  40. if (parser_.type == HTTP_RESPONSE) {
  41. argv[A_STATUS_CODE] =
  42. Integer::New(env()->isolate(), parser_.status_code);
  43. argv[A_STATUS_MESSAGE] = status_message_.ToString(env());
  44. }
  45. // VERSION
  46. argv[A_VERSION_MAJOR] = Integer::New(env()->isolate(), parser_.http_major);
  47. argv[A_VERSION_MINOR] = Integer::New(env()->isolate(), parser_.http_minor);
  48. bool should_keep_alive;
  49. // 是否定义了keepalive头
  50. should_keep_alive = llhttp_should_keep_alive(&parser_);
  51. argv[A_SHOULD_KEEP_ALIVE] =
  52. Boolean::New(env()->isolate(), should_keep_alive);
  53. // 是否是升级协议
  54. argv[A_UPGRADE] = Boolean::New(env()->isolate(), parser_.upgrade);
  55. MaybeLocal<Value> head_response;
  56. {
  57. InternalCallbackScope callback_scope(
  58. this, InternalCallbackScope::kSkipTaskQueues);
  59. head_response = cb.As<Function>()->Call(
  60. env()->context(), object(), arraysize(argv), argv);
  61. }
  62. int64_t val;
  63. if (head_response.IsEmpty() || !head_response.ToLocalChecked()
  64. ->IntegerValue(env()->context())
  65. .To(&val)) {
  66. got_exception_ = true;
  67. return -1;
  68. }
  69. return val;
  70. }

on_headers_complete会执行JS层的kOnHeadersComplete钩子。

7 解析body时的回调

  1. int on_body(const char* at, size_t length) {
  2. EscapableHandleScope scope(env()->isolate());
  3. Local<Object> obj = object();
  4. Local<Value> cb = obj->Get(env()->context(), kOnBody).ToLocalChecked();
  5. // We came from consumed stream
  6. if (current_buffer_.IsEmpty()) {
  7. // Make sure Buffer will be in parent HandleScope
  8. current_buffer_ = scope.Escape(Buffer::Copy(
  9. env()->isolate(),
  10. current_buffer_data_,
  11. current_buffer_len_).ToLocalChecked());
  12. }
  13. Local<Value> argv[3] = {
  14. // 当前解析中的数据
  15. current_buffer_,
  16. // body开始的位置
  17. Integer::NewFromUnsigned(env()->isolate(), at - current_buffer_data_),
  18. // body当前长度
  19. Integer::NewFromUnsigned(env()->isolate(), length)
  20. };
  21. MaybeLocal<Value> r = MakeCallback(cb.As<Function>(),
  22. arraysize(argv),
  23. argv);
  24. return 0;
  25. }

Node.js中并不是每次解析HTTP报文的时候就新建一个HTTP解析器,Node.js使用FreeList数据结构对HTTP解析器实例进行了管理。

  1. class FreeList {
  2. constructor(name, max, ctor) {
  3. this.name = name;
  4. // 构造函数
  5. this.ctor = ctor;
  6. // 节点的最大值
  7. this.max = max;
  8. // 实例列表
  9. this.list = [];
  10. }
  11. // 分配一个实例
  12. alloc() {
  13. // 有空闲的则直接返回,否则新建一个
  14. return this.list.length > 0 ?
  15. this.list.pop() :
  16. ReflectApply(this.ctor, this, arguments);
  17. }
  18. // 释放实例
  19. free(obj) {
  20. // 小于阈值则放到空闲列表,否则释放(调用方负责释放)
  21. if (this.list.length < this.max) {
  22. this.list.push(obj);
  23. return true;
  24. }
  25. return false;
  26. }
  27. }

我们看一下在Node.js中对FreeList的使用。。

  1. const parsers = new FreeList('parsers', 1000, function parsersCb() {
  2. const parser = new HTTPParser();
  3. // 初始化字段
  4. cleanParser(parser);
  5. // 设置钩子
  6. parser.onIncoming = null;
  7. parser[kOnHeaders] = parserOnHeaders;
  8. parser[kOnHeadersComplete] = parserOnHeadersComplete;
  9. parser[kOnBody] = parserOnBody;
  10. parser[kOnMessageComplete] = parserOnMessageComplete;
  11. return parser;
  12. });

HTTP解析器的使用

  1. var HTTPParser = process.binding('http_parser').HTTPParser;
  2. var parser = new HTTPParser(HTTPParser.REQUEST);
  3. const kOnHeaders = HTTPParser.kOnHeaders;
  4. const kOnHeadersComplete = HTTPParser.kOnHeadersComplete;
  5. const kOnBody = HTTPParser.kOnBody;
  6. const kOnMessageComplete = HTTPParser.kOnMessageComplete;
  7. const kOnExecute = HTTPParser.kOnExecute;
  8. parser[kOnHeaders] = function(headers, url) {
  9. console.log('kOnHeaders', headers.length, url);
  10. }
  11. parser[kOnHeadersComplete] = function(versionMajor, versionMinor, headers, method,
  12. url, statusCode, statusMessage, upgrade, shouldKeepAlive) {
  13. console.log('kOnHeadersComplete', headers);
  14. }
  15. parser[kOnBody] = function(b, start, len) {
  16. console.log('kOnBody', b.slice(start).toString('utf-8'));
  17. }
  18. parser[kOnMessageComplete] = function() {
  19. console.log('kOnMessageComplete');
  20. }
  21. parser[kOnExecute] = function() {
  22. console.log('kOnExecute');
  23. }
  24. parser.execute(Buffer.from(
  25. 'GET / HTTP/1.1\r\n' +
  26. 'Host: http://localhost\r\n\r\n'
  27. ));

以上代码的输出

  1. kOnHeadersComplete [ 'Host', 'http://localhost' ]
  2. kOnMessageComplete

我们看到只执行了kOnHeadersComplete和 kOnMessageComplete。那其它几个回调什么时候会执行呢?我们接着看。我们把输入改一下。

  1. parser.execute(Buffer.from(
  2. 'GET / HTTP/1.1\r\n' +
  3. 'Host: http://localhost\r\n' +
  4. 'content-length: 1\r\n\r\n'+
  5. '1'
  6. ));

上面代码的输出

  1. kOnHeadersComplete [ 'Host', 'http://localhost', 'content-length', '1' ]
  2. kOnBody 1
  3. kOnMessageComplete

我们看到多了一个回调kOnBody,因为我们加了一个HTTP头content-length指示有body,所以HTTP解析器解析到body的时候就会回调kOnBody。那kOnHeaders什么时候会执行呢?我们继续修改代码。

  1. parser.execute(Buffer.from(
  2. 'GET / HTTP/1.1\r\n' +
  3. 'Host: http://localhost\r\n' +
  4. 'a: b\r\n'+
  5. // 很多'a: b\r\n'+
  6. 'content-length: 1\r\n\r\n'+
  7. '1'
  8. ));

以上代码的输出

  1. kOnHeaders 62 /
  2. kOnHeaders 22
  3. kOnHeadersComplete undefined
  4. kOnBody 1
  5. kOnMessageComplete

我们看到kOnHeaders被执行了,并且执行了两次。因为如果HTTP头的个数达到阈值,在解析HTTP头部的过程中,就先flush到JS层(如果多次达到阈值,则回调多次),并且在解析完所有HTTP头后,会在kOnHeadersComplet回调之前再次回调kOnHeaders(如果还有的话)。最后我们看一下kOnExecute如何触发。

  1. var HTTPParser = process.binding('http_parser').HTTPParser;
  2. var parser = new HTTPParser(HTTPParser.REQUEST);
  3. var net = require('net');
  4. const kOnHeaders = HTTPParser.kOnHeaders;
  5. const kOnHeadersComplete = HTTPParser.kOnHeadersComplete;
  6. const kOnBody = HTTPParser.kOnBody;
  7. const kOnMessageComplete = HTTPParser.kOnMessageComplete;
  8. const kOnExecute = HTTPParser.kOnExecute;
  9. parser[kOnHeaders] = function(headers, url) {
  10. console.log('kOnHeaders', headers.length, url);
  11. }
  12. parser[kOnHeadersComplete] = function(versionMajor, versionMinor, headers, method,
  13. url, statusCode, statusMessage, upgrade, shouldKeepAlive) {
  14. console.log('kOnHeadersComplete', headers);
  15. }
  16. parser[kOnBody] = function(b, start, len) {
  17. console.log('kOnBody', b.slice(start).toString('utf-8'));
  18. }
  19. parser[kOnMessageComplete] = function() {
  20. console.log('kOnMessageComplete');
  21. }
  22. parser[kOnExecute] = function(a,b) {
  23. console.log('kOnExecute,解析的字节数:',a);
  24. }
  25. // 启动一个服务器
  26. net.createServer((socket) => {
  27. parser.consume(socket._handle);
  28. }).listen(80);
  29. // 启动一个客户端
  30. setTimeout(() => {
  31. var socket = net.connect({port: 80});
  32. socket.end('GET / HTTP/1.1\r\n' +
  33. 'Host: http://localhost\r\n' +
  34. 'content-length: 1\r\n\r\n'+
  35. '1');
  36. }, 1000);

我们需要调用parser.consume方法并且传入一个isStreamBase的流(stream_base.cc定义),才会触发kOnExecute。因为kOnExecute是在StreamBase流可读时触发的。

18.2 HTTP客户端

我们首先看一下使用Node.js作为客户端的例子。

  1. const data = querystring.stringify({
  2. 'msg': 'hi'
  3. });
  4. const options = {
  5. hostname: 'your domain',
  6. path: '/',
  7. method: 'POST',
  8. headers: {
  9. 'Content-Type': 'application/x-www-form-urlencoded',
  10. 'Content-Length': Buffer.byteLength(data)
  11. }
  12. };
  13. const req = http.request(options, (res) => {
  14. res.setEncoding('utf8');
  15. res.on('data', (chunk) => {
  16. console.log(`${chunk}`);
  17. });
  18. res.on('end', () => {
  19. console.log('end');
  20. });
  21. });
  22. req.on('error', (e) => {
  23. console.error(`${e.message}`);
  24. });
  25. // 发送请求的数据
  26. req.write(data);
  27. // 设置请求结束
  28. req.end();

我们看一下http.request的实现。

  1. function request(url, options, cb) {
  2. return new ClientRequest(url, options, cb);
  3. }

HTTP客户端通过_http_client.js的ClientRequest实现,ClientRequest的代码非常多,我们只分析核心的流程。我们看初始化一个请求的逻辑。

  1. function ClientRequest(input, options, cb) {
  2. // 继承OutgoingMessage
  3. OutgoingMessage.call(this);
  4. // 是否使用agent
  5. let agent = options.agent;
  6. // 忽略agent的处理,具体参考_http_agent.js,主要用于复用TCP连接
  7. this.agent = agent;
  8. // 建立连接的超时时间
  9. if (options.timeout !== undefined)
  10. this.timeout = getTimerDuration(options.timeout, 'timeout');
  11. // HTTP头个数的阈值
  12. const maxHeaderSize = options.maxHeaderSize;
  13. this.maxHeaderSize = maxHeaderSize;
  14. // 监听响应事件
  15. if (cb) {
  16. this.once('response', cb);
  17. }
  18. // 忽略设置http协议的请求行或请求头的逻辑
  19. // 建立TCP连接后的回调
  20. const oncreate = (err, socket) => {
  21. if (called)
  22. return;
  23. called = true;
  24. if (err) {
  25. process.nextTick(() => this.emit('error', err));
  26. return;
  27. }
  28. // 建立连接成功,执行回调
  29. this.onSocket(socket);
  30. // 连接成功后发送数据
  31. this._deferToConnect(null, null, () => this._flush());
  32. };
  33. // 使用agent时,socket由agent提供,否则自己创建socket
  34. if (this.agent) {
  35. this.agent.addRequest(this, options);
  36. } else {
  37. // 不使用agent则每次创建一个socket,默认使用net模块的接口
  38. if (typeof options.createConnection === 'function') {
  39. const newSocket = options.createConnection(options,
  40. oncreate);
  41. if (newSocket && !called) {
  42. called = true;
  43. this.onSocket(newSocket);
  44. } else {
  45. return;
  46. }
  47. } else {
  48. this.onSocket(net.createConnection(options));
  49. }
  50. }
  51. // 连接成功后发送待缓存的数据
  52. this._deferToConnect(null, null, () => this._flush());
  53. }

获取一个ClientRequest实例后,不管是通过agent还是自己创建一个TCP连接,在连接成功后都会执行onSocket。

  1. // socket可用时的回调
  2. ClientRequest.prototype.onSocket = function onSocket(socket) {
  3. process.nextTick(onSocketNT, this, socket);
  4. };
  5. function onSocketNT(req, socket) {
  6. // 申请socket过程中,请求已经终止
  7. if (req.aborted) {
  8. // 不使用agent,直接销毁socekt
  9. if (!req.agent) {
  10. socket.destroy();
  11. } else {
  12. // 使用agent触发free事件,由agent处理socekt
  13. req.emit('close');
  14. socket.emit('free');
  15. }
  16. } else {
  17. // 处理socket
  18. tickOnSocket(req, socket);
  19. }
  20. }

我们继续看tickOnSocket

  1. // 初始化HTTP解析器和注册data事件等,等待响应
  2. function tickOnSocket(req, socket) {
  3. // 分配一个HTTP解析器
  4. const parser = parsers.alloc();
  5. req.socket = socket;
  6. // 初始化,处理响应报文
  7. parser.initialize(HTTPParser.RESPONSE,
  8. new HTTPClientAsyncResource('HTTPINCOMINGMESSAGE', req), req.maxHeaderSize || 0,
  9. req.insecureHTTPParser === undefined ?
  10. isLenient() : req.insecureHTTPParser);
  11. parser.socket = socket;
  12. parser.outgoing = req;
  13. req.parser = parser;
  14. socket.parser = parser;
  15. // socket正处理的请求
  16. socket._httpMessage = req;
  17. // Propagate headers limit from request object to parser
  18. if (typeof req.maxHeadersCount === 'number') {
  19. parser.maxHeaderPairs = req.maxHeadersCount << 1;
  20. }
  21. // 解析完HTTP头部的回调
  22. parser.onIncoming = parserOnIncomingClient;
  23. socket.removeListener('error', freeSocketErrorListener);
  24. socket.on('error', socketErrorListener);
  25. socket.on('data', socketOnData);
  26. socket.on('end', socketOnEnd);
  27. socket.on('close', socketCloseListener);
  28. socket.on('drain', ondrain);
  29. if (
  30. req.timeout !== undefined ||
  31. (req.agent && req.agent.options &&
  32. req.agent.options.timeout)
  33. ) {
  34. // 处理超时时间
  35. listenSocketTimeout(req);
  36. }
  37. req.emit('socket', socket);
  38. }

拿到一个socket后,就开始监听socket上http报文的到来。并且申请一个HTTP解析器准备解析http报文,我们主要分析超时时间的处理和data事件的处理逻辑。
1 超时时间的处理

  1. function listenSocketTimeout(req) {
  2. // 设置过了则返回
  3. if (req.timeoutCb) {
  4. return;
  5. }
  6. // 超时回调
  7. req.timeoutCb = emitRequestTimeout;
  8. // Delegate socket timeout event.
  9. // 设置socket的超时时间,即socket上一定时间后没有响应则触发超时
  10. if (req.socket) {
  11. req.socket.once('timeout', emitRequestTimeout);
  12. } else {
  13. req.on('socket', (socket) => {
  14. socket.once('timeout', emitRequestTimeout);
  15. });
  16. }
  17. }
  18. function emitRequestTimeout() {
  19. const req = this._httpMessage;
  20. if (req) {
  21. req.emit('timeout');
  22. }
  23. }

2 处理响应数据

  1. function socketOnData(d) {
  2. const socket = this;
  3. const req = this._httpMessage;
  4. const parser = this.parser;
  5. // 交给HTTP解析器处理
  6. const ret = parser.execute(d);
  7. // ...
  8. }

当Node.js收到响应报文时,会把数据交给HTTP解析器处理。http解析在解析的过程中会不断触发钩子函数。我们看一下JS层各个钩子函数的逻辑。
1 解析头部过程中执行的回调

  1. function parserOnHeaders(headers, url) {
  2. // 保存头和url
  3. if (this.maxHeaderPairs <= 0 ||
  4. this._headers.length < this.maxHeaderPairs) {
  5. this._headers = this._headers.concat(headers);
  6. }
  7. this._url += url;
  8. }

2 解析完头部的回调

  1. function parserOnHeadersComplete(versionMajor,
  2. versionMinor,
  3. headers,
  4. method,
  5. url,
  6. statusCode,
  7. statusMessage,
  8. upgrade,
  9. shouldKeepAlive) {
  10. const parser = this;
  11. const { socket } = parser;
  12. // 剩下的HTTP头
  13. if (headers === undefined) {
  14. headers = parser._headers;
  15. parser._headers = [];
  16. }
  17. if (url === undefined) {
  18. url = parser._url;
  19. parser._url = '';
  20. }
  21. // Parser is also used by http client
  22. // IncomingMessage
  23. const ParserIncomingMessage=(socket &&
  24. socket.server &&
  25. socket.server[kIncomingMessage]
  26. ) ||
  27. IncomingMessage;
  28. // 新建一个IncomingMessage对象
  29. const incoming = parser.incoming = new ParserIncomingMessage(socket);
  30. incoming.httpVersionMajor = versionMajor;
  31. incoming.httpVersionMinor = versionMinor;
  32. incoming.httpVersion = `${versionMajor}.${versionMinor}`;
  33. incoming.url = url;
  34. incoming.upgrade = upgrade;
  35. let n = headers.length;
  36. // If parser.maxHeaderPairs <= 0 assume that there's no limit.
  37. if (parser.maxHeaderPairs > 0)
  38. n = MathMin(n, parser.maxHeaderPairs);
  39. // 更新到保存HTTP头的对象
  40. incoming._addHeaderLines(headers, n);
  41. // 请求方法或响应行信息
  42. if (typeof method === 'number') {
  43. // server only
  44. incoming.method = methods[method];
  45. } else {
  46. // client only
  47. incoming.statusCode = statusCode;
  48. incoming.statusMessage = statusMessage;
  49. }
  50. // 执行回调
  51. return parser.onIncoming(incoming, shouldKeepAlive);
  52. }

我们看到解析完头部后会执行另一个回调onIncoming,并传入IncomingMessage实例,这就是我们平时使用的res。在前面分析过,onIncoming设置的值是parserOnIncomingClient。

  1. function parserOnIncomingClient(res, shouldKeepAlive) {
  2. const socket = this.socket;
  3. // 请求对象
  4. const req = socket._httpMessage;
  5. // 服务器发送了多个响应
  6. if (req.res) {
  7. socket.destroy();
  8. return 0;
  9. }
  10. req.res = res;
  11. if (statusIsInformational(res.statusCode)) {
  12. req.res = null;
  13. // 请求时设置了expect头,则响应码为100,可以继续发送数据
  14. if (res.statusCode === 100) {
  15. req.emit('continue');
  16. }
  17. return 1;
  18. }
  19. req.res = res;
  20. res.req = req;
  21. // 等待响应结束,响应结束后会清除定时器
  22. res.on('end', responseOnEnd);
  23. // 请求终止了或触发response事件,返回false说明没有监听response事件,则丢弃数据
  24. if (req.aborted || !req.emit('response', res))
  25. res._dump();
  26. }

从源码中我们看出在解析完HTTP响应头时,就执行了http.request设置的回调函数。例如下面代码中的回调。

  1. http. request('domain', { agent }, (res) => {
  2. // 解析body
  3. res.on('data', (data) => {
  4. //
  5. });
  6. // 解析body结束,响应结束
  7. res.on('end', (data) => {
  8. //
  9. });
  10. });
  11. // ...

在回调里我们可以把res作为一个流使用,在解析完HTTP头后,HTTP解析器会继续解析HTTP body。我们看一下HTTP解析器在解析body过程中执行的回调。

  1. function parserOnBody(b, start, len) {
  2. const stream = this.incoming;
  3. if (len > 0 && !stream._dumped) {
  4. const slice = b.slice(start, start + len);
  5. // 把数据push到流中,流会触发data事件
  6. const ret = stream.push(slice);
  7. // 数据过载,暂停接收
  8. if (!ret)
  9. readStop(this.socket);
  10. }
  11. }

最后我们再看一下解析完body时HTTP解析器执行的回调。

  1. function parserOnMessageComplete() {
  2. const parser = this;
  3. const stream = parser.incoming;
  4. if (stream !== null) {
  5. // body解析完了
  6. stream.complete = true;
  7. // 在body后可能有trailer头,保存下来
  8. const headers = parser._headers;
  9. if (headers.length) {
  10. stream._addHeaderLines(headers, headers.length);
  11. parser._headers = [];
  12. parser._url = '';
  13. }
  14. // 流结束
  15. stream.push(null);
  16. }
  17. // 读取下一个响应,如果有的话
  18. readStart(parser.socket);
  19. }

我们看到在解析body过程中会不断往流中push数据,从而不断触发res的data事件,最后解析body结束后,通过push(null)通知流结束,从而触发res.end事件。我们沿着onSocket函数分析完处理响应后我们再来分析请求的过程。执行完http.request后我们会得到一个标记请求的实例。然后执行它的write方法发送数据。

  1. OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {
  2. const ret = write_(this, chunk, encoding, callback, false);
  3. // 返回false说明需要等待drain事件
  4. if (!ret)
  5. this[kNeedDrain] = true;
  6. return ret;
  7. };
  8. function write_(msg, chunk, encoding, callback, fromEnd) {
  9. // 还没有设置this._header字段,则把请求行和HTTP头拼接到this._header字段
  10. if (!msg._header) {
  11. msg._implicitHeader();
  12. }
  13. let ret;
  14. // chunk模式则需要额外加一下字段,否则直接发送
  15. if (msg.chunkedEncoding && chunk.length !== 0) {
  16. let len;
  17. if (typeof chunk === 'string')
  18. len = Buffer.byteLength(chunk, encoding);
  19. else
  20. len = chunk.length;
  21. /*
  22. chunk模式时,http报文的格式如下
  23. chunk长度 回车换行
  24. 数据 回车换行
  25. */
  26. msg._send(len.toString(16), 'latin1', null);
  27. msg._send(crlf_buf, null, null);
  28. msg._send(chunk, encoding, null);
  29. ret = msg._send(crlf_buf, null, callback);
  30. } else {
  31. ret = msg._send(chunk, encoding, callback);
  32. }
  33. return ret;
  34. }

我们接着看_send函数

  1. OutgoingMessage.prototype._send = function _send(data, encoding, callback) {
  2. // 头部还没有发送
  3. if (!this._headerSent) {
  4. // 是字符串则追加到头部,this._header保存了HTTP请求行和HTTP头
  5. if (typeof data === 'string' &&
  6. (encoding === 'utf8' ||
  7. encoding === 'latin1' ||
  8. !encoding)) {
  9. data = this._header + data;
  10. } else {
  11. // 否则缓存起来
  12. const header = this._header;
  13. // HTTP头需要放到最前面
  14. if (this.outputData.length === 0) {
  15. this.outputData = [{
  16. data: header,
  17. encoding: 'latin1',
  18. callback: null
  19. }];
  20. } else {
  21. this.outputData.unshift({
  22. data: header,
  23. encoding: 'latin1',
  24. callback: null
  25. });
  26. }
  27. // 更新缓存大小
  28. this.outputSize += header.length;
  29. this._onPendingData(header.length);
  30. }
  31. // 已经在排队等待发送了,不能修改
  32. this._headerSent = true;
  33. }
  34. return this._writeRaw(data, encoding, callback);
  35. };

我们继续看_writeRaw

  1. OutgoingMessage.prototype._writeRaw = function _writeRaw(data, encoding, callback) {
  2. // 可写的时候直接发送
  3. if (conn && conn._httpMessage === this && conn.writable) {
  4. // There might be pending data in the this.output buffer.
  5. // 如果有缓存的数据则先发送缓存的数据
  6. if (this.outputData.length) {
  7. this._flushOutput(conn);
  8. }
  9. // 接着发送当前需要发送的
  10. return conn.write(data, encoding, callback);
  11. }
  12. // 否先缓存
  13. this.outputData.push({ data, encoding, callback });
  14. this.outputSize += data.length;
  15. this._onPendingData(data.length);
  16. return this.outputSize < HIGH_WATER_MARK;
  17. }
  18. OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
  19. // 之前设置了加塞,则操作socket先积攒数据
  20. while (this[kCorked]) {
  21. this[kCorked]--;
  22. socket.cork();
  23. }
  24. const outputLength = this.outputData.length;
  25. if (outputLength <= 0)
  26. return undefined;
  27. const outputData = this.outputData;
  28. socket.cork();
  29. // 把缓存的数据写到socket
  30. let ret;
  31. for (let i = 0; i < outputLength; i++) {
  32. const { data, encoding, callback } = outputData[i];
  33. ret = socket.write(data, encoding, callback);
  34. }
  35. socket.uncork();
  36. this.outputData = [];
  37. this._onPendingData(-this.outputSize);
  38. this.outputSize = 0;
  39. return ret;
  40. };

写完数据后,我们还需要执行end函数标记HTTP请求的结束。

  1. OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
  2. // 还没结束
  3. // 加塞
  4. if (this.socket) {
  5. this.socket.cork();
  6. }
  7. // 流结束后回调
  8. if (typeof callback === 'function')
  9. this.once('finish', callback);
  10. // 数据写入底层后的回调
  11. const finish = onFinish.bind(undefined, this);
  12. // chunk模式后面需要发送一个0\r\n结束标记,否则不需要结束标记
  13. if (this._hasBody && this.chunkedEncoding) {
  14. this._send('0\r\n' +
  15. this._trailer + '\r\n', 'latin1', finish);
  16. } else {
  17. this._send('', 'latin1', finish);
  18. }
  19. // uncork解除塞子,发送数据
  20. if (this.socket) {
  21. // Fully uncork connection on end().
  22. this.socket._writableState.corked = 1;
  23. this.socket.uncork();
  24. }
  25. this[kCorked] = 0;
  26. // 标记执行了end
  27. this.finished = true;
  28. // 数据发完了
  29. if (this.outputData.length === 0 &&
  30. this.socket &&
  31. this.socket._httpMessage === this) {
  32. this._finish();
  33. }
  34. return this;
  35. };

18.3 HTTP服务器

本节我们来分析使用Node.js作为服务器的例子。

  1. const http = require('http');
  2. http.createServer((req, res) => {
  3. res.write('hello');
  4. res.end();
  5. })
  6. .listen(3000);

接着我们沿着createServer分析Node.js作为服务器的原理。

  1. function createServer(opts, requestListener) {
  2. return new Server(opts, requestListener);
  3. }

我们看Server的实现

  1. function Server(options, requestListener) {
  2. // 可以自定义表示请求的对象和响应的对象
  3. this[kIncomingMessage] = options.IncomingMessage || IncomingMessage;
  4. this[kServerResponse] = options.ServerResponse || ServerResponse;
  5. // HTTP头个数的阈值
  6. const maxHeaderSize = options.maxHeaderSize;
  7. this.maxHeaderSize = maxHeaderSize;
  8. // 允许半关闭
  9. net.Server.call(this, { allowHalfOpen: true });
  10. // 有请求时的回调
  11. if (requestListener) {
  12. this.on('request', requestListener);
  13. }
  14. // 服务器socket读端关闭时是否允许继续处理队列里的响应(tcp上有多个请求,管道化)
  15. this.httpAllowHalfOpen = false;
  16. // 有连接时的回调,由net模块触发
  17. this.on('connection', connectionListener);
  18. // 服务器下所有请求和响应的超时时间
  19. this.timeout = 0;
  20. // 同一个TCP连接上,两个请求之前最多间隔的时间
  21. this.keepAliveTimeout = 5000;
  22. this.maxHeadersCount = null;
  23. // 解析头部的超时时间,防止ddos
  24. this.headersTimeout = 60 * 1000; // 60 seconds
  25. }

接着调用listen函数,因为HTTP Server继承于net.Server,net.Server的listen函数前面我们已经分析过,就不再分析。当有请求到来时,会触发connection事件。从而执行connectionListener。

  1. function connectionListener(socket) {
  2. defaultTriggerAsyncIdScope(
  3. getOrSetAsyncId(socket), connectionListenerInternal, this, socket
  4. );
  5. }
  6. // socket表示新连接
  7. function connectionListenerInternal(server, socket) {
  8. // socket所属server
  9. socket.server = server;
  10. // 设置连接的超时时间,超时处理函数为socketOnTimeout
  11. if (server.timeout && typeof socket.setTimeout === 'function') socket.setTimeout(server.timeout);
  12. socket.on('timeout', socketOnTimeout);
  13. // 分配一个HTTP解析器
  14. const parser = parsers.alloc();
  15. // 解析请求报文
  16. parser.initialize(
  17. HTTPParser.REQUEST,
  18. new HTTPServerAsyncResource('HTTPINCOMINGMESSAGE', socket),
  19. server.maxHeaderSize || 0,
  20. server.insecureHTTPParser === undefined ?
  21. isLenient() : server.insecureHTTPParser,
  22. );
  23. parser.socket = socket;
  24. // 记录开始解析头部的开始时间
  25. parser.parsingHeadersStart = nowDate();
  26. socket.parser = parser;
  27. if (typeof server.maxHeadersCount === 'number') {
  28. parser.maxHeaderPairs = server.maxHeadersCount << 1;
  29. }
  30. const state = {
  31. onData: null,
  32. onEnd: null,
  33. onClose: null,
  34. onDrain: null,
  35. // 同一TCP连接上,请求和响应的的队列,线头阻塞的原理
  36. outgoing: [],
  37. incoming: [],
  38. // 待发送的字节数,如果超过阈值,则先暂停接收请求的数据
  39. outgoingData: 0,
  40. /*
  41. 是否重新设置了timeout,用于响应一个请求时,
  42. 标记是否重新设置超时时间的标记
  43. */
  44. keepAliveTimeoutSet: false
  45. };
  46. // 监听tcp上的数据,开始解析http报文
  47. state.onData = socketOnData.bind(undefined,
  48. server,
  49. socket,
  50. parser,
  51. state);
  52. state.onEnd = socketOnEnd.bind(undefined,
  53. server,
  54. socket,
  55. parser,
  56. state);
  57. state.onClose = socketOnClose.bind(undefined, socket, state);
  58. state.onDrain = socketOnDrain.bind(undefined, socket, state);
  59. socket.on('data', state.onData);
  60. socket.on('error', socketOnError);
  61. socket.on('end', state.onEnd);
  62. socket.on('close', state.onClose);
  63. socket.on('drain', state.onDrain);
  64. // 解析HTTP头部完成后执行的回调
  65. parser.onIncoming = parserOnIncoming.bind(undefined,
  66. server,
  67. socket,
  68. state);
  69. socket.on('resume', onSocketResume);
  70. socket.on('pause', onSocketPause);
  71. /*
  72. 如果handle是继承StreamBase的流则执行consume消费http
  73. 请求报文,而不是上面的onData,tcp模块的isStreamBase为true
  74. */
  75. if (socket._handle && socket._handle.isStreamBase &&
  76. !socket._handle._consumed) {
  77. parser._consumed = true;
  78. socket._handle._consumed = true;
  79. parser.consume(socket._handle);
  80. }
  81. parser[kOnExecute] =
  82. onParserExecute.bind(undefined,
  83. server,
  84. socket,
  85. parser,
  86. state);
  87. socket._paused = false;
  88. }

执行完connectionListener后就开始等待tcp上数据的到来,即HTTP请求报文。上面代码中Node.js监听了socket的data事件,同时注册了钩子kOnExecute。data事件我们都知道是流上有数据到来时触发的事件。我们看一下socketOnData做了什么事情。

  1. function socketOnData(server, socket, parser, state, d) {
  2. // 交给HTTP解析器处理,返回已经解析的字节数
  3. const ret = parser.execute(d);
  4. onParserExecuteCommon(server, socket, parser, state, ret, d);
  5. }

socketOnData的处理逻辑是当socket上有数据,然后交给HTTP解析器处理。这看起来没什么问题,那么kOnExecute是做什么的呢?kOnExecute钩子函数的值是onParserExecute,这个看起来也是解析tcp上的数据的,看起来和onSocketData是一样的作用,难道tcp上的数据有两个消费者?我们看一下kOnExecute什么时候被回调的。

  1. void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override {
  2. Local<Value> ret = Execute(buf.base, nread);
  3. Local<Value> cb =
  4. object()->Get(env()->context(), kOnExecute).ToLocalChecked();
  5. MakeCallback(cb.As<Function>(), 1, &ret);
  6. }

OnStreamRead是node_http_parser.cc实现的函数,所以kOnExecute在node_http_parser.cc中的OnStreamRead中被回调,那么OnStreamRead又是什么时候被回调的呢?在C层章节我们分析过,OnStreamRead是Node.js中C层流操作的通用函数,当流有数据的时候就会执行该回调。而且OnStreamRead中也会把数据交给HTTP解析器解析。这看起来真的有两个消费者?这就很奇怪,为什么一份数据会交给HTTP解析器处理两次?

  1. if (socket._handle && socket._handle.isStreamBase && !socket._handle._consumed) {
  2. parser._consumed = true;
  3. socket._handle._consumed = true;
  4. parser.consume(socket._handle);
  5. }

因为TCP流是继承StreamBase类的,所以if成立。我们看一下consume的实现。

  1. static void Consume(const FunctionCallbackInfo<Value>& args) {
  2. Parser* parser;
  3. ASSIGN_OR_RETURN_UNWRAP(&parser, args.Holder());
  4. CHECK(args[0]->IsObject());
  5. StreamBase* stream = StreamBase::FromObjject(args[0].As<Object>());
  6. CHECK_NOT_NULL(stream);
  7. stream->PushStreamListener(parser);
  8. }

HTTP解析器把自己注册为TCP stream的一个listener。这会使得TCP流上的数据由node_http_parser.cc的OnStreamRead直接消费,而不是触发onData事件。在OnStreamRead中会源源不断地把数据交给HTTP解析器处理,在解析的过程中,会不断触发对应的钩子函数,直到解析完HTTP头部后执行parserOnIncoming。

  1. function parserOnIncoming(server, socket, state, req, keepAlive) {
  2. // 需要重置定时器
  3. resetSocketTimeout(server, socket, state);
  4. // 设置了keepAlive则响应后需要重置一些状态
  5. if (server.keepAliveTimeout > 0) {
  6. req.on('end', resetHeadersTimeoutOnReqEnd);
  7. }
  8. // 标记头部解析完毕
  9. socket.parser.parsingHeadersStart = 0;
  10. // 请求入队(待处理的请求队列)
  11. state.incoming.push(req);
  12. if (!socket._paused) {
  13. const ws = socket._writableState;
  14. // 待发送的数据过多,先暂停接收请求数据
  15. if (ws.needDrain ||
  16. state.outgoingData >= socket.writableHighWaterMark) {
  17. socket._paused = true;
  18. socket.pause();
  19. }
  20. }
  21. // 新建一个表示响应的对象
  22. const res = new server[kServerResponse](req);
  23. // 设置数据写入待发送队列时触发的回调,见OutgoingMessage
  24. res._onPendingData = updateOutgoingData.bind(undefined,
  25. socket,
  26. state);
  27. // 根据请求的HTTP头设置是否支持keepalive(管道化)
  28. res.shouldKeepAlive = keepAlive;
  29. /*
  30. socket当前已经在处理其它请求的响应,则先排队,
  31. 否则挂载响应对象到socket,作为当前处理的响应
  32. */
  33. if (socket._httpMessage) {
  34. state.outgoing.push(res);
  35. } else {
  36. res.assignSocket(socket);
  37. }
  38. // 响应处理完毕后,需要做一些处理
  39. res.on('finish',
  40. resOnFinish.bind(undefined,
  41. req,
  42. res,
  43. socket,
  44. state,
  45. server));
  46. // 有expect请求头,并且是http1.1
  47. if (req.headers.expect !== undefined &&
  48. (req.httpVersionMajor === 1 &&
  49. req.httpVersionMinor === 1)
  50. ) {
  51. // Expect头的值是否是100-continue
  52. if (continueExpression.test(req.headers.expect)) {
  53. res._expect_continue = true;
  54. /*
  55. 监听了checkContinue事件则触发,
  56. 否则直接返回允许继续请求并触发request事件
  57. */
  58. if (server.listenerCount('checkContinue') > 0) {
  59. server.emit('checkContinue', req, res);
  60. } else {
  61. res.writeContinue();
  62. server.emit('request', req, res);
  63. }
  64. } else if (server.listenerCount('checkExpectation') > 0) {
  65. /*
  66. 值异常,监听了checkExpectation事件,
  67. 则触发,否则返回417拒绝请求
  68. */
  69. server.emit('checkExpectation', req, res);
  70. } else {
  71. res.writeHead(417);
  72. res.end();
  73. }
  74. } else {
  75. // 触发request事件说明有请求到来
  76. server.emit('request', req, res);
  77. }
  78. return 0; // No special treatment.
  79. }

我们看到这里会触发request事件通知用户有新请求到来,用户就可以处理请求了。我们看到Node.js解析头部的时候就会执行上层回调,通知有新请求到来,并传入request和response作为参数,分别对应的是表示请求和响应的对象。另外Node.js本身是不会解析body部分的,我们可以通过以下方式获取body的数据。

  1. const server = http.createServer((request, response) => {
  2. request.on('data', (chunk) => {
  3. // 处理body
  4. });
  5. request.on('end', () => {
  6. // body结束
  7. });
  8. })

18.3.1 HTTP管道化的原理和实现

HTTP1.0的时候,不支持管道化,客户端发送一个请求的时候,首先建立TCP连接,然后服务器返回一个响应,最后断开TCP连接,这种是最简单的实现方式,但是每次发送请求都需要走三次握手显然会带来一定的时间损耗,所以HTTP1.1的时候,支持了管道化。管道化的意思就是可以在一个TCP连接上发送多个请求,这样服务器就可以同时处理多个请求,但是由于HTTP1.1的限制,多个请求的响应需要按序返回。因为在HTTP1.1中,没有标记请求和响应的对应关系。所以HTTP客户端会假设第一个返回的响应是对应第一个请求的。如果乱序返回,就会导致问题,如图18-2所示。
18-HTTP - 图1
图18-2
而在HTTP 2.0中,每个请求会分配一个id,响应中也会返回对应的id,这样就算乱序返回,HTTP客户端也可以知道响应所对应的请求。在HTTP 1.1这种情况下,HTTP服务器的实现就会变得复杂,服务器可以以串行的方式处理请求,当前面请求的响应返回到客户端后,再继续处理下一个请求,这种实现方式是相对简单的,但是很明显,这种方式相对来说还是比较低效的,另一种实现方式是并行处理请求,串行返回,这样可以让请求得到尽快的处理,比如两个请求都访问数据库,那并行处理两个请求就会比串行快得多,但是这种实现方式相对比较复杂,Node.js就是属于这种方式,下面我们来看一下Node.js中是如何实现的。前面分析过,Node.js在解析完HTTP头部的时候会执行parserOnIncoming。

  1. function parserOnIncoming(server, socket, state, req, keepAlive) {
  2. // 标记头部解析完毕
  3. socket.parser.parsingHeadersStart = 0;
  4. // 请求入队
  5. state.incoming.push(req);
  6. // 新建一个表示响应的对象,一般是ServerResponse
  7. const res = new server[kServerResponse](req);
  8. /*
  9. socket当前已经在处理其它请求的响应,则先排队,
  10. 否则挂载响应对象到socket,作为当前处理的响应
  11. */
  12. if (socket._httpMessage) {
  13. state.outgoing.push(res);
  14. } else {
  15. res.assignSocket(socket); // socket._httpMessage = res;
  16. }
  17. // 响应处理完毕后,需要做一些处理
  18. res.on('finish', resOnFinish.bind(undefined,
  19. req,
  20. res,
  21. socket,
  22. state,
  23. server));
  24. // 触发request事件说明有请求到来
  25. server.emit('request', req, res);
  26. return 0;
  27. }

当Node.js解析HTTP请求头完成后,就会创建一个ServerResponse对象表示响应。然后判断当前是否有正在处理的响应,如果有则排队等待处理,否则把新建的ServerResponse对象作为当前需要处理的响应。最后触发request事件通知用户层。用户就可以进行请求的处理了。我们看到Node.js维护了两个队列,分别是请求和响应队列,如图18-3所示。
18-HTTP - 图2
图18-3
当前处理的请求在请求队列的队首,该请求对应的响应会挂载到socket的_httpMessage属性上。但是我们看到Node.js会触发request事件通知用户有新请求到来,所有在管道化的情况下,Node.js会并行处理多个请求(如果是cpu密集型的请求则实际上还是会变成串行,这和Node.js的单线程相关)。那Node.js是如何控制响应的顺序的呢?我们知道每次触发request事件的时候,我们都会执行一个函数。比如下面的代码。

  1. http.createServer((req, res) => {
  2. // 一些网络IO
  3. res.writeHead(200, { 'Content-Type': 'text/plain' });
  4. res.end('okay');
  5. });

我们看到每个请求的处理是独立的。假设每个请求都去操作数据库,如果请求2比请求1先完成数据库的操作,从而请求2先执行res.write和res.end。那岂不是请求2先返回?我们看一下ServerResponse和OutgoingMessage的实现,揭开迷雾。ServerResponse是OutgoingMessage的子类。write函数是在OutgoingMessage中实现的,write的调用链路很长,我们不层层分析,直接看最后的节点。

  1. function _writeRaw(data, encoding, callback) {
  2. const conn = this.socket;
  3. // socket对应的响应是自己并且可写
  4. if (conn && conn._httpMessage === this && conn.writable) {
  5. // 如果有缓存的数据则先发送缓存的数据
  6. if (this.outputData.length) {
  7. this._flushOutput(conn);
  8. }
  9. // 接着发送当前需要发送的
  10. return conn.write(data, encoding, callback);
  11. }
  12. // socket当前处理的响应对象不是自己,则先缓存数据。
  13. this.outputData.push({ data, encoding, callback });
  14. this.outputSize += data.length;
  15. this._onPendingData(data.length);
  16. return this.outputSize < HIGH_WATER_MARK;
  17. }

我们看到我们调用res.write的时候,Node.js会首先判断,res是不是属于当前处理中响应,如果是才会真正发送数据,否则会先把数据缓存起来。分析到这里,相信大家已经差不多明白Node.js是如何控制响应按序返回的。最后我们看一下这些缓存的数据什么时候会被发送出去。前面代码已经贴过,当一个响应结束的时候,Node.js会做一些处理。

  1. res.on('finish', resOnFinish.bind(undefined,
  2. req,
  3. res,
  4. socket,
  5. state,
  6. server));

我们看看resOnFinish

  1. function resOnFinish(req, res, socket, state, server) {
  2. // 删除响应对应的请求
  3. state.incoming.shift();
  4. clearIncoming(req);
  5. // 解除socket上挂载的响应对象
  6. res.detachSocket(socket);
  7. req.emit('close');
  8. process.nextTick(emitCloseNT, res);
  9. // 是不是最后一个响应
  10. if (res._last) {
  11. // 是则销毁socket
  12. if (typeof socket.destroySoon === 'function') {
  13. socket.destroySoon();
  14. } else {
  15. socket.end();
  16. }
  17. } else if (state.outgoing.length === 0) {
  18. /*
  19. 没有待处理的响应了,则重新设置超时时间,
  20. 等待请求的到来,一定时间内没有请求则触发timeout事件
  21. */
  22. if (server.keepAliveTimeout &&
  23. typeof socket.setTimeout === 'function') {
  24. socket.setTimeout(server.keepAliveTimeout);
  25. state.keepAliveTimeoutSet = true;
  26. }
  27. } else {
  28. // 获取下一个要处理的响应
  29. const m = state.outgoing.shift();
  30. // 挂载到socket作为当前处理的响应
  31. if (m) {
  32. m.assignSocket(socket);
  33. }
  34. }
  35. }

我们看到,Node.js处理完一个响应后,会做一些判断。分别有三种情况,我们分开分析。
1 是否是最后一个响应
什么情况下,会被认为是最后一个响应的?因为响应和请求是一一对应的,最后一个响应就意味着最后一个请求了,那么什么时候被认为是最后一个请求呢?当非管道化的情况下,一个请求一个响应,然后关闭TCP连接,所以非管道化的情况下,tcp上的第一个也是唯一一个请求就是最后一个请求。在管道化的情况下,理论上就没有所谓的最后一个响应。但是实现上会做一些限制。在管道化的情况下,每一个响应可以通过设置HTTP响应头connection来定义是否发送该响应后就断开连接,我们看一下Node.js的实现。

  1. // 是否显示删除过connection头,是则响应后断开连接,并标记当前响应是最后一个
  2. if (this._removedConnection) {
  3. this._last = true;
  4. this.shouldKeepAlive = false;
  5. } else if (!state.connection) {
  6. /*
  7. 没有显示设置了connection头,则取默认行为
  8. 1 Node.js的shouldKeepAlive默认为true,也可以根据请求报文里
  9. 的connection头定义
  10. 2 设置content-length或使用chunk模式才能区分响应报文编边界,
  11. 才能支持keepalive
  12. 3 使用了代理,代理是复用TCP连接的,支持keepalive
  13. */
  14. const shouldSendKeepAlive = this.shouldKeepAlive &&
  15. (state.contLen ||
  16. this.useChunkedEncodingByDefault ||
  17. this.agent);
  18. if (shouldSendKeepAlive) {
  19. header += 'Connection: keep-alive\r\n';
  20. } else {
  21. this._last = true;
  22. header += 'Connection: close\r\n';
  23. }
  24. }

另外当读端关闭的时候,也被认为是最后一个请求,毕竟不会再发送请求了。我们看一下读端关闭的逻辑。

  1. function socketOnEnd(server, socket, parser, state) {
  2. const ret = parser.finish();
  3. if (ret instanceof Error) {
  4. socketOnError.call(socket, ret);
  5. return;
  6. }
  7. // 不允许半开关则终止请求的处理,不响应,关闭写端
  8. if (!server.httpAllowHalfOpen) {
  9. abortIncoming(state.incoming);
  10. if (socket.writable) socket.end();
  11. } else if (state.outgoing.length) {
  12. /*
  13. 允许半开关,并且还有响应需要处理,
  14. 标记响应队列最后一个节点为最后的响应,
  15. 处理完就关闭socket写端
  16. */
  17. state.outgoing[state.outgoing.length - 1]._last = true;
  18. } else if (socket._httpMessage) {
  19. /*
  20. 没有等待处理的响应了,但是还有正在处理的响应,
  21. 则标记为最后一个响应
  22. */
  23. socket._httpMessage._last = true;
  24. } else if (socket.writable) {
  25. // 否则关闭socket写端
  26. socket.end();
  27. }
  28. }

以上就是Node.js中判断是否是最后一个响应的情况,如果一个响应被认为是最后一个响应,那么发送响应后就会关闭连接。
2 响应队列为空
我们继续看一下如果不是最后一个响应的时候,Node.js又是怎么处理的。如果当前的待处理响应队列为空,说明当前处理的响应是目前最后一个需要处理的,但是不是TCP连接上最后一个响应,这时候,Node.js会设置超时时间,如果超时还没有新的请求,则Node.js会关闭连接。
3 响应队列非空
如果当前待处理队列非空,处理完当前请求后会继续处理下一个响应。并从队列中删除该响应。我们看一下Node.js是如何处理下一个响应的。

  1. // 把响应对象挂载到socket,标记socket当前正在处理的响应
  2. ServerResponse.prototype.assignSocket = function assignSocket(socket) {
  3. // 挂载到socket上,标记是当前处理的响应
  4. socket._httpMessage = this;
  5. socket.on('close', onServerResponseClose);
  6. this.socket = socket;
  7. this.emit('socket', socket);
  8. this._flush();
  9. };

我们看到Node.js是通过_httpMessage标记当前处理的响应的,配合响应队列来实现响应的按序返回。标记完后执行_flush发送响应的数据(如果这时候请求已经被处理完成)

  1. OutgoingMessage.prototype._flush = function _flush() {
  2. const socket = this.socket;
  3. if (socket && socket.writable) {
  4. const ret = this._flushOutput(socket);
  5. };
  6. OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
  7. // 之前设置了加塞,则操作socket先积攒数据
  8. while (this[kCorked]) {
  9. this[kCorked]--;
  10. socket.cork();
  11. }
  12. const outputLength = this.outputData.length;
  13. // 没有数据需要发送
  14. if (outputLength <= 0)
  15. return undefined;
  16. const outputData = this.outputData;
  17. // 加塞,让数据一起发送出去
  18. socket.cork();
  19. // 把缓存的数据写到socket
  20. let ret;
  21. for (let i = 0; i < outputLength; i++) {
  22. const { data, encoding, callback } = outputData[i];
  23. ret = socket.write(data, encoding, callback);
  24. }
  25. socket.uncork();
  26. this.outputData = [];
  27. this._onPendingData(-this.outputSize);
  28. this.outputSize = 0;
  29. return ret;
  30. }

以上就是Node.js中对于管道化的实现。

18.3.2 HTTP Connect方法的原理和实现

分析HTTP Connect实现之前我们首先看一下为什么需要HTTP Connect方法或者说它出现的背景。Connect方法主要用于代理服务器的请求转发。我们看一下传统HTTP服务器的工作原理,如图18-4所示。
18-HTTP - 图3
图18-4
1 客户端和代理服务器建立TCP连接
2 客户端发送HTTP请求给代理服务器
3 代理服务器解析HTTP协议,根据配置拿到业务服务器的地址
4 代理服务器和业务服务器建立TCP连接,通过HTTP协议或者其它协议转发请求
5 业务服务器返回数据,代理服务器回复HTTP报文给客户端。

接着我们看一下HTTPS服务器的原理。
1 客户端和服务器建立TCP连接
2 服务器通过TLS报文返回证书信息,并和客户端完成后续的TLS通信。
3 完成TLS通信后,后续发送的HTTP报文会经过TLS层加密解密后再传输。

那么如果我们想实现一个HTTPS的代理服务器怎么做呢?因为客户端只管和直接相连的服务器进行HTTPS的通信,如果我们的业务服务器前面还有代理服务器,那么代理服务器就必须要有证书才能和客户端完成TLS握手,从而进行HTTPS通信。代理服务器和业务服务器使用HTTP或者HTTPS还是其它协议都可以。这样就意味着我们所有的服务的证书都需要放到代理服务器上,这种场景的限制是,代理服务器和业务服务器都由我们自己管理或者公司统一管理。如果我们想加一个代理对业务服务器不感知那怎么办呢(比如写一个代理服务器用于开发调试)?有一种方式就是为我们的代理服务器申请一个证书,这样客户端和代理服务器就可以完成正常的HTTPS通信了。从而也就可以完成代理的功能。另外一种方式就是HTTP Connect方法。HTTP Connect方法的作用是指示服务器帮忙建立一条TCP连接到真正的业务服务器,并且透传后续的数据,这样不申请证书也可以完成代理的功能,如图18-5所示。
18-HTTP - 图4
图18-5
这时候代理服务器只负责透传两端的数据,不像传统的方式一样解析请求然后再转发。这样客户端和业务服务器就可以自己完成TLS握手和HTTPS通信。代理服务器就像不存在一样。了解了Connect的原理后看一下来自Node.js官方的一个例子。

  1. const http = require('http');
  2. const net = require('net');
  3. const { URL } = require('url');
  4. // 创建一个HTTP服务器作为代理服务器
  5. const proxy = http.createServer((req, res) => {
  6. res.writeHead(200, { 'Content-Type': 'text/plain' });
  7. res.end('okay');
  8. });
  9. // 监听connect事件,有http connect请求时触发
  10. proxy.on('connect', (req, clientSocket, head) => {
  11. // 获取真正要连接的服务器地址并发起连接
  12. const { port, hostname } = new URL(`http://${req.url}`);
  13. const serverSocket = net.connect(port || 80, hostname, () => {
  14. // 连接成功告诉客户端
  15. clientSocket.write('HTTP/1.1 200 Connection Established\r\n' +
  16. 'Proxy-agent: Node.js-Proxy\r\n' +
  17. '\r\n');
  18. // 透传客户端和服务器的数据
  19. serverSocket.write(head);
  20. serverSocket.pipe(clientSocket);
  21. clientSocket.pipe(serverSocket);
  22. });
  23. });
  24. proxy.listen(1337, '127.0.0.1', () => {
  25. const options = {
  26. port: 1337,
  27. // 连接的代理服务器地址
  28. host: '127.0.0.1',
  29. method: 'CONNECT',
  30. // 我们需要真正想访问的服务器地址
  31. path: 'www.baidu.com',
  32. };
  33. // 发起http connect请求
  34. const req = http.request(options);
  35. req.end();
  36. // connect请求成功后触发
  37. req.on('connect', (res, socket, head) => {
  38. // 发送真正的请求
  39. socket.write('GET / HTTP/1.1\r\n' +
  40. 'Host: www.baidu.com\r\n' +
  41. 'Connection: close\r\n' +
  42. '\r\n');
  43. socket.on('data', (chunk) => {
  44. console.log(chunk.toString());
  45. });
  46. socket.on('end', () => {
  47. proxy.close();
  48. });
  49. });
  50. });

官网的这个例子很好地说明了Connect的原理,如图18-6所示。
18-HTTP - 图5
图18-6
下面我们看一下Node.js中Connect的实现。我们从HTTP Connect请求开始。之前已经分析过,客户端和Node.js服务器建立TCP连接后,Node.js收到数据的时候会交给HTTP解析器处理,

  1. // 连接上有数据到来
  2. function socketOnData(server, socket, parser, state, d) {
  3. // 交给HTTP解析器处理,返回已经解析的字节数
  4. const ret = parser.execute(d);
  5. onParserExecuteCommon(server, socket, parser, state, ret, d);
  6. }

HTTP解析数据的过程中会不断回调Node.js的回调,然后执行onParserExecuteCommon。我们这里只关注当Node.js解析完所有HTTP请求头后执行parserOnHeadersComplete。

  1. function parserOnHeadersComplete(versionMajor, versionMinor, headers, method,
  2. url, statusCode, statusMessage, upgrade,
  3. shouldKeepAlive) {
  4. const parser = this;
  5. const { socket } = parser;
  6. // IncomingMessage
  7. const ParserIncomingMessage = (socket && socket.server &&
  8. socket.server[kIncomingMessage]) ||
  9. IncomingMessage;
  10. // 新建一个IncomingMessage对象
  11. const incoming = parser.incoming = new ParserIncomingMessage(socket);
  12. incoming.httpVersionMajor = versionMajor;
  13. incoming.httpVersionMinor = versionMinor;
  14. incoming.httpVersion = `${versionMajor}.${versionMinor}`;
  15. incoming.url = url;
  16. // 是否是connect请求或者upgrade请求
  17. incoming.upgrade = upgrade;
  18. // 执行回调
  19. return parser.onIncoming(incoming, shouldKeepAlive);
  20. }

我们看到解析完HTTP头后,Node.js会创建一个表示请求的对象IncomingMessage,然后回调onIncoming。

  1. function parserOnIncoming(server, socket, state, req, keepAlive) {
  2. // 请求是否是connect或者upgrade
  3. if (req.upgrade) {
  4. req.upgrade = req.method === 'CONNECT' ||
  5. server.listenerCount('upgrade') > 0;
  6. if (req.upgrade)
  7. return 2;
  8. }
  9. // ...
  10. }

Node.js解析完头部并且执行了响应的钩子函数后,会执行onParserExecuteCommon。

  1. function onParserExecuteCommon(server, socket, parser, state, ret, d) {
  2. if (ret instanceof Error) {
  3. prepareError(ret, parser, d);
  4. ret.rawPacket = d || parser.getCurrentBuffer();
  5. socketOnError.call(socket, ret);
  6. } else if (parser.incoming && parser.incoming.upgrade) {
  7. // 处理Upgrade或者CONNECT请求
  8. const req = parser.incoming;
  9. const eventName = req.method === 'CONNECT' ?
  10. 'connect' : 'upgrade';
  11. // 监听了对应的事件则处理,否则关闭连接
  12. if (eventName === 'upgrade' ||
  13. server.listenerCount(eventName) > 0) {
  14. // 还没有解析的数据
  15. const bodyHead = d.slice(ret, d.length);
  16. socket.readableFlowing = null;
  17. server.emit(eventName, req, socket, bodyHead);
  18. } else {
  19. socket.destroy();
  20. }
  21. }
  22. }

这时候Node.js会判断请求是不是Connect或者协议升级的upgrade请求,是的话继续判断是否有处理该事件的函数,没有则关闭连接,否则触发对应的事件进行处理。所以这时候Node.js会触发Connect方法。Connect事件的处理逻辑正如我们开始给出的例子中那样。我们首先和真正的服务器建立TCP连接,然后返回响应头给客户端,后续客户就可以和真正的服务器真正进行TLS握手和HTTPS通信了。这就是Node.js中Connect的原理和实现。

不过在代码中我们发现一个好玩的地方。那就是在触发connect事件的时候,Node.js给回调函数传入的参数。

  1. server.emit('connect', req, socket, bodyHead);

第一第二个参数没什么特别的,但是第三个参数就有意思了,bodyHead代表的是HTTP Connect请求中除了请求行和HTTP头之外的数据。因为Node.js解析完HTTP头后就不继续处理了。把剩下的数据交给了用户。我们来做一些好玩的事情。

  1. const http = require('http');
  2. const net = require('net');
  3. const { URL } = require('url');
  4. const proxy = http.createServer((req, res) => {
  5. res.writeHead(200, { 'Content-Type': 'text/plain' });
  6. res.end('okay');
  7. });
  8. proxy.on('connect', (req, clientSocket, head) => {
  9. const { port, hostname } = new URL(`http://${req.url}`);
  10. const serverSocket = net.connect(port || 80, hostname, () => {
  11. clientSocket.write('HTTP/1.1 200 Connection Established\r\n' +
  12. 'Proxy-agent: Node.js-Proxy\r\n' +
  13. '\r\n');
  14. // 把connect请求剩下的数据转发给服务器
  15. serverSocket.write(head);
  16. serverSocket.pipe(clientSocket);
  17. clientSocket.pipe(serverSocket);
  18. });
  19. });
  20. proxy.listen(1337, '127.0.0.1', () => {
  21. const net = require('net');
  22. const body = 'GET http://www.baidu.com:80 HTTP/1.1\r\n\r\n';
  23. const length = body.length;
  24. const socket = net.connect({host: '127.0.0.1', port: 1337});
  25. socket.write(`CONNECT www.baidu.com:80 HTTP/1.1\r\n\r\n${body}`);
  26. socket.setEncoding('utf-8');
  27. socket.on('data', (chunk) => {
  28. console.log(chunk)
  29. });
  30. });

我们新建一个socket,然后自己构造HTTP Connect报文,并且在HTTP行后面加一个额外的字符串,这个字符串是两一个HTTP请求。当Node.js服务器收到Connect请求后,我们在connect事件的处理函数中,把Connect请求多余的那一部分数据传给真正的服务器。这样就节省了发送一个请求的时间。

18.3.3 超时管理

在解析HTTP协议或者支持长连接的时候,Node.js需要设置一些超时的机制,否则会造成攻击或者资源浪费。下面我们看一下HTTP服务器中涉及到超时的一些逻辑。
1 解析HTTP头部超时
当收到一个HTTP请求报文时,会从HTTP请求行,HTTP头,HTTP body的顺序进行解析,如果用户构造请求,只发送HTTP头的一部分。那么HTTP解析器就会一直在等待后续数据的到来。这会导致DDOS攻击,所以Node.js中设置了解析HTTP头的超时时间,阈值是60秒。如果60秒内没有解析完HTTP头部,则会触发timeout事件。如果用户不处理,则Node.js会自动关闭连接。我们看一下Node.js的实现。Node.js在初始化的时候会设置超时时间。

  1. this.headersTimeout = 60 * 1000; // 60 seconds
  2. // Node.js在建立TCP连接成功后初始化解析HTTP头的开始时间。
  3. function connectionListenerInternal(server, socket) {
  4. parser.parsingHeadersStart = nowDate();
  5. }

然后在每次收到数据的时候判断HTTP头部是否解析完成,如果没有解析完成并且超时了则会触发timeout事件。

  1. function onParserExecute(server, socket, parser, state, ret) {
  2. socket._unrefTimer();
  3. const start = parser.parsingHeadersStart;
  4. // start等于0,说明HTTP头已经解析完毕,否则说明正在解析头,然后再判断解析时间是否超时了
  5. if (start !== 0 && nowDate() - start > server.headersTimeout) {
  6. // 触发timeout,如果没有监听timeout,则默认会销毁socket,即关闭连接
  7. const serverTimeout = server.emit('timeout', socket);
  8. if (!serverTimeout)
  9. socket.destroy();
  10. return;
  11. }
  12. onParserExecuteCommon(server, socket, parser, state, ret, undefined);
  13. }

如果在超时之前解析HTTP头完成,则把parsingHeadersStart置为0表示解析完成。

  1. function parserOnIncoming(server, socket, state, req, keepAlive) {
  2. // 设置了keepAlive则响应后需要重置一些状态
  3. if (server.keepAliveTimeout > 0) {
  4. req.on('end', resetHeadersTimeoutOnReqEnd);
  5. }
  6. // 标记头部解析完毕
  7. socket.parser.parsingHeadersStart = 0;
  8. }
  9. function resetHeadersTimeoutOnReqEnd() {
  10. if (parser) {
  11. parser.parsingHeadersStart = nowDate();
  12. }
  13. }

另外如果支持长连接,即一个TCP连接上可以发送多个请求。则在每个响应结束之后,需要重新初始化解析HTTP头的开始时间。当下一个请求数据到来时再次判断解析HTTP头部是否超时。这里是响应结束后就开始计算。而不是下一个请求到来时。
2 支持管道化的情况下,多个请求的时间间隔
Node.js支持在一个TCP连接上发送多个HTTP请求,所以需要设置一个定时器,如果超时都没有新的请求到来,则触发超时事件。这里涉及定时器的设置和重置。

  1. // 是不是最后一个响应
  2. if (res._last) {
  3. // 是则销毁socket
  4. if (typeof socket.destroySoon === 'function') {
  5. socket.destroySoon();
  6. } else {
  7. socket.end();
  8. }
  9. } else if (state.outgoing.length === 0) {
  10. // 没有待处理的响应了,则重新设置超时时间,等待请求的到来,一定时间内没有请求则触发timeout事件
  11. if (server.keepAliveTimeout && typeof socket.setTimeout === 'function') {
  12. socket.setTimeout(server.keepAliveTimeout);
  13. state.keepAliveTimeoutSet = true;
  14. }
  15. }

每次响应结束的时候,Node.js首先会判断当前响应是不是最后一个,例如读端不可读了,说明不会又请求到来了,也不会有响应了,那么就不需要保持这个TCP连接。如果当前响应不是最后一个,则Node.js会根据keepAliveTimeout的值做下一步判断,如果keepAliveTimeout 非空,则设置定时器,如果keepAliveTimeout 时间内都没有新的请求则触发timeout事件。那么如果有新请求到来,则需要重置这个定时器。Node.js在收到新请求的第一个请求包中,重置该定时器。

  1. function onParserExecuteCommon(server, socket, parser, state, ret, d) {
  2. resetSocketTimeout(server, socket, state);
  3. }
  4. function resetSocketTimeout(server, socket, state) {
  5. if (!state.keepAliveTimeoutSet)
  6. return;
  7. socket.setTimeout(server.timeout || 0);
  8. state.keepAliveTimeoutSet = false;
  9. }

onParserExecuteCommon会在每次收到数据时执行,然后Node.js会重置定时器为server.timeout的值。

18.4 Agent

本节我们先分析Agent模块的实现,Agent对TCP连接进行了池化管理。简单的情况下,客户端发送一个HTTP请求之前,首先建立一个TCP连接,收到响应后会立刻关闭TCP连接。但是我们知道TCP的三次握手是比较耗时的。所以如果我们能复用TCP连接,在一个TCP连接上发送多个HTTP请求和接收多个HTTP响应,那么在性能上面就会得到很大的提升。Agent的作用就是复用TCP连接。不过Agent的模式是在一个TCP连接上串行地发送请求和接收响应,不支持HTTP PipeLine模式。下面我们看一下Agent模块的具体实现。看它是如何实现TCP连接复用的。

  1. function Agent(options) {
  2. if (!(this instanceof Agent))
  3. return new Agent(options);
  4. EventEmitter.call(this);
  5. this.defaultPort = 80;
  6. this.protocol = 'http:';
  7. this.options = { ...options };
  8. // path字段表示是本机的进程间通信时使用的路径,比如Unix域路径
  9. this.options.path = null;
  10. // socket个数达到阈值后,等待空闲socket的请求
  11. this.requests = {};
  12. // 正在使用的socket
  13. this.sockets = {};
  14. // 空闲socket
  15. this.freeSockets = {};
  16. // 空闲socket的存活时间
  17. this.keepAliveMsecs = this.options.keepAliveMsecs || 1000;
  18. /*
  19. 用完的socket是否放到空闲队列,
  20. 开启keepalive才会放到空闲队列,
  21. 不开启keepalive
  22. 还有等待socket的请求则复用socket
  23. 没有等待socket的请求则直接销毁socket
  24. */
  25. this.keepAlive = this.options.keepAlive || false;
  26. // 最大的socket个数,包括正在使用的和空闲的socket
  27. this.maxSockets = this.options.maxSockets
  28. || Agent.defaultMaxSockets;
  29. // 最大的空闲socket个数
  30. this.maxFreeSockets = this.options.maxFreeSockets || 256;
  31. }

Agent维护了几个数据结构,分别是等待socket的请求、正在使用的socket、空闲socket。每一个数据结构是一个对象,对象的key是根据HTTP请求参数计算的。对象的值是一个队列。具体结构如图18-7所示。
18-HTTP - 图6
图18-7
下面我们看一下Agent模块的具体实现。

18.4.1 key的计算

key的计算是池化管理的核心。正确地设计key的计算规则,才能更好地利用池化带来的好处。

  1. // 一个请求对应的key
  2. Agent.prototype.getName = function getName(options) {
  3. let name = options.host || 'localhost';
  4. name += ':';
  5. if (options.port)
  6. name += options.port;
  7. name += ':';
  8. if (options.localAddress)
  9. name += options.localAddress;
  10. if (options.family === 4 || options.family === 6)
  11. name += `:${options.family}`;
  12. if (options.socketPath)
  13. name += `:${options.socketPath}`;
  14. return name;
  15. };

我们看到key由host、port、本地地址、地址簇类型、unix路径计算而来。所以不同的请求只有这些因子都一样的情况下才能复用连接。另外我们看到Agent支持Unix域。

18.4.2 创建一个socket

  1. function createSocket(req, options, cb) {
  2. options = { ...options, ...this.options };
  3. // 计算key
  4. const name = this.getName(options);
  5. options._agentKey = name;
  6. options.encoding = null;
  7. let called = false;
  8. // 创建socket完毕后执行的回调
  9. const oncreate = (err, s) => {
  10. if (called)
  11. return;
  12. called = true;
  13. if (err)
  14. return cb(err);
  15. if (!this.sockets[name]) {
  16. this.sockets[name] = [];
  17. }
  18. // 插入正在使用的socket队列
  19. this.sockets[name].push(s);
  20. // 监听socket的一些事件,用于回收socket
  21. installListeners(this, s, options);
  22. // 有可用socket,通知调用方
  23. cb(null, s);
  24. };
  25. // 创建一个新的socket,使用net.createConnection
  26. const newSocket = this.createConnection(options, oncreate);
  27. if (newSocket)
  28. oncreate(null, newSocket);
  29. }
  30. function installListeners(agent, s, options) {
  31. /*
  32. socket触发空闲事件的处理函数,告诉agent该socket空闲了,
  33. agent会回收该socket到空闲队列
  34. */
  35. function onFree() {
  36. agent.emit('free', s, options);
  37. }
  38. /*
  39. 监听socket空闲事件,调用方使用完socket后触发,
  40. 通知agent socket用完了
  41. */
  42. s.on('free', onFree);
  43. function onClose(err) {
  44. agent.removeSocket(s, options);
  45. }
  46. // socket关闭则agent会从socket队列中删除它
  47. s.on('close', onClose);
  48. function onRemove() {
  49. agent.removeSocket(s, options);
  50. s.removeListener('close', onClose);
  51. s.removeListener('free', onFree);
  52. s.removeListener('agentRemove', onRemove);
  53. }
  54. // agent被移除
  55. s.on('agentRemove', onRemove);
  56. }

创建socket的主要逻辑如下
1 调用net模块创建一个socket(TCP或者Unix域),然后插入使用中的socket队列,最后通知调用方socket创建成功。
2 监听socket的close、free事件和agentRemove事件,触发时从队列中删除socket。

18.4.3 删除socket

  1. // 把socket从正在使用队列或者空闲队列中移出
  2. function removeSocket(s, options) {
  3. const name = this.getName(options);
  4. const sets = [this.sockets];
  5. /*
  6. socket不可写了,则有可能是存在空闲的队列中,
  7. 所以需要遍历空闲队列,因为removeSocket只会在
  8. 使用完socket或者socket关闭的时候被调用,前者只有在
  9. 可写状态时会调用,后者是不可写的
  10. */
  11. if (!s.writable)
  12. sets.push(this.freeSockets);
  13. // 从队列中删除对应的socket
  14. for (const sockets of sets) {
  15. if (sockets[name]) {
  16. const index = sockets[name].indexOf(s);
  17. if (index !== -1) {
  18. sockets[name].splice(index, 1);
  19. // Don't leak
  20. if (sockets[name].length === 0)
  21. delete sockets[name];
  22. }
  23. }
  24. }
  25. /*
  26. 如果还有在等待socekt的请求,则创建socket去处理它,
  27. 因为socket数已经减一了,说明socket个数还没有达到阈值
  28. 但是这里应该先判断是否还有空闲的socket,有则可以复用,
  29. 没有则创建新的socket
  30. */
  31. if (this.requests[name] && this.requests[name].length) {
  32. const req = this.requests[name][0];
  33. const socketCreationHandler = handleSocketCreation(this,
  34. req,
  35. false);
  36. this.createSocket(req, options, socketCreationHandler);
  37. }
  38. };

前面已经分析过,Agent维护了两个socket队列,删除socket就是从这两个队列中找到对应的socket,然后移除它。移除后需要判断一下是否还有等待socket的请求队列,有的话就新建一个socket去处理它。因为移除了一个socket,就说明可以新增一个socket。

18.4.4 设置socket keepalive

当socket被使用完并且被插入空闲队列后,需要重新设置socket的keepalive值。等到超时会自动关闭socket。在一个socket上调用一次setKeepAlive就可以了,这里可能会导致多次调用setKeepAlive,不过也没有影响。

  1. function keepSocketAlive(socket) {
  2. socket.setKeepAlive(true, this.keepAliveMsecs);
  3. socket.unref();
  4. return true;
  5. };

另外需要设置ref标记,防止该socket阻止事件循环的退出,因为该socket是空闲的,不应该影响事件循环的退出。

18.4.5 复用socket

  1. function reuseSocket(socket, req) {
  2. req.reusedSocket = true;
  3. socket.ref();
  4. };

重新使用该socket,需要修改ref标记,阻止事件循环退出,并标记请求使用的是复用socket。

18.4.6 销毁Agent

  1. function destroy() {
  2. for (const set of [this.freeSockets, this.sockets]) {
  3. for (const key of ObjectKeys(set)) {
  4. for (const setName of set[key]) {
  5. setName.destroy();
  6. }
  7. }
  8. }
  9. };

因为Agent本质上是一个socket池,销毁Agent即销毁池里维护的所有socket。

18.4.7 使用连接池

我们看一下如何使用Agent。

  1. function addRequest(req, options, port, localAddress) {
  2. // 参数处理
  3. if (typeof options === 'string') {
  4. options = {
  5. host: options,
  6. port,
  7. localAddress
  8. };
  9. }
  10. options = { ...options, ...this.options };
  11. if (options.socketPath)
  12. options.path = options.socketPath;
  13. if (!options.servername && options.servername !== '')
  14. options.servername = calculateServerName(options, req);
  15. // 拿到请求对应的key
  16. const name = this.getName(options);
  17. // 该key还没有在使用的socekt则初始化数据结构
  18. if (!this.sockets[name]) {
  19. this.sockets[name] = [];
  20. }
  21. // 该key对应的空闲socket列表
  22. const freeLen = this.freeSockets[name] ?
  23. this.freeSockets[name].length : 0;
  24. // 该key对应的所有socket个数
  25. const sockLen = freeLen + this.sockets[name].length;
  26. // 该key有对应的空闲socekt
  27. if (freeLen) {
  28. // 获取一个该key对应的空闲socket
  29. const socket = this.freeSockets[name].shift();
  30. // 取完了删除,防止内存泄漏
  31. if (!this.freeSockets[name].length)
  32. delete this.freeSockets[name];
  33. // 设置ref标记,因为正在使用该socket
  34. this.reuseSocket(socket, req);
  35. // 设置请求对应的socket
  36. setRequestSocket(this, req, socket);
  37. // 插入正在使用的socket队列
  38. this.sockets[name].push(socket);
  39. } else if (sockLen < this.maxSockets) {
  40. /*
  41. 如果该key没有对应的空闲socket并且使用的
  42. socket个数还没有得到阈值,则继续创建
  43. */
  44. this.createSocket(req,
  45. options,
  46. handleSocketCreation(this, req, true));
  47. } else {
  48. // 等待该key下有空闲的socket
  49. if (!this.requests[name]) {
  50. this.requests[name] = [];
  51. }
  52. this.requests[name].push(req);
  53. }
  54. }

当我们需要发送一个HTTP请求的时候,我们可以通过Agent的addRequest方法把请求托管到Agent中,当有可用的socket时,Agent会通知我们。addRequest的代码很长,主要分为三种情况。
1 有空闲socket,则直接复用,并插入正在使用的socket队列中
我们主要看一下setRequestSocket函数

  1. function setRequestSocket(agent, req, socket) {
  2. // 通知请求socket创建成功
  3. req.onSocket(socket);
  4. const agentTimeout = agent.options.timeout || 0;
  5. if (req.timeout === undefined || req.timeout === agentTimeout)
  6. {
  7. return;
  8. }
  9. // 开启一个定时器,过期后触发timeout事件
  10. socket.setTimeout(req.timeout);
  11. /*
  12. 监听响应事件,响应结束后需要重新设置超时时间,
  13. 开启下一个请求的超时计算,否则会提前过期
  14. */
  15. req.once('response', (res) => {
  16. res.once('end', () => {
  17. if (socket.timeout !== agentTimeout) {
  18. socket.setTimeout(agentTimeout);
  19. }
  20. });
  21. });
  22. }

setRequestSocket函数通过req.onSocket(socket)通知调用方有可用socket。然后如果请求设置了超时时间则设置socket的超时时间,即请求的超时时间。最后监听响应结束事件,重新设置超时时间。
2 没有空闲socket,但是使用的socket个数还没有达到阈值,则创建新的socket。
我们主要分析创建socket后的回调handleSocketCreation。

  1. function handleSocketCreation(agent, request, informRequest) {
  2. return function handleSocketCreation_Inner(err, socket) {
  3. if (err) {
  4. process.nextTick(emitErrorNT, request, err);
  5. return;
  6. }
  7. /*
  8. 是否需要直接通知请求方,这时候request不是来自等待
  9. socket的requests队列, 而是来自调用方,见addRequest
  10. */
  11. if (informRequest)
  12. setRequestSocket(agent, request, socket);
  13. else
  14. /*
  15. 不直接通知,先告诉agent有空闲的socket,
  16. agent会判断是否有正在等待socket的请求,有则处理
  17. */
  18. socket.emit('free');
  19. };
  20. }

3 不满足1,2,则把请求插入等待socket队列。
插入等待socket队列后,当有socket空闲时会触发free事件,我们看一下该事件的处理逻辑。

  1. // 监听socket空闲事件
  2. this.on('free', (socket, options) => {
  3. const name = this.getName(options);
  4. // socket还可写并且还有等待socket的请求,则复用socket
  5. if (socket.writable &&
  6. this.requests[name] && this.requests[name].length) {
  7. // 拿到一个等待socket的请求,然后通知它有socket可用
  8. const req = this.requests[name].shift();
  9. setRequestSocket(this, req, socket);
  10. // 没有等待socket的请求则删除,防止内存泄漏
  11. if (this.requests[name].length === 0) {
  12. // don't leak
  13. delete this.requests[name];
  14. }
  15. } else {
  16. // socket不可用写或者没有等待socket的请求了
  17. const req = socket._httpMessage;
  18. // socket可写并且请求设置了允许使用复用的socket
  19. if (req &&
  20. req.shouldKeepAlive &&
  21. socket.writable &&
  22. this.keepAlive) {
  23. let freeSockets = this.freeSockets[name];
  24. // 该key下当前的空闲socket个数
  25. const freeLen = freeSockets ? freeSockets.length : 0;
  26. let count = freeLen;
  27. // 正在使用的socket个数
  28. if (this.sockets[name])
  29. count += this.sockets[name].length;
  30. /*
  31. 该key使用的socket个数达到阈值或者空闲socket达到阈值,
  32. 则不复用socket,直接销毁socket
  33. */
  34. if (count > this.maxSockets ||
  35. freeLen >= this.maxFreeSockets) {
  36. socket.destroy();
  37. } else if (this.keepSocketAlive(socket)) {
  38. /*
  39. 重新设置socket的存活时间,设置失败说明无法重新设置存活时
  40. 间,则说明可能不支持复用
  41. */
  42. freeSockets = freeSockets || [];
  43. this.freeSockets[name] = freeSockets;
  44. socket[async_id_symbol] = -1;
  45. socket._httpMessage = null;
  46. // 把socket从正在使用队列中移除
  47. this.removeSocket(socket, options);
  48. // 插入socket空闲队列
  49. freeSockets.push(socket);
  50. } else {
  51. // 不复用则直接销毁
  52. socket.destroy();
  53. }
  54. } else {
  55. socket.destroy();
  56. }
  57. }
  58. });

当有socket空闲时,分为以下几种情况
1 如果有等待socket的请求,则直接复用socket。
2 如果没有等待socket的请求,允许复用并且socket个数没有达到阈值则插入空闲队列。
3 直接销毁

18.4.8 测试例子

客户端

  1. const http = require('http');
  2. const keepAliveAgent = new http.Agent({ keepAlive: true, maxSockets: 1 });
  3. const options = {port: 10000, method: 'GET', host: '127.0.0.1',}
  4. options.agent = keepAliveAgent;
  5. http.get(options, () => {});
  6. http.get(options, () => {});
  7. console.log(options.agent.requests)

服务器

  1. let i =0;
  2. const net = require('net');
  3. net.createServer((socket) => {
  4. console.log(++i);
  5. }).listen(10000);

在例子中,首先创建了一个tcp服务器。然后在客户端使用agent。但是maxSocket的值为1,代表最多只能有一个socket,而这时候客户端发送两个请求,所以有一个请求就会在排队。服务器也只收到了一个连接。