调试、诊断子线程最直接的方式就是像调试、诊断主线程一样,但是无论是动态开启还是静态开启,子线程都不可避免地需要内置一些相关的非业务代码,本文介绍另外一种对子线程代码无侵入的调试方式,另外也介绍一下通过子线程调试主线程的方式。

1 初始化子线程的Inspector

在Node.js启动子线程的时候,会初始化Inspector。

  1. env_->InitializeInspector(std::move(inspector_parent_handle_));

在分析InitializeInspector之前,我们先看一下inspectorparent_handle

  1. std::unique_ptr<inspector::ParentInspectorHandle> inspector_parent_handle_;

inspectorparent_handle是一个ParentInspectorHandle对象,这个对象是子线程和主线程通信的桥梁。我们看一下他的初始化逻辑(在主线程里执行)。

  1. inspector_parent_handle_ = env->inspector_agent()->GetParentHandle(thread_id_, url);

调用agent的GetParentHandle获取一个ParentInspectorHandle对象。

  1. std::unique_ptr<ParentInspectorHandle> Agent::GetParentHandle(int thread_id, const std::string& url) {
  2. return client_->getWorkerManager()->NewParentHandle(thread_id, url);
  3. }

内部其实是通过client_->getWorkerManager()对象的NewParentHandle方法获取ParentInspectorHandle对象,接下来我们看一下WorkerManager的NewParentHandle。

  1. std::unique_ptr<ParentInspectorHandle> WorkerManager::NewParentHandle(int thread_id, const std::string& url) {
  2. bool wait = !delegates_waiting_on_start_.empty();
  3. return std::make_unique<ParentInspectorHandle>(thread_id, url, thread_, wait);
  4. }
  5. ParentInspectorHandle::ParentInspectorHandle(
  6. int id, const std::string& url,
  7. std::shared_ptr<MainThreadHandle> parent_thread,
  8. bool wait_for_connect
  9. )
  10. : id_(id),
  11. url_(url),
  12. parent_thread_(parent_thread),
  13. wait_(wait_for_connect) {}

最终的架构图如下入所示。
25-Node.js子线程调试和诊断指南 - 图1
分析完ParentInspectorHandle后继续看一下env->InitializeInspector(std::move(inspector_parent_handle))的逻辑(在子线程里执行)。

  1. int Environment::InitializeInspector(
  2. std::unique_ptr<inspector::ParentInspectorHandle> parent_handle) {
  3. std::string inspector_path;
  4. inspector_path = parent_handle->url();
  5. inspector_agent_->SetParentHandle(std::move(parent_handle));
  6. inspector_agent_->Start(inspector_path,
  7. options_->debug_options(),
  8. inspector_host_port(),
  9. is_main_thread());
  10. }

首先把ParentInspectorHandle对象保存到agent中,然后调用agent的Start方法。

  1. bool Agent::Start(...) {
  2. // 新建client对象
  3. client_ = std::make_shared<NodeInspectorClient>(parent_env_, is_main);
  4. // 调用agent中保存的ParentInspectorHandle对象的WorkerStarted
  5. parent_handle_->WorkerStarted(client_->getThreadHandle(), ...);
  6. }

Agent::Start创建了一个client对象,然后调用ParentInspectorHandle对象的WorkerStarted方法(刚才SetParentHandle的时候保存的),我们看一下这时候的架构图。
25-Node.js子线程调试和诊断指南 - 图2
接着看parenthandle->WorkerStarted。

  1. void ParentInspectorHandle::WorkerStarted(
  2. std::shared_ptr<MainThreadHandle> worker_thread, bool waiting) {
  3. std::unique_ptr<Request> request(
  4. new WorkerStartedRequest(id_, url_, worker_thread, waiting));
  5. parent_thread_->Post(std::move(request));
  6. }

