周一硬核干货:通过Node.js的源码彻底理解EventLoop
nodejs的的事件循环由libuv的uv_run函数实现。在该函数中执行while循环,然后处理各种阶段(phase)的事件回调。事件循环的处理相当于一个消费者,消费由各业务代码生产的任务。下面看一下代码。
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop);
while (r != 0 && loop->stop_flag == 0) {
// 更新loop的time字段
uv__update_time(loop);
// 执行超时回调
uv__run_timers(loop);
// 执行pending回调,ran_pending代表pending队列是否为空,即没有节点可以执行
ran_pending = uv__run_pending(loop);
// 继续执行各种队列
uv__run_idle(loop);
uv__run_prepare(loop);
timeout = 0;
// UV_RUN_ONCE并且有pending节点的时候,会阻塞式poll io,默认模式也是
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
// poll io timeout是epoll_wait的超时时间
uv__io_poll(loop, timeout);
uv__run_check(loop);
uv__run_closing_handles(loop);
// 还有一次执行超时回调的机会
if (mode == UV_RUN_ONCE) {
uv__update_time(loop);
uv__run_timers(loop);
}
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
if (loop->stop_flag != 0)
loop->stop_flag = 0;
return r;
}
libuv分为几个阶段,这个可以从官网去了解到。下面分别分析各个阶段的相关代码。
1 定时器阶段
// 更新loop的time字段
uv__update_time(loop);
// 执行超时回调
uv__run_timers(loop);
首先更新当前的时间,然后判断哪个节点超时了。
static void uv__update_time(uv_loop_t* loop){
loop->time = uv__hrtime(UV_CLOCK_FAST) / 1000000;
}
// 找出已经超时的节点,并且执行里面的回调
void uv__run_timers(uv_loop_t* loop) {
struct heap_node* heap_node;
uv_timer_t* handle;
for (;;) {
heap_node = heap_min(timer_heap(loop));
if (heap_node == NULL)
break;
handle = container_of(heap_node, uv_timer_t, heap_node);
// 如果当前节点的时间大于当前时间则返回,说明后面的节点也没有超时
if (handle->timeout > loop->time)
break;
// 移除该计时器节点,重新插入最小堆,如果设置了repeat的话
uv_timer_stop(handle);
uv_timer_again(handle);
// 执行超时回调
handle->timer_cb(handle);
}
}
libuv以二叉堆的形式维护了超时任务节点,每次判断最快超时的节点有没有超时,没有的话说明后面的节点也不会超时,有的话继续往下判断。定时器在nodejs里的生产者是setTimeout和setInterval。
2 pending阶段
官网解释是在上一轮的poll io阶段没有执行的io回调,会在下一轮循环的pending阶段被执行。我们先看pending阶段的处理。
static int uv__run_pending(uv_loop_t* loop) {
QUEUE* q;
QUEUE pq;
uv__io_t* w;
if (QUEUE_EMPTY(&loop->pending_queue))
return 0;
// 把pending_queue队列的节点移到pq,即清空了pending_queue
QUEUE_MOVE(&loop->pending_queue, &pq);
// 遍历pq队列
while (!QUEUE_EMPTY(&pq)) {
// 取出当前第一个需要处理的节点,即pq.next
q = QUEUE_HEAD(&pq);
// 把当前需要处理的节点移出队列
QUEUE_REMOVE(q);
// 重置一下prev和next指针,因为这时候这两个指针是指向队列中的两个节点
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, pending_queue);
w->cb(loop, w, POLLOUT);
}
return 1;
}
就是把pending队列了的节点逐个执行。然后我们看一下pending队列的节点是如何生产出来的。
void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
if (QUEUE_EMPTY(&w->pending_queue))
QUEUE_INSERT_TAIL(&loop->pending_queue, &w->pending_queue);
}
libuv通过uv__io_feed函数生产pending任务,从libuv的代码中我们看到io错误的时候会调这个函数(还有其他情况)。
if (handle->delayed_error)
uv__io_feed(handle->loop, &handle->io_watcher);
最后io关闭的时候会从pending队列移除对应的节点。
void uv__io_close(uv_loop_t* loop, uv__io_t* w) {
uv__io_stop(loop, w, POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI);
QUEUE_REMOVE(&w->pending_queue);
uv__platform_invalidate_fd(loop, w->fd);
}
3 idle阶段
idle节点是自定义的一些任务,也是维护一个任务队列,每次循环的时候,如果队列不为空则逐个执行任务节点。在nodejs中setImmediate的实现使用了idle这个阶段。
// ToggleImmediateRef在timer.js中使用
void Environment::ToggleImmediateRef(bool ref) {
if (ref) {
/*
Idle handle is needed only to stop the event loop from blocking in poll.
防止在poll io中阻塞,有回调则poll io的timeout是0
*/
uv_idle_start(immediate_idle_handle(), [](uv_idle_t*){ });
} else {
uv_idle_stop(immediate_idle_handle());
}
}
idle的处理逻辑可以参考这篇文章libuv之idle、check、prepare---loop-watcher.c
4 prepare阶段
类似idle阶段,自定义的任务队列,是poll io前最后一个阶段。
5 poll io阶段
poll io是处理网络io、文件io的地方。可能会引起nodejs的短暂阻塞。
// 最长阻塞时间
timeout = uv_backend_timeout(loop);
uv__io_poll(loop, timeout);
看看最长阻塞时间是怎么算的。
int uv_backend_timeout(const uv_loop_t* loop) {
if (loop->stop_flag != 0)
return 0;
if (!uv__has_active_handles(loop) && !uv__has_active_reqs(loop))
return 0;
if (!QUEUE_EMPTY(&loop->idle_handles))
return 0;
if (!QUEUE_EMPTY(&loop->pending_queue))
return 0;
if (loop->closing_handles)
return 0;
return uv__next_timeout(loop);
}
没有需要处理的任务或者有需要处理的回调则不阻塞,否则取定时器二叉堆中最快到期的节点的时间作为阻塞时间。然后进入uv__io_poll。uv__io_poll是经典的epoll处理模式。使用先把业务感兴趣的事件注册到epoll中,然后在epoll_wait中等待事件的到来。最后执行对应事件的回调。下面看一下核心的代码。
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
q = QUEUE_HEAD(&loop->watcher_queue);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, watcher_queue);
e.events = w->pevents;
e.data = w->fd;
if (w->events == 0)
op = UV__EPOLL_CTL_ADD;
else
op = UV__EPOLL_CTL_MOD;
uv__epoll_ctl(loop->backend_fd, op, w->fd, &e);
}
nfds = uv__epoll_wait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout);
loop->watchers[loop->nwatchers] = (void*) events;
loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
for (i = 0; i < nfds; i++) {
pe = events + i;
fd = pe->data;
w->cb(loop, w, pe->events);
}
从上面的代码中我们看到我们把感兴趣的事件和回调打包成一个watcher追加到loop->watcher_queue队列,在poll io阶段就会被libuv处理。
6 check阶段
check阶段类似idle和prepare,是用户自定义任务的。nodejs中setImmediate是利用这个阶段实现的,具体可以看这篇文章nodejs之setImmediate源码分析。
7 closing_handles阶段
当一个handle调用uv_close关闭的时候,可以注册一个回调,在closing_handles阶段就会被执行。
void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
handle->flags |= UV_HANDLE_CLOSING;
handle->close_cb = close_cb;
...
uv__make_close_pending(handle);
}
void uv__make_close_pending(uv_handle_t* handle) {
handle->next_closing = handle->loop->closing_handles;
handle->loop->closing_handles = handle;
}
上面代码把给handle绑定了一个close_cb然后插入到closing_handles队列。然后在closing_handles阶段被执行。
static void uv__run_closing_handles(uv_loop_t* loop) {
uv_handle_t* p;
uv_handle_t* q;
p = loop->closing_handles;
loop->closing_handles = NULL;
while (p) {
q = p->next_closing;
uv__finish_close(p);
p = q;
}
}
static void uv__finish_close(uv_handle_t* handle) {
...
if (handle->close_cb) {
handle->close_cb(handle);
}
}
至此,完成了一轮事件循环。这就是nodejs用libuv实现的事件循环。我们首先要了解各个阶段都是处理什么的,然后把我们的任务直接或者间接地加到对应阶段的任务队列里就可以了。