线程是操作系统的最小调度单位,它本质上是进程中的一个执行流,我们知道,进程有代码段,线程其实就是进程代码段中的其中一段代码。线程的一种实现是作为进程来实现的(pthread线程库),通过调用clone,新建一个进程,然后执行父进程代码段里的一个代码片段,其中文件描述符、内存等信息都是共享的。因为内存是共享的,所以线程不能共享栈,否则访问栈的地址的时候,会映射到相同的物理地址,那样就会互相影响,所以每个线程会有自己独立的栈。在调用clone函数的时候会设置栈的范围,比如在堆上分配一块内存用于做线程的栈,并且支持设置子线程和主线程共享哪些资源。具体可以参考clone系统调用。
由于Node.js是单线程的,虽然底层的Libuv实现了一个线程池,但是这个线程池只能执行C、C层定义的任务。如果我们想自定义一些耗时的操作,那就只能在C层处理,然后暴露接口给JS层调用,这个成本是非常高的,在早期的Node.js版本里,我们可以用进程去实现这样的需求。但是进程太重了,在新版的Node.js中,Node.js为我们提供了多线程的功能。这一章以Node.js多线程模块为背景,分析Node.js中多线程的原理,但是不分析Libuv的线程实现,它本质是对线程库的简单封装。Node.js中,线程的实现也非常复杂。虽然底层只是对线程库的封装,但是把它和Node.js原本的架构结合起来变得复杂起来。
14.1 使用多线程
对于同步文件操作、DNS解析等操作,Node.js使用了内置的线程池支持了异步。但是一些加解密、字符串运算、阻塞型API等操作。我们就不能在主线程里处理了,这时候就不得不使用线程,而且多线程还能利用多核的能力。Node.js的子线程本质上是一个新的事件循环,但是子线程和Node.js主线程共享一个Libuv线程池,所以如果在子线程里有文件、DNS等操作就会和主线程竞争Libuv线程池。如图14-1所示。
图14-1
我们看一下在Node.js中如何使用线程。
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
worker.once('message', (message) => {
...
});
worker.postMessage('Hello, world!');
} else {
// 做点耗时的事情
parentPort.once('message', (message) => {
parentPort.postMessage(message);
});
}
上面这段代码会被执行两次,一次是在主线程,一次在子线程。所以首先通过isMainThread判断当前是主线程还是子线程。主线程的话,就创建一个子线程,然后监听子线程发过来的消息。子线程的话,首先执行业务相关的代码,还可以监听主线程传过来的消息。我们在子线程中可以做一些耗时或者阻塞性的操作,不会影响主线程的执行。我们也可以把这两个逻辑拆分到两个文件。
主线程
const { Worker, isMainThread, parentPort } = require('worker_threads');
const worker = new Worker(‘子线程文件路径’);
worker.once('message', (message) => {
...
});
worker.postMessage('Hello, world!');
子线程
const { Worker, isMainThread, parentPort } = require('worker_threads');
parentPort.once('message', (message) => {
parentPort.postMessage(message);
});
14.2 线程间通信数据结构
进程间的通信一般需要借助操作系统提供公共的内存来完成。因为进程间的内存是独立的,和进程间通信不一样。多线程的内存是共享的,同个进程的内存,多个线程都可以访问,所以线程间通信可以基于进程内的内存来完成。在Node.js中,线程间通信使用的是MessageChannel实现的,它是全双工的,任意一端都可以随时发送信息。MessageChannel类似socket通信,它包括两个端点。定义一个MessageChannel相当于建立一个TCP连接,它首先申请两个端点(MessagePort),然后把它们关联起来。下面我们看一下线程间通信的实现中,比较重要的几个数据结构。
1 Message代表一个消息。
2 MessagePortData是对操作Message的封装和对消息的承载。
3 MessagePort是代表通信的端点。
4 MessageChannel是代表通信的两端,即两个MessagePort。
下面我们看一下具体的实现。
14.2.1 Message
Message类代表的是子线程间通信的一条消息。
class Message : public MemoryRetainer {
public:
explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
// 是否是最后一条消息,空消息代表是最后一条消息
bool IsCloseMessage() const;
// 线程间通信的数据需要通过序列化和反序列化处理
v8::MaybeLocal<v8::Value> Deserialize(Environment* env,
v8::Local<v8::Context> context);
v8::Maybe<bool> Serialize(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value> input,
const TransferList& transfer_list,
v8::Local<v8::Object> source_port =
v8::Local<v8::Object>());
// 传递SharedArrayBuffer型变量
void AddSharedArrayBuffer(std::shared_ptr<v8::BackingStore> backing_store);
// 传递MessagePort型变量
void AddMessagePort(std::unique_ptr<MessagePortData>&& data);
// 消息所属端口,端口是消息到达的地方
const std::vector<std::unique_ptr<MessagePortData>>& message_ports() const {
return message_ports_;
}
private:
// 保存消息的内容
MallocedBuffer<char> main_message_buf_;
std::vector<std::shared_ptr<v8::BackingStore>> array_buffers_;
std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_;
std::vector<std::unique_ptr<MessagePortData>> message_ports_;
std::vector<v8::CompiledWasmModule> wasm_modules_;
};
14.2.2 MessagePortData
MessagePortData是管理消息发送和接收的类。
class MessagePortData : public MemoryRetainer {
public:
explicit MessagePortData(MessagePort* owner);
~MessagePortData() override;
// 新增一个消息
void AddToIncomingQueue(Message&& message);
// 关联/解关联通信两端的端口
static void Entangle(MessagePortData* a, MessagePortData* b);
void Disentangle();
private:
// 用于多线程往对端消息队列插入消息时的互斥变量
mutable Mutex mutex_;
std::list<Message> incoming_messages_;
// 所属端口
MessagePort* owner_ = nullptr;
// 用于多线程访问对端sibling_属性时的互斥变量
std::shared_ptr<Mutex> sibling_mutex_ = std::make_shared<Mutex>();
// 指向通信对端的指针
MessagePortData* sibling_ = nullptr;
};
我们看一下实现。
MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { }
MessagePortData::~MessagePortData() {
// 析构时解除和对端的关系
Disentangle();
}
// 插入一个message
void MessagePortData::AddToIncomingQueue(Message&& message) {
// 先加锁,保证多线程安全,互斥访问
Mutex::ScopedLock lock(mutex_);
// 插入消息队列
incoming_messages_.emplace_back(std::move(message));
// 通知owner
if (owner_ != nullptr) {
owner_->TriggerAsync();
}
}
// 关联通信的对端,并保持对端的互斥变量,访问对端时需要使用
void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
a->sibling_ = b;
b->sibling_ = a;
a->sibling_mutex_ = b->sibling_mutex_;
}
// 解除关联
void MessagePortData::Disentangle() {
// 加锁操作对端的sibling字段
std::shared_ptr<Mutex> sibling_mutex = sibling_mutex_;
Mutex::ScopedLock sibling_lock(*sibling_mutex);
sibling_mutex_ = std::make_shared<Mutex>();
// 对端
MessagePortData* sibling = sibling_;
// 对端非空,则把对端的sibling也指向空,自己也指向空
if (sibling_ != nullptr) {
sibling_->sibling_ = nullptr;
sibling_ = nullptr;
}
// 插入一个空的消息通知对端和本端
AddToIncomingQueue(Message());
if (sibling != nullptr) {
sibling->AddToIncomingQueue(Message());
}
}
14.2.3 MessagePort
MessagePort表示的是通信的一端。
class MessagePort : public HandleWrap {
public:
MessagePort(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Object> wrap);
~MessagePort() override;
static MessagePort* New(Environment* env,
v8::Local<v8::Context> context,
std::unique_ptr<MessagePortData> data = nullptr);
// 发送消息
v8::Maybe<bool> PostMessage(Environment* env,
v8::Local<v8::Value> message,
const TransferList& transfer);
// 开启/关闭接收消息
void Start();
void Stop();
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
// 提供JS层使用的方法
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
// 关联对端
static void Entangle(MessagePort* a, MessagePort* b);
static void Entangle(MessagePort* a, MessagePortData* b);
// 解除MessagePortData和端口的关系
std::unique_ptr<MessagePortData> Detach();
// 关闭端口
void Close(
v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;
inline bool IsDetached() const;
private:
void OnClose() override;
void OnMessage();
void TriggerAsync();
v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
bool only_if_receiving);
// MessagePortData用于管理消息的发送和接收
std::unique_ptr<MessagePortData> data_ = nullptr;
// 是否开启接收消息标记
bool receiving_messages_ = false;
// 用于收到消息时通知事件循环,事件循环执行回调处理消息
uv_async_t async_;
};
我们看一下实现,只列出部分函数。
// 端口是否不接收消息了
bool MessagePort::IsDetached() const {
return data_ == nullptr || IsHandleClosing();
}
// 有消息到达,通知事件循环执行回调
void MessagePort::TriggerAsync() {
if (IsHandleClosing()) return;
CHECK_EQ(uv_async_send(&async_), 0);
}
// 关闭接收消息的端口
void MessagePort::Close(v8::Local<v8::Value> close_callback) {
if (data_) {
// 持有锁,防止再接收消息
Mutex::ScopedLock sibling_lock(data_->mutex_);
HandleWrap::Close(close_callback);
} else {
HandleWrap::Close(close_callback);
}
}
// 新建一个端口,并且可以挂载一个MessagePortData
MessagePort* MessagePort::New(
Environment* env,
Local<Context> context,
std::unique_ptr<MessagePortData> data) {
Context::Scope context_scope(context);
Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);
Local<Object> instance;
// JS层使用的对象
if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
return nullptr;
// 新建一个消息端口
MessagePort* port = new MessagePort(env, context, instance);
// 需要挂载MessagePortData
if (data) {
port->Detach();
port->data_ = std::move(data);
Mutex::ScopedLock lock(port->data_->mutex_);
// 修改data的owner为当前消息端口
port->data_->owner_ = port;
// data中可能有消息
port->TriggerAsync();
}
return port;
}
// 开始接收消息
void MessagePort::Start() {
Debug(this, "Start receiving messages");
receiving_messages_ = true;
Mutex::ScopedLock lock(data_->mutex_);
// 有缓存的消息,通知上层
if (!data_->incoming_messages_.empty())
TriggerAsync();
}
// 停止接收消息
void MessagePort::Stop() {
Debug(this, "Stop receiving messages");
receiving_messages_ = false;
}
// JS层调用
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
MessagePort* port;
ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
if (!port->data_) {
return;
}
port->Start();
}
void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
MessagePort* port;
CHECK(args[0]->IsObject());
ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
if (!port->data_) {
return;
}
port->Stop();
}
// 读取消息
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
MessagePort* port;
ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
port->OnMessage();
}
// 获取某个端口的消息
void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsObject());
// 第一个参数是端口
MessagePort* port = Unwrap<MessagePort>(args[0].As<Object>());
// 调用对象的ReceiverMessage方法
MaybeLocal<Value> payload =
port->ReceiveMessage(port->object()->CreationContext(), false);
if (!payload.IsEmpty())
args.GetReturnValue().Set(payload.ToLocalChecked());
}
// 关联两个端口
void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
Entangle(a, b->data_.get());
}
void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
MessagePortData::Entangle(a->data_.get(), b);
}
14.2.4 MessageChannel
MessageChannel表示线程间通信的两个端。
static void MessageChannel(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Local<Context> context = args.This()->CreationContext();
Context::Scope context_scope(context);
MessagePort* port1 = MessagePort::New(env, context);
MessagePort* port2 = MessagePort::New(env, context);
MessagePort::Entangle(port1, port2);
// port1->object()拿到JS层使用的对象,它关联了MessagePort对象
args.This()->Set(context, env->port1_string(), port1->object())
.Check();
args.This()->Set(context, env->port2_string(), port2->object())
.Check();
}
MessageChannel的逻辑比较简单,新建两个消息端口,并且关联起来,后续就可以基于这两个端口进行通信了。
Message、MessagePortData、MessagePort和MessageChannel的关系图如图14-2所示。
图14-2
最后我们看一下线程间通信模块导出的一些功能。
static void InitMessaging(Local<Object> target,
Local<Value> unused,
Local<Context> context,
void* priv) {
Environment* env = Environment::GetCurrent(context);
{
// 线程间通信的通道
Local<String> message_channel_string = FIXED_ONE_BYTE_STRING(env->isolate(),
"MessageChannel");
Local<FunctionTemplate> templ = env->NewFunctionTemplate(MessageChannel);
templ->SetClassName(message_channel_string);
target->Set(context,
message_channel_string,
templ->GetFunction(context).ToLocalChecked()).Check();
}
// 新建消息端口的构造函数
target->Set(context,
env->message_port_constructor_string(),
GetMessagePortConstructorTemplate(env)
->GetFunction(context).ToLocalChecked()).Check();
env->SetMethod(target, "stopMessagePort", MessagePort::Stop);
env->SetMethod(target, "drainMessagePort", MessagePort::Drain);
env->SetMethod(target, "receiveMessageOnPort", MessagePort::ReceiveMessage);
env->SetMethod(target, "moveMessagePortToContext",
MessagePort::MoveToContext);
}
14.3 多线程的实现
本节我们从worker_threads模块开始分析多线程的实现。这是一个C++模块。我们看一下它导出的功能。require(“work_threads”)的时候就是引用了InitWorker函数导出的功能。
void InitWorker(Local<Object> target,
Local<Value> unused,
Local<Context> context,
void* priv) {
Environment* env = Environment::GetCurrent(context);
{
Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
w->InstanceTemplate()->SetInternalFieldCount(1);
w->Inherit(AsyncWrap::GetConstructorTemplate(env));
// 设置一系列原型方法,就不一一列举
env->SetProtoMethod(w, "setEnvVars", Worker::SetEnvVars);
// 一系列原型方法
/*
导出函数模块对应的函数,即我们代码中
const { Worker } = require("worker_threads");中的Worker
*/
Local<String> workerString = FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
w->SetClassName(workerString);
target->Set(env->context(),
workerString,
w->GetFunction(env->context()).ToLocalChecked()).Check();
/*
导出getEnvMessagePort方法,获取线程接收消息的端口
const {getEnvMessagePort} = require("worker_threads");
*/
env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
/*
线程id,这个不是操作系统分配的那个,而是Node.js分配的,
在创建线程的时候设置
const { threadId } = require("worker_threads");
*/
target->Set(env->context(),
env->thread_id_string(),
Number::New(env->isolate(),
static_cast<double>(env->thread_id())))
.Check();
/*
是否是主线程,
const { isMainThread } = require("worker_threads");
这边变量在Node.js启动的时候设置为true,新开子线程的时候,没有设
置,所以是false
*/
target->Set(env->context(),
FIXED_ONE_BYTE_STRING(env->isolate(), "isMainThread"),
Boolean::New(env->isolate(), env->is_main_thread()))
.Check();
/*
如果不是主线程,导出资源限制的配置,
即在子线程中调用
const { resourceLimits } = require("worker_threads");
*/
if (!env->is_main_thread()) {
target->Set(env->context(),
FIXED_ONE_BYTE_STRING(env->isolate(),
"resourceLimits"),
env->worker_context()->GetResourceLimits(env->isolate())).Check();
}
// 导出几个常量
NODE_DEFINE_CONSTANT(target, kMaxYoungGenerationSizeMb);
NODE_DEFINE_CONSTANT(target, kMaxOldGenerationSizeMb);
NODE_DEFINE_CONSTANT(target, kCodeRangeSizeMb);
NODE_DEFINE_CONSTANT(target, kTotalResourceLimitCount);
}
了解work_threads模块导出的功能后,我们看在JS层执行new Worker的时候的逻辑。根据上面代码导出的逻辑,我们知道这时候首先会新建一个C对象。然后执行New回调,并传入新建的C对象。我们看New函数的逻辑。我们省略一系列的参数处理,主要代码如下。
// args.This()就是我们刚才传进来的this
Worker* worker = new Worker(env, args.This(),
url, per_isolate_opts,
std::move(exec_argv_out));
我们再看Worker类的声明。
class Worker : public AsyncWrap {
public:
// 函数声明
private:
std::shared_ptr<PerIsolateOptions> per_isolate_opts_;
std::vector<std::string> exec_argv_;
std::vector<std::string> argv_;
MultiIsolatePlatform* platform_;
v8::Isolate* isolate_ = nullptr;
bool start_profiler_idle_notifier_;
// 真正的线程id,底层返回的
uv_thread_t tid_;
// This mutex protects access to all variables listed below it.
mutable Mutex mutex_;
bool thread_joined_ = true;
const char* custom_error_ = nullptr;
int exit_code_ = 0;
// 线程id,Node.js分配,不是底层返回的
uint64_t thread_id_ = -1;
uintptr_t stack_base_ = 0;
// 线程资源限制配置
double resource_limits_[kTotalResourceLimitCount];
void UpdateResourceConstraints(v8::ResourceConstraints* constraints);
// 栈信息
static constexpr size_t kStackSize = 4 * 1024 * 1024;
static constexpr size_t kStackBufferSize = 192 * 1024;
std::unique_ptr<MessagePortData> child_port_data_;
std::shared_ptr<KVStore> env_vars_;
// 用于线程间通信
MessagePort* child_port_ = nullptr;
MessagePort* parent_port_ = nullptr;
// 线程状态
bool stopped_ = true;
// 是否影响事件循环退出
bool has_ref_ = true;
// 子线程执行时的环境变量,基类也定义了
Environment* env_ = nullptr;
};
这里只讲一下env的定义,因为这是一个非常重要的地方。我们看到Worker类继承AsyncWrap,AsyncWrap继承了BaseObject。BaseObject中也定义了env属性。我们看一下在C++中如果子类父类都定义了一个属性时是怎样的。我们来看一个例子
#include <iostream>
using namespace std;
class A
{
public:
int value;
A()
{
value=1;
}
void console()
{
cout<<value<<endl;
}
};
class B: public A
{
public:
int value;
B():A()
{
value=2;
}
};
int main()
{
B b;
// b.value = 3;只会修改子类的,不会修改父类的
b.console();
cout<<b.value<<endl<<"内存大小:"<<sizeof(b)<<endl;
return 0;
}
以上代码执行时输出
1
2
内存大小:8
由输出结果我们可以知道,b内存大小是8个字节。即两个int。所以b的内存布局中两个a属性都分配了内存。当我们通过b.console输出value时,因为console是在A上定义的,所以输出1,但是我们通过b.value访问时,输出的是2。因为访问的是B中定义的value,同理如果我们在B中定义console,输出也会是2。Worker中定义的env_我们后续会看到它的作用。接着我们看一下Worker类的初始化逻辑。
Worker::Worker(Environment* env,
Local<Object> wrap,...)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER),
...
// 分配线程id
thread_id_(Environment::AllocateThreadId()),
// 继承主线程的环境变量
env_vars_(env->env_vars()) {
// 新建一个端口和子线程通信
parent_port_ = MessagePort::New(env, env->context());
/*
关联起来,用于通信
const parent_port_ = {data: {sibling: null}};
const child_port_data_ = {sibling: null};
parent_port_.data.sibling = child_port_data_;
child_port_data_.sibling = parent_port_.data;
*/
child_port_data_ = std::make_unique<MessagePortData>(nullptr);
MessagePort::Entangle(parent_port_, child_port_data_.get());
// 设置JS层Worker对象的messagePort属性为parent_port_
object()->Set(env->context(),
env->message_port_string(),
parent_port_->object()).Check();
// 设置Worker对象的线程id,即threadId属性
object()->Set(env->context(),
env->thread_id_string(),
Number::New(env->isolate(), static_cast<double>(thread_id_)))
.Check();
}
新建一个Worker,结构如图14-3所示。
图14-3
了解了new Worker的逻辑后,我们看在JS层是如何使用的。我们看JS层Worker类的构造函数。
constructor(filename, options = {}) {
super();
// 忽略一系列参数处理,new Worker就是上面提到的C++层的
this[kHandle] = new Worker(url, options.execArgv, parseResourceLimits(options.resourceLimits));
// messagePort指向_parent_port
this[kPort] = this[kHandle].messagePort;
this[kPort].on('message', (data) => this[kOnMessage](data));
// 开始接收消息
this[kPort].start();
// 申请一个通信通道,两个端口
const { port1, port2 } = new MessageChannel();
this[kPublicPort] = port1;
this[kPublicPort].on('message', (message) => this.emit('message', message));
// 向另一端发送消息
this[kPort].postMessage({
argv,
type: messageTypes.LOAD_SCRIPT,
filename,
doEval: !!options.eval,
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
workerData: options.workerData,
publicPort: port2,
manifestSrc: getOptionValue('--experimental-policy') ?
require('internal/process/policy').src :
null,
hasStdin: !!options.stdin
}, [port2]);
// 开启线程
this[kHandle].startThread();
}
上面的代码主要逻辑如下
1 保存messagePort,监听该端口的message事件,然后给messagePort的对端发送消息,但是这时候还没有接收端口,所以消息会缓存到MessagePortData,即childport_data 中。另外我们看到主线程把通信端口port2发送给了子线程。
2 申请一个通信通道port1和port2,用于主线程和子线程通信。_parent_port和child_port是给Node.js使用的,新申请的端口是给用户使用的。
3 创建子线程。
我们看创建线程的时候,做了什么。
void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
Mutex::ScopedLock lock(w->mutex_);
// The object now owns the created thread and should not be garbage collected
// until that finishes.
w->ClearWeak();
// 加入主线程维护的子线程数据结构
w->env()->add_sub_worker_context(w);
w->stopped_ = false;
w->thread_joined_ = false;
// 是否需要阻塞事件循环退出,默认true
if (w->has_ref_)
w->env()->add_refs(1);
// 是否需要栈和栈大小
uv_thread_options_t thread_options;
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
thread_options.stack_size = kStackSize;
// 创建线程
CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
Worker* w = static_cast<Worker*>(arg);
const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
w->stack_base_ = stack_top - (kStackSize - kStackBufferSize);
// 执行主逻辑
w->Run();
Mutex::ScopedLock lock(w->mutex_);
// 给主线程提交一个任务,通知主线程子线程执行完毕,因为主线程不能直接执行join阻塞自己
w->env()->SetImmediateThreadsafe(
[w = std::unique_ptr<Worker>(w)](Environment* env) {
if (w->has_ref_)
env->add_refs(-1);
w->JoinThread();
// implicitly delete w
});
}, static_cast<void*>(w)), 0);
}
StartThread新建了一个子线程,然后在子线程中执行Run,我们继续看Run
void Worker::Run() {
// 线程执行所需要的数据结构,比如loop,isolate,和主线程独立
WorkerThreadData data(this);
{
Locker locker(isolate_);
Isolate::Scope isolate_scope(isolate_);
SealHandleScope outer_seal(isolate_);
// std::unique_ptr<Environment, FreeEnvironment> env_;
DeleteFnPtr<Environment, FreeEnvironment> env_;
// 线程执行完后执行的清除函数
auto cleanup_env = OnScopeLeave([&]() {
// ...
});
{
HandleScope handle_scope(isolate_);
Local<Context> context;
// 新建一个context,和主线程独立
context = NewContext(isolate_);
Context::Scope context_scope(context);
{
// 新建一个env并初始化,env中会和新的context关联
env_.reset(new Environment(data.isolate_data_.get(),
context,
std::move(argv_),
std::move(exec_argv_),
Environment::kNoFlags,
thread_id_));
env_->set_env_vars(std::move(env_vars_));
env_->set_abort_on_uncaught_exception(false);
env_->set_worker_context(this);
env_->InitializeLibuv(start_profiler_idle_notifier_);
}
{
Mutex::ScopedLock lock(mutex_);
// 更新子线程所属的env
this->env_ = env_.get();
}
{
if (!env_->RunBootstrapping().IsEmpty()) {
CreateEnvMessagePort(env_.get());
USE(StartExecution(env_.get(), "internal/main/worker_thread"));
}
}
{
SealHandleScope seal(isolate_);
bool more;
// 开始事件循环
do {
if (is_stopped()) break;
uv_run(&data.loop_, UV_RUN_DEFAULT);
if (is_stopped()) break;
platform_->DrainTasks(isolate_);
more = uv_loop_alive(&data.loop_);
if (more && !is_stopped()) continue;
EmitBeforeExit(env_.get());
more = uv_loop_alive(&data.loop_);
} while (more == true && !is_stopped());
}
}
}
我们分步骤分析上面的代码
1 新建Isolate、context和Environment,子线程在独立的环境执行。然后初始化Environment。这个在Node.js启动过程章节已经分析过,不再分析。
2 更新子线程的env。刚才已经分析过,Worker类中定义了env属性,所以这里通过this.env更新时,是不会影响基类(BaseObject)中的值的。因为子线程是在新的环境执行的,所以在新环境中使用该Worker实例时,需要使用新的环境变量。而在主线程使用该Worker实例时,是通过BaseObject的env()访问的。从而获取的是主线程的环境。因为Worker实例是在主线程和子线程之间共享的,Node.js在Worker类中重新定义了一个env属性正是为了解决这个问题。
3 CreateEnvMessagePort
void Worker::CreateEnvMessagePort(Environment* env) {
child_port_ = MessagePort::New(env,
env->context(),
std::move(child_port_data_));
if (child_port_ != nullptr)
env->set_message_port(child_port_->object(isolate_));
}
childport_data这个变量刚才我们已经看到过,在这里首先申请一个新的端口。并且和childport_data互相关联起来。然后在env缓存起来。后续会使用。这时候的关系图如图14-4所示。
图14-4
4 执行internal/main/worker_thread.js
// 设置process对象
patchProcessObject();
// 获取刚才缓存的端口child_port_
onst port = getEnvMessagePort();
port.on('message', (message) => {
// 加载脚本
if (message.type === LOAD_SCRIPT) {
const {
argv,
cwdCounter,
filename,
doEval,
workerData,
publicPort,
manifestSrc,
manifestURL,
hasStdin
} = message;
const CJSLoader = require('internal/modules/cjs/loader');
loadPreloadModules();
/*
由主线程申请的MessageChannel中某一端的端口,
主线程传递过来的,保存用于和主线程通信
*/
publicWorker.parentPort = publicPort;
// 执行时使用的数据
publicWorker.workerData = workerData;
// 通知主线程,正在执行脚本
port.postMessage({ type: UP_AND_RUNNING });
// 执行new Worker(filename)时传入的文件
CJSLoader.Module.runMain(filename);
})
// 开始接收消息
port.start()
我们看到worker_thread.js中通过runMain完成了子线程的代码执行,然后开始事件循环。
我们看一下当事件循环结束时,Node.js的逻辑。
// 给主线程提交一个任务,通知主线程子线程执行完毕,因为主线程不能直接执行join阻塞自己
w->env()->SetImmediateThreadsafe(
[w = std::unique_ptr<Worker>(w)](Environment* env) {
if (w->has_ref_)
env->add_refs(-1);
w->JoinThread();
// implicitly delete w
});
}, static_cast<void*>(w)), 0);
通过w->env()获取的是主线程的执行环境。我们看一下SetImmediateThreadsafe。
template <typename Fn>
void Environment::SetImmediateThreadsafe(Fn&& cb) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), false);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_threadsafe_.Push(std::move(callback));
}
uv_async_send(&task_queues_async_);
}
SetImmediateThreadsafe用于通知执行环境所在的事件循环有异步任务完成。并且是线程安全的。因为可能有多个线程会操作nativeimmediates_threadsafe。在主线程事件循环的Poll IO阶段就会执行taskqueues_async回调。我们看一下taskqueues_async对应的回调。
uv_async_init(
event_loop(),
&task_queues_async_,
[](uv_async_t* async) {
Environment* env = ContainerOf(
&Environment::task_queues_async_, async);
env->CleanupFinalizationGroups();
env->RunAndClearNativeImmediates();
});
所以在Poll IO阶段执行的回调是RunAndClearNativeImmediates
void Environment::RunAndClearNativeImmediates(bool only_refed) {
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
"RunAndClearNativeImmediates", this);
size_t ref_count = 0;
if (native_immediates_threadsafe_.size() > 0) {
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_.ConcatMove(std::move(native_immediates_threadsafe_));
}
auto drain_list = [&]() {
TryCatchScope try_catch(this);
DebugSealHandleScope seal_handle_scope(isolate());
while (std::unique_ptr<NativeImmediateCallback> head =
native_immediates_.Shift()) {
if (head->is_refed())
ref_count++;
if (head->is_refed() || !only_refed)
// 执行回调
head->Call(this);
head.reset();
};
}
RunAndClearNativeImmediates会执行队列里的回调。对应Worker的JoinThread
void Worker::JoinThread() {
// 阻塞等待子线程结束,执行到这子线程已经结束了
CHECK_EQ(uv_thread_join(&tid_), 0);
thread_joined_ = true;
// 从主线程数据结构中删除该线程对应的实例
env()->remove_sub_worker_context(this);
{
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
// Reset the parent port as we're closing it now anyway.
object()->Set(env()->context(),
env()->message_port_string(),
Undefined(env()->isolate())).Check();
// 子线程退出码
Local<Value> args[] = {
Integer::New(env()->isolate(), exit_code_),
custom_error_ != nullptr ?
OneByteString(env()->isolate(), custom_error_).As<Value>() :
Null(env()->isolate()).As<Value>(),
};
// 执行JS层回调,触发exit事件
MakeCallback(env()->onexit_string(), arraysize(args), args);
}
}
最后我们看一下如果结束正在执行的子线程。在JS中我能可以通过terminate函数终止线程的执行。
terminate(callback) {
this[kHandle].stopThread();
}
Terminate是对C++模块stopThread的封装。
void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
w->Exit(1);
}
void Worker::Exit(int code) {
Mutex::ScopedLock lock(mutex_);
// env_是子线程执行的env
if (env_ != nullptr) {
exit_code_ = code;
Stop(env_);
} else {
stopped_ = true;
}
}
int Stop(Environment* env) {
env->ExitEnv();
return 0;
}
void Environment::ExitEnv() {
set_can_call_into_js(false);
set_stopping(true);
isolate_->TerminateExecution();
SetImmediateThreadsafe([](Environment* env) { uv_stop(env->event_loop()); });
}
我们看到主线程最终通过SetImmediateThreadsafe给子线程所属的env提交了一个任务。子线程在Poll IO阶段会设置停止事件循环的标记,等到下一次事件循环开始的时候,就会跳出事件循环从而结束子线程的执行。
14.4 线程间通信
本节我们看一下线程间通信的过程。
const { Worker, isMainThread, parentPort } = require('worker_threads');
if (isMainThread) {
const worker = new Worker(__filename);
worker.once('message', (message) => {
...
});
worker.postMessage('Hello, world!');
} else {
// 做点耗时的事情
parentPort.once('message', (message) => {
parentPort.postMessage(message);
});
}
我们知道isMainThread在子线程里是false,parentPort就是messageChannel中的一端。用于和主线程通信,所以parentPort.postMessage给对端发送消息,就是给主线程发送消息,我们再看看worker.postMessage(‘Hello, world!’)。
postMessage(...args) {
this[kPublicPort].postMessage(...args);
}
kPublicPort指向的就是messageChannel的一端。this[kPublicPort].postMessage(…args)即给另一端发送消息。我们看一下postMessage的实现。
void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Local<Object> obj = args.This();
Local<Context> context = obj->CreationContext();
TransferList transfer_list;
if (args[1]->IsObject()) {
// 处理transfer_list
}
// 拿到JS层使用的对象所关联的MessagePort
MessagePort* port = Unwrap<MessagePort>(args.This());
port->PostMessage(env, args[0], transfer_list);
}
我们接着看port->PostMessage
Maybe<bool> MessagePort::PostMessage(Environment* env,
Local<Value> message_v,
const TransferList& transfer_v) {
Isolate* isolate = env->isolate();
Local<Object> obj = object(isolate);
Local<Context> context = obj->CreationContext();
Message msg;
// 序列化
Maybe<bool> serialization_maybe =
msg.Serialize(env, context, message_v, transfer_v, obj);
// 拿到操作对端sibling的锁
Mutex::ScopedLock lock(*data_->sibling_mutex_);
// 把消息插入到对端队列
data_->sibling_->AddToIncomingQueue(std::move(msg));
return Just(true);
}
PostMessage通过AddToIncomingQueue把消息插入对端的消息队列我们看一下AddToIncomingQueue
void MessagePortData::AddToIncomingQueue(Message&& message) {
// 加锁操作消息队列
Mutex::ScopedLock lock(mutex_);
incoming_messages_.emplace_back(std::move(message));
// 通知owner
if (owner_ != nullptr) {
owner_->TriggerAsync();
}
}
插入消息队列后,如果有关联的端口,则会通知Libuv。我们继续看TriggerAsync。
void MessagePort::TriggerAsync() {
if (IsHandleClosing()) return;
CHECK_EQ(uv_async_send(&async_), 0);
}
Libuv在Poll IO阶段就会执行对应的回调。回调是在new MessagePort时设置的。
auto onmessage = [](uv_async_t* handle) {
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
channel->OnMessage();
};
// 初始化async结构体,实现异步通信
CHECK_EQ(uv_async_init(env->event_loop(),
&async_,
onmessage), 0);
我们继续看OnMessage。
void MessagePort::OnMessage() {
HandleScope handle_scope(env()->isolate());
Local<Context> context = object(env()->isolate())->CreationContext();
// 接收消息条数的阈值
size_t processing_limit;
{
// 加锁操作消息队列
Mutex::ScopedLock(data_->mutex_);
processing_limit = std::max(data_->incoming_messages_.size(),
static_cast<size_t>(1000));
}
while (data_) {
// 读取的条数达到阈值,通知Libuv下一轮Poll IO阶段继续读
if (processing_limit-- == 0) {
// 通知事件循环
TriggerAsync();
return;
}
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(context);
Local<Value> payload;
// 读取消息
if (!ReceiveMessage(context, true).ToLocal(&payload)) break;
// 没有了
if (payload == env()->no_message_symbol()) break;
Local<Object> event;
Local<Value> cb_args[1];
// 新建一个MessageEvent对象,回调onmessage事件
if (!env()->message_event_object_template()->NewInstance(context)
.ToLocal(&event) ||
event->Set(context, env()->data_string(), payload).IsNothing() ||
event->Set(context, env()->target_string(), object()).IsNothing() ||
(cb_args[0] = event, false) ||
MakeCallback(env()->onmessage_string(),
arraysize(cb_args),
cb_args).IsEmpty()) {
// 如果回调失败,通知Libuv下次继续读
if (data_)
TriggerAsync();
return;
}
}
}
我们看到这里会不断地调用ReceiveMessage读取数据,然后回调JS层。直到达到阈值或者回调失败。我们看一下ReceiveMessage的逻辑。
MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
bool only_if_receiving) {
Message received;
{
// Get the head of the message queue.
// 互斥访问消息队列
Mutex::ScopedLock lock(data_->mutex_);
bool wants_message = receiving_messages_ || !only_if_receiving;
// 没有消息、不需要接收消息、消息是关闭消息
if (data_->incoming_messages_.empty() ||
(!wants_message &&
!data_->incoming_messages_.front().IsCloseMessage())) {
return env()->no_message_symbol();
}
// 获取队列第一个消息
received = std::move(data_->incoming_messages_.front());
data_->incoming_messages_.pop_front();
}
// 是关闭消息则关闭端口
if (received.IsCloseMessage()) {
Close();
return env()->no_message_symbol();
}
// 反序列化后返回
return received.Deserialize(env(), context);
}
ReceiveMessage会消息进行反序列化返回。以上就是线程间通信的整个过程。具体步骤如图14-5所示。
图14-5