WorkerStarted创建了一个WorkerStartedRequest请求,然后通过parentthread->Post提交,parentthread是MainThreadInterface对象。

  1. void MainThreadInterface::Post(std::unique_ptr<Request> request) {
  2. Mutex::ScopedLock scoped_lock(requests_lock_);
  3. // 之前是空则需要唤醒消费者
  4. bool needs_notify = requests_.empty();
  5. // 消息入队
  6. requests_.push_back(std::move(request));
  7. if (needs_notify) {
  8. // 获取当前对象的一个弱引用
  9. std::weak_ptr<MainThreadInterface>* interface_ptr = new std::weak_ptr<MainThreadInterface>(shared_from_this());
  10. // 请求V8执行RequestInterrupt入参对应的回调
  11. isolate_->RequestInterrupt([](v8::Isolate* isolate, void* opaque) {
  12. // 把执行时传入的参数转成MainThreadInterface
  13. std::unique_ptr<std::weak_ptr<MainThreadInterface>> interface_ptr {
  14. static_cast<std::weak_ptr<MainThreadInterface>*>(opaque)
  15. };
  16. // 判断对象是否还有效,是则调用DispatchMessages
  17. if (auto iface = interface_ptr->lock()) iface->DispatchMessages();
  18. }, static_cast<void*>(interface_ptr));
  19. }
  20. // 唤醒消费者
  21. incoming_message_cond_.Broadcast(scoped_lock);
  22. }

我们看看这时候的架构图。
25-Node.js子线程调试和诊断指南 - 图3
接着看回调里执行MainThreadInterface对象DispatchMessages方法的逻辑。

  1. void MainThreadInterface::DispatchMessages() {
  2. // 遍历请求队列
  3. requests_.swap(dispatching_message_queue_);
  4. while (!dispatching_message_queue_.empty()) {
  5. MessageQueue::value_type task;
  6. std::swap(dispatching_message_queue_.front(), task);
  7. dispatching_message_queue_.pop_front();
  8. // 执行任务函数
  9. task->Call(this);
  10. }
  11. }

task是WorkerStartedRequest对象,看一下Call方法的代码。

  1. void Call(MainThreadInterface* thread) override {
  2. auto manager = thread->inspector_agent()->GetWorkerManager();
  3. manager->WorkerStarted(id_, info_, waiting_);
  4. }

接着调用agent的WorkerManager的WorkerStarted。

  1. void WorkerManager::WorkerStarted(int session_id,
  2. const WorkerInfo& info,
  3. bool waiting) {
  4. children_.emplace(session_id, info);
  5. for (const auto& delegate : delegates_) {
  6. Report(delegate.second, info, waiting);
  7. }
  8. }

WorkerStarted记录了一个id和上下文,因为delegates_初始化的时候是空的,所以不会执行。至此,子线程Inspector初始化的逻辑就分析完了,结构图如下。
25-Node.js子线程调试和诊断指南 - 图4
我们发现,和主线程不一样,主线程会启动一个WebSocket服务器接收客户端的连接请求,而子线程只是初始化了一些数据结构。下面我们看一下基于这些数据结构,主线程是如何动态开启调试子线程的。

2 主线程开启调试子线程的能力

我们可以以以下方式开启对子线程的调试。

  1. const { Worker, workerData } = require('worker_threads');
  2. const { Session } = require('inspector');
  3. // 新建一个新的通信通道
  4. const session = new Session();
  5. session.connect();
  6. // 创建子线程
  7. const worker = new Worker('./httpServer.js', {workerData: {port: 80}});
  8. // 子线程启动成功后开启调试子线程的能力
  9. worker.on('online', () => {
  10. session.post("NodeWorker.enable",
  11. {waitForDebuggerOnStart: false},
  12. (err) => {
  13. err && console.log("NodeWorker.enable", err);
  14. });
  15. });
  16. // 防止主线程退出
  17. setInterval(() => {}, 100000);

我们先来分析一下connect函数的逻辑。

  1. connect() {
  2. this[connectionSymbol] = new Connection((message) => this[onMessageSymbol](message));
  3. }

新建了一个Connection对象并传入一个回调函数,该回调函数在收到消息时被回调。Connection是C++层导出的对象,由模版类JSBindingsConnection实现。

  1. template <typename ConnectionType>
  2. class JSBindingsConnection {}

我们看看导出的路逻辑。

  1. JSBindingsConnection<Connection>::Bind(env, target);

接着看Bind。

  1. static void Bind(Environment* env, Local<Object> target) {
  2. // class_name是Connection
  3. Local<String> class_name = ConnectionType::GetClassName(env);
  4. Local<FunctionTemplate> tmpl = env->NewFunctionTemplate(JSBindingsConnection::New);
  5. tmpl->InstanceTemplate()->SetInternalFieldCount(1);
  6. tmpl->SetClassName(class_name);
  7. tmpl->Inherit(AsyncWrap::GetConstructorTemplate(env));
  8. env->SetProtoMethod(tmpl, "dispatch", JSBindingsConnection::Dispatch);
  9. env->SetProtoMethod(tmpl, "disconnect", JSBindingsConnection::Disconnect);
  10. target->Set(env->context(),
  11. class_name,
  12. tmpl->GetFunction(env->context()).ToLocalChecked())
  13. .ToChecked();
  14. }

当我们在JS层执行new Connection的时候,就会执行JSBindingsConnection::New。

  1. static void New(const FunctionCallbackInfo<Value>& info) {
  2. Environment* env = Environment::GetCurrent(info);
  3. Local<Function> callback = info[0].As<Function>();
  4. new JSBindingsConnection(env, info.This(), callback);
  5. }

我们看看新建一个JSBindingsConnection对象时的逻辑。

  1. JSBindingsConnection(Environment* env,
  2. Local<Object> wrap,
  3. Local<Function> callback)
  4. : AsyncWrap(env, wrap, PROVIDER_INSPECTORJSBINDING),
  5. callback_(env->isolate(), callback) {
  6. Agent* inspector = env->inspector_agent();
  7. session_ = LocalConnection::Connect(
  8. inspector, std::make_unique<JSBindingsSessionDelegate>(env, this)
  9. );
  10. }
  11. static std::unique_ptr<InspectorSession> Connect(
  12. Agent* inspector,
  13. std::unique_ptr<InspectorSessionDelegate> delegate
  14. ) {
  15. return inspector->Connect(std::move(delegate), false);
  16. }

最终是传入了一个JSBindingsSessionDelegate对象调用Agent的Connect方法。

  1. std::unique_ptr<InspectorSession> Agent::Connect(
  2. std::unique_ptr<InspectorSessionDelegate> delegate,
  3. bool prevent_shutdown) {
  4. int session_id = client_->connectFrontend(std::move(delegate),
  5. prevent_shutdown);
  6. // JSBindingsConnection对象的session_字段指向的对象
  7. return std::unique_ptr<InspectorSession>(
  8. new SameThreadInspectorSession(session_id, client_)
  9. );
  10. }

Agent的Connect方法继续调用client_->connectFrontend。

  1. int connectFrontend(std::unique_ptr<InspectorSessionDelegate> delegate,
  2. bool prevent_shutdown) {
  3. int session_id = next_session_id_++;
  4. channels_[session_id] = std::make_unique<ChannelImpl>(env_,
  5. client_,
  6. getWorkerManager(),
  7. std::move(delegate),
  8. getThreadHandle(),
  9. prevent_shutdown);
  10. return session_id;
  11. }

connectFrontend新建了一个ChannelImpl对象,在新建ChannelImpl时,会初始化子线程处理的逻辑。

  1. explicit ChannelImpl(Environment* env,
  2. const std::unique_ptr<V8Inspector>& inspector,
  3. std::shared_ptr<WorkerManager> worker_manager,
  4. std::unique_ptr<InspectorSessionDelegate> delegate,
  5. std::shared_ptr<MainThreadHandle> main_thread_,
  6. bool prevent_shutdown)
  7. : delegate_(std::move(delegate)), prevent_shutdown_(prevent_shutdown),
  8. retaining_context_(false) {
  9. session_ = inspector->connect(CONTEXT_GROUP_ID, this, StringView());
  10. // Node.js拓展命令的处理分发器
  11. node_dispatcher_ = std::make_unique<protocol::UberDispatcher>(this);
  12. // trace相关
  13. tracing_agent_ = std::make_unique<protocol::TracingAgent>(env, main_thread_);
  14. tracing_agent_->Wire(node_dispatcher_.get());
  15. // 处理子线程相关
  16. if (worker_manager) {
  17. worker_agent_ = std::make_unique<protocol::WorkerAgent>(worker_manager);
  18. worker_agent_->Wire(node_dispatcher_.get());
  19. }
  20. // 处理runtime
  21. runtime_agent_ = std::make_unique<protocol::RuntimeAgent>();
  22. runtime_agent_->Wire(node_dispatcher_.get());
  23. }

我们这里只关注处理子线程相关的逻辑。看一下 workeragent->Wire。

  1. void WorkerAgent::Wire(UberDispatcher* dispatcher) {
  2. frontend_.reset(new NodeWorker::Frontend(dispatcher->channel()));
  3. NodeWorker::Dispatcher::wire(dispatcher, this);
  4. auto manager = manager_.lock();
  5. workers_ = std::make_shared<NodeWorkers>(frontend_, manager->MainThread());
  6. }

这时候的架构图如下
25-Node.js子线程调试和诊断指南 - 图5
接着看一下NodeWorker::Dispatcher::wire(dispatcher, this)的逻辑。

  1. void Dispatcher::wire(UberDispatcher* uber, Backend* backend)
  2. {
  3. std::unique_ptr<DispatcherImpl> dispatcher(new DispatcherImpl(uber->channel(), backend));
  4. uber->setupRedirects(dispatcher->redirects());
  5. uber->registerBackend("NodeWorker", std::move(dispatcher));
  6. }

首先新建了一个DispatcherImpl对象。

  1. DispatcherImpl(FrontendChannel* frontendChannel, Backend* backend)
  2. : DispatcherBase(frontendChannel)
  3. , m_backend(backend) {
  4. m_dispatchMap["NodeWorker.sendMessageToWorker"] = &DispatcherImpl::sendMessageToWorker;
  5. m_dispatchMap["NodeWorker.enable"] = &DispatcherImpl::enable;
  6. m_dispatchMap["NodeWorker.disable"] = &DispatcherImpl::disable;
  7. m_dispatchMap["NodeWorker.detach"] = &DispatcherImpl::detach;
  8. }

除了初始化一些字段,另外了一个kv数据结构,这个是一个路由配置,后面我们会看到它的作用。新建完DispatcherImpl后又调用了uber->registerBackend(“NodeWorker”, std::move(dispatcher))注册该对象。

  1. void UberDispatcher::registerBackend(const String& name, std::unique_ptr<protocol::DispatcherBase> dispatcher)
  2. {
  3. m_dispatchers[name] = std::move(dispatcher);
  4. }

这时候的架构图如下。
25-Node.js子线程调试和诊断指南 - 图6
我们看到这里其实是建立了一个路由体系,后面收到命令时就会根据这些路由配置进行转发,类似Node.js Express框架路由机制。这时候可以通过session的post给主线程发送NodeWorker.enable命令来开启子线程的调试。我们分析这个过程。

  1. post(method, params, callback) {
  2. // 忽略参数处理
  3. // 保存请求对应的回调
  4. if (callback) {
  5. this[messageCallbacksSymbol].set(id, callback);
  6. }
  7. // 调用C++的dispatch
  8. this[connectionSymbol].dispatch(JSONStringify(message));
  9. }

this[connectionSymbol]对应的是JSBindingsConnection对象。

  1. static void Dispatch(const FunctionCallbackInfo<Value>& info) {
  2. Environment* env = Environment::GetCurrent(info);
  3. JSBindingsConnection* session;
  4. ASSIGN_OR_RETURN_UNWRAP(&session, info.Holder());
  5. if (session->session_) {
  6. session->session_->Dispatch(
  7. ToProtocolString(env->isolate(), info[0])->string());
  8. }
  9. }

session_是一个SameThreadInspectorSession对象。

  1. void SameThreadInspectorSession::Dispatch(
  2. const v8_inspector::StringView& message) {
  3. auto client = client_.lock();
  4. client->dispatchMessageFromFrontend(session_id_, message);
  5. }
  6. void dispatchMessageFromFrontend(int session_id, const StringView& message) {
  7. channels_[session_id]->dispatchProtocolMessage(message);
  8. }

最终调用了ChannelImpl的dispatchProtocolMessage。

  1. void dispatchProtocolMessage(const StringView& message) {
  2. std::string raw_message = protocol::StringUtil::StringViewToUtf8(message);
  3. std::unique_ptr<protocol::DictionaryValue> value =
  4. protocol::DictionaryValue::cast(protocol::StringUtil::parseMessage(
  5. raw_message, false));
  6. int call_id;
  7. std::string method;
  8. // 解析命令
  9. node_dispatcher_->parseCommand(value.get(), &call_id, &method);
  10. // 判断命令是V8内置命令还是Node.js拓展的命令
  11. if (v8_inspector::V8InspectorSession::canDispatchMethod(
  12. Utf8ToStringView(method)->string())) {
  13. session_->dispatchProtocolMessage(message);
  14. } else {
  15. node_dispatcher_->dispatch(call_id, method, std::move(value),
  16. raw_message);
  17. }
  18. }

因为NodeWorker.enable是Node.js拓展的命令,所以会走到else里面的逻辑。根据路由配置找到该命令对应的处理逻辑(NodeWorker.enable以.切分,对应两级路由)。

  1. void UberDispatcher::dispatch(int callId, const String& in_method, std::unique_ptr<Value> parsedMessage, const ProtocolMessage& rawMessage)
  2. {
  3. // 找到一级路由配置
  4. protocol::DispatcherBase* dispatcher = findDispatcher(method);
  5. std::unique_ptr<protocol::DictionaryValue> messageObject = DictionaryValue::cast(std::move(parsedMessage));
  6. // 交给一级路由处理器处理
  7. dispatcher->dispatch(callId, method, rawMessage, std::move(messageObject));
  8. }

NodeWorker.enable对应的路由处理器代码如下

  1. void DispatcherImpl::dispatch(int callId, const String& method, const ProtocolMessage& message, std::unique_ptr<protocol::DictionaryValue> messageObject)
  2. {
  3. // 查找二级路由
  4. std::unordered_map<String, CallHandler>::iterator it = m_dispatchMap.find(method);
  5. protocol::ErrorSupport errors;
  6. // 找到处理函数
  7. (this->*(it->second))(callId, method, message, std::move(messageObject), &errors);
  8. }

dispatch继续寻找命令对应的处理函数,最终找到NodeWorker.enable命令的处理函数为DispatcherImpl::enable。

  1. void DispatcherImpl::enable(...)
  2. {
  3. std::unique_ptr<DispatcherBase::WeakPtr> weak = weakPtr();
  4. DispatchResponse response = m_backend->enable(...);
  5. // 返回响应给命令(类似请求/响应模式)
  6. weak->get()->sendResponse(callId, response);
  7. }

根据架构图可以知道m_backend是WorkerAgent对象。

  1. DispatchResponse WorkerAgent::enable(bool waitForDebuggerOnStart) {
  2. auto manager = manager_.lock();
  3. std::unique_ptr<AgentWorkerInspectorDelegate> delegate(new AgentWorkerInspectorDelegate(workers_));
  4. event_handle_ = manager->SetAutoAttach(std::move(delegate));
  5. return DispatchResponse::OK();
  6. }

继续调用WorkerManager的SetAutoAttach方法。

  1. std::unique_ptr<WorkerManagerEventHandle> WorkerManager::SetAutoAttach(
  2. std::unique_ptr<WorkerDelegate> attach_delegate) {
  3. int id = ++next_delegate_id_;
  4. // 保存delegate
  5. delegates_[id] = std::move(attach_delegate);
  6. const auto& delegate = delegates_[id];
  7. // 通知子线程
  8. for (const auto& worker : children_) {
  9. Report(delegate, worker.second, false);
  10. }
  11. ...
  12. }

SetAutoAttach遍历子线程。

  1. void Report(const std::unique_ptr<WorkerDelegate>& delegate,
  2. const WorkerInfo& info, bool waiting) {
  3. if (info.worker_thread)
  4. delegate->WorkerCreated(info.title, info.url, waiting, info.worker_thread);
  5. }

info是一个WorkerInfo对象,该对象是子线程初始化和主线程建立关系的数据结构。delegate是AgentWorkerInspectorDelegate对象。

  1. void WorkerCreated(const std::string& title,
  2. const std::string& url,
  3. bool waiting,
  4. std::shared_ptr<MainThreadHandle> target) override {
  5. workers_->WorkerCreated(title, url, waiting, target);
  6. }

workers_是一个NodeWorkers对象。

  1. void NodeWorkers::WorkerCreated(const std::string& title,
  2. const std::string& url,
  3. bool waiting,
  4. std::shared_ptr<MainThreadHandle> target) {
  5. auto frontend = frontend_.lock();
  6. std::string id = std::to_string(++next_target_id_);
  7. // 处理数据通信的delegate
  8. auto delegate = thread_->MakeDelegateThreadSafe(
  9. std::unique_ptr<InspectorSessionDelegate>(
  10. new ParentInspectorSessionDelegate(id, shared_from_this())
  11. )
  12. );
  13. // 建立和子线程V8 Inspector的通信通道
  14. sessions_[id] = target->Connect(std::move(delegate), true);
  15. frontend->attachedToWorker(id, WorkerInfo(id, title, url), waiting);
  16. }

WorkerCreated建立了一条和子线程通信的通道,然后通知命令的发送方通道建立成功。这时候架构图如下。
25-Node.js子线程调试和诊断指南 - 图7
接着看attachedToWorker。

  1. void Frontend::attachedToWorker(const String& sessionId, std::unique_ptr<protocol::NodeWorker::WorkerInfo> workerInfo, bool waitingForDebugger)
  2. {
  3. std::unique_ptr<AttachedToWorkerNotification> messageData = AttachedToWorkerNotification::create()
  4. .setSessionId(sessionId)
  5. .setWorkerInfo(std::move(workerInfo))
  6. .setWaitingForDebugger(waitingForDebugger)
  7. .build();
  8. // 触发NodeWorker.attachedToWorker
  9. m_frontendChannel->sendProtocolNotification(InternalResponse::createNotification("NodeWorker.attachedToWorker", std::move(messageData)));
  10. }

继续看sendProtocolNotification

  1. void sendProtocolNotification(
  2. std::unique_ptr<Serializable> message) override {
  3. sendMessageToFrontend(message->serializeToJSON());
  4. }
  5. void sendMessageToFrontend(const StringView& message) {
  6. delegate_->SendMessageToFrontend(message);
  7. }

这里的delegate_是一个JSBindingsSessionDelegate对象。

  1. void SendMessageToFrontend(const v8_inspector::StringView& message)
  2. override {
  3. Isolate* isolate = env_->isolate();
  4. HandleScope handle_scope(isolate);
  5. Context::Scope context_scope(env_->context());
  6. MaybeLocal<String> v8string = String::NewFromTwoByte(isolate,
  7. message.characters16(),
  8. NewStringType::kNormal, message.length()
  9. );
  10. Local<Value> argument = v8string.ToLocalChecked().As<Value>();
  11. // 收到消息执行回调
  12. connection_->OnMessage(argument);
  13. }
  14. // 执行JS层回调
  15. void OnMessage(Local<Value> value) {
  16. MakeCallback(callback_.Get(env()->isolate()), 1, &value);
  17. }

JS层回调逻辑如下。

  1. [onMessageSymbol](message) {
  2. const parsed = JSONParse(message);
  3. // 收到的消息如果是某个请求的响应,则有个id字段记录了请求对应的id,否则则触发事件
  4. if (parsed.id) {
  5. const callback = this[messageCallbacksSymbol].get(parsed.id);
  6. this[messageCallbacksSymbol].delete(parsed.id);
  7. if (callback) {
  8. callback(null, parsed.result);
  9. }
  10. } else {
  11. this.emit(parsed.method, parsed);
  12. this.emit('inspectorNotification', parsed);
  13. }
  14. }

主线程拿到Worker Session对一个的id,后续就可以通过命令NodeWorker.sendMessageToWorker加上该id和子线程通信。大致原理如下,主线程通过自己的channel和子线程的channel进行通信,从而达到控制子线程的目的。
25-Node.js子线程调试和诊断指南 - 图8
我们分析一下NodeWorker.sendMessageToWorker命令的逻辑,对应处理函数为DispatcherImpl::sendMessageToWorker。

  1. void DispatcherImpl::sendMessageToWorker(...)
  2. {
  3. std::unique_ptr<DispatcherBase::WeakPtr> weak = weakPtr();
  4. DispatchResponse response = m_backend->sendMessageToWorker(in_message, in_sessionId);
  5. // 响应
  6. weak->get()->sendResponse(callId, response);
  7. return;
  8. }

继续分析m_backend->sendMessageToWorker。

  1. DispatchResponse WorkerAgent::sendMessageToWorker(const String& message,
  2. const String& sessionId) {
  3. workers_->Receive(sessionId, message);
  4. return DispatchResponse::OK();
  5. }
  6. void NodeWorkers::Receive(const std::string& id, const std::string& message) {
  7. auto it = sessions_.find(id);
  8. it->second->Dispatch(Utf8ToStringView(message)->string());
  9. }

sessions_对应的是和子线程的通信的数据结构CrossThreadInspectorSession。看一下该对象的Dispatch方法。

  1. void Dispatch(const StringView& message) override {
  2. state_.Call(&MainThreadSessionState::Dispatch,
  3. StringBuffer::create(message));
  4. }

再次调了MainThreadSessionState::Dispatch

  1. void Dispatch(std::unique_ptr<StringBuffer> message) {
  2. session_->Dispatch(message->string());
  3. }

session_是SameThreadInspectorSession对象。继续看它的Dispatch方法。

  1. void SameThreadInspectorSession::Dispatch(
  2. const v8_inspector::StringView& message) {
  3. auto client = client_.lock();
  4. client->dispatchMessageFromFrontend(session_id_, message);
  5. }
  6. void dispatchMessageFromFrontend(int session_id, const StringView& message) {
  7. channels_[session_id]->dispatchProtocolMessage(message);
  8. }

通过层层调用,最终拿到了一个合子线程通信的channel,dispatchProtocolMessage方法刚才已经分析过,该方法会根据命令做不同的处理,因为我们这里发送的是V8内置的命令,所以会交给V8 Inspector处理。当V8 Inspector处理完后,会通过ChannelImpl的sendResponse返回结果。

  1. void sendResponse(
  2. int callId,
  3. std::unique_ptr<v8_inspector::StringBuffer> message) override {
  4. sendMessageToFrontend(message->string());
  5. }
  6. void sendMessageToFrontend(const StringView& message) {
  7. delegate_->SendMessageToFrontend(message);
  8. }

这里的delegate_是ParentInspectorSessionDelegate对象。

  1. void SendMessageToFrontend(const v8_inspector::StringView& msg) override {
  2. std::string message = protocol::StringUtil::StringViewToUtf8(msg);
  3. workers_->Send(id_, message);
  4. }
  5. void NodeWorkers::Send(const std::string& id, const std::string& message) {
  6. auto frontend = frontend_.lock();
  7. if (frontend)
  8. frontend->receivedMessageFromWorker(id, message);
  9. }
  10. void Frontend::receivedMessageFromWorker(const String& sessionId, const String& message)
  11. {
  12. std::unique_ptr<ReceivedMessageFromWorkerNotification> messageData = ReceivedMessageFromWorkerNotification::create()
  13. .setSessionId(sessionId)
  14. .setMessage(message)
  15. .build();
  16. // 触发NodeWorker.receivedMessageFromWorker
  17. m_frontendChannel->sendProtocolNotification(InternalResponse::createNotification("NodeWorker.receivedMessageFromWorker", std::move(messageData)));
  18. }

m_frontendChannel是主线程的ChannelImpl对象。

  1. void sendProtocolNotification(
  2. std::unique_ptr<Serializable> message) override {
  3. sendMessageToFrontend(message->serializeToJSON());
  4. }
  5. void sendMessageToFrontend(const StringView& message) {
  6. delegate_->SendMessageToFrontend(message);
  7. }

delegate_是C++层传入的JSBindingsSessionDelegate对象。最终通过JSBindingsSessionDelegate对象回调JS层,之前已经分析过就不再赘述。至此,主线程就具备了控制子线程的能力,但是控制方式有很多种。

2.1 使用通用的V8命令

通过下面代码收集子线程的CPU Profile信息。

  1. const { Worker, workerData } = require('worker_threads');
  2. const { Session } = require('inspector');
  3. const session = new Session();
  4. session.connect();
  5. let id = 1;
  6. function post(sessionId, method, params, callback) {
  7. session.post('NodeWorker.sendMessageToWorker', {
  8. sessionId,
  9. message: JSON.stringify({ id: id++, method, params })
  10. }, callback);
  11. }
  12. session.on('NodeWorker.attachedToWorker', (data) => {
  13. post(data.params.sessionId, 'Profiler.enable');
  14. post(data.params.sessionId, 'Profiler.start');
  15. // 收集一段时间后提交停止收集命令
  16. setTimeout(() => {
  17. post(data.params.sessionId, 'Profiler.stop');
  18. }, 10000)
  19. });
  20. session.on('NodeWorker.receivedMessageFromWorker', ({ params: { message }}) => {
  21. const data = JSON.parse(message);
  22. console.log(data);
  23. });
  24. const worker = new Worker('./httpServer.js', {workerData: {port: 80}});
  25. worker.on('online', () => {
  26. session.post("NodeWorker.enable",{waitForDebuggerOnStart: false}, (err) => { console.log(err, "NodeWorker.enable");});
  27. });
  28. setInterval(() => {}, 100000);

通过这种方式可以通过命令控制子线程的调试和数据收集。

2.2 在子线程中动态执行脚本

可以通过执行脚本开启子线程的WebSocket服务,像调试主线程一样。

  1. const { Worker, workerData } = require('worker_threads');
  2. const { Session } = require('inspector');
  3. const session = new Session();
  4. session.connect();
  5. let workerSessionId;
  6. let id = 1;
  7. function post(method, params) {
  8. session.post('NodeWorker.sendMessageToWorker', {
  9. sessionId: workerSessionId,
  10. message: JSON.stringify({ id: id++, method, params })
  11. });
  12. }
  13. session.on('NodeWorker.receivedMessageFromWorker', ({ params: { message }}) => {
  14. const data = JSON.parse(message);
  15. console.log(data);
  16. });
  17. session.on('NodeWorker.attachedToWorker', (data) => {
  18. workerSessionId = data.params.sessionId;
  19. post("Runtime.evaluate", {
  20. includeCommandLineAPI: true,
  21. expression: `const inspector = process.binding('inspector');
  22. inspector.open();
  23. inspector.url();
  24. `
  25. }
  26. );
  27. });
  28. const worker = new Worker('./httpServer.js', {workerData: {port: 80}});
  29. worker.on('online', () => {
  30. session.post("NodeWorker.enable",{waitForDebuggerOnStart: false}, (err) => { err && console.log("NodeWorker.enable", err);});
  31. });
  32. setInterval(() => {}, 100000);

执行上面的代码就拿到以下输出

  1. {
  2. id: 1,
  3. result: {
  4. result: {
  5. type: 'string',
  6. value: 'ws://127.0.0.1:9229/c0ca16c8-55aa-4651-9776-fca1b27fc718'
  7. }
  8. }
  9. }

通过该地址,客户端就可以对子线程进行调试了。上面代码里使用process.binding而不是require加载inspector,因为刚才通过NodeWorker.enable命令为子线程创建了一个到子线程Inspector的channel,而JS模块里判断如果channel非空则报错Inspector已经打开。所以这里需要绕过这个限制,直接加载C++模块开启WebSocket服务器。

3 子线程调试主线程

不仅可以通过主线程调试子线程,还可以通过子线程调试主线程。Node.js在子线程暴露了connectToMainThread方法连接到主线程的Inspector(只能在work_threads中使用),实现的原理和之前分析的类似,主要是子线程连接到主线程的V8 Inspector,通过和该Inspector完成对主线程的控制。看下面一个例子。
主线程代码

  1. const { Worker, workerData } = require('worker_threads');
  2. const http = require('http');
  3. const worker = new Worker('./worker.js', {workerData: {port: 80}});
  4. http.createServer((_, res) => {
  5. res.end('main');
  6. }).listen(8000);

worker.js代码如下

  1. const fs = require('fs');
  2. const { workerData: { port } } = require('worker_threads');
  3. const { Session } = require('inspector');
  4. const session = new Session();
  5. session.connectToMainThread();
  6. session.post('Profiler.enable');
  7. session.post('Profiler.start');
  8. setTimeout(() => {
  9. session.post('Profiler.stop', (err, data) => {
  10. if (data.profile) {
  11. fs.writeFileSync('./profile.cpuprofile', JSON.stringify(data.profile));
  12. }
  13. });
  14. }, 5000)