协程库libtask源码分析之架构篇
前言:假设读者已经了解了协程的概念、实现协程的底层技术支持。本文会介绍基于底层基础,如何实现协程以及协程的应用。
libtask是google大佬Russ Cox(Go的核心开发者)所写。libtask非常有意思,为数不多的代码就可以让人了解和理解协程的具体应用,很值得学习,我感兴趣的点在于如何在服务器中使用协程,传统的服务器,基本都是多进程、多线程、池化、单线程/多线程多路复用等等,而libtask使用少量的代码就让我看到了如何使用协程写一个服务器,非常赞(源码分析https://github.com/theanarkh/read-libtask-code)。
我们从libtask的main函数开始,这个main函数就是我们在c语言中使用的c函数,libtask本身实现了main这个函数,用户使用libtask时,要实现的是taskmain函数。taskmain和main的函数声明是一样的。下面我们看一下main函数。
int
main
(
int
argc
,
char
*
*
argv
)
{
struct
sigaction
sa
,
osa
;
// 注册SIGQUIT信号处理函数
memset
(
&
sa
,
0
,
sizeof
sa
)
;
sa
.
sa_handler
=
taskinfo
;
sa
.
sa_flags
=
SA_RESTART
;
sigaction
(
SIGQUIT
,
&
sa
,
&
osa
)
;
// 保存命令行参数
argv0
=
argv
[
0
]
;
taskargc
=
argc
;
taskargv
=
argv
;
if
(
mainstacksize
==
0
)
mainstacksize
=
256
*
1024
;
// 创建第一个协程
taskcreate
(
taskmainstart
,
nil
,
mainstacksize
)
;
// 开始调度
taskscheduler
(
)
;
fprint
(
2
,
"taskscheduler returned in main!\n"
)
;
abort
(
)
;
return 0;
}
main函数主要的两个逻辑是taskcreate和taskscheduler函数。我们先来看taskcreate。
int
taskcreate
(
void
(
*
fn
)
(
void
*
)
,
void
*
arg
,
uint stack
)
{
int
id
;
Task
*
t
;
t
=
taskalloc
(
fn
,
arg
,
stack
)
;
taskcount
++
;
id
=
t
->
id
;
// 记录位置
t
->
alltaskslot
=
nalltask
;
// 保存到alltask中
alltask
[
nalltask
++
]
=
t
;
// 修改状态为就绪,可以被调度,并且加入到就绪队列
taskready
(
t
)
;
return id;
}
taskcreate首先调用taskalloc分配一个表示协程的结构体Task。我们看看这个结构体的定义。
struct
Task
{
char
name
[
256
]
;
// offset known to acid
char
state
[
256
]
;
// 前后指针
Task
*
next
;
Task
*
prev
;
Task
*
allnext
;
Task
*
allprev
;
// 执行上下文
Context context
;
// 睡眠时间
uvlong alarmtime
;
uint id
;
// 栈信息
uchar
*
stk
;
uint stksize
;
//是否退出了
int
exiting
;
// 在alltask的索引
int
alltaskslot
;
// 是否是系统协程
int
system
;
// 是否就绪状态
int
ready
;
// 入口函数
void
(
*
startfn
)
(
void
*
)
;
// 入口参数
void
*
startarg
;
// 自定义数据
void *udata;
};
接着看看taskalloc的实现。
// 分配一个协程所需要的内存和初始化某些字段
static Task*taskalloc(void (*fn)(void*), void *arg, uint stack){
Task
*
t
;
sigset_t zero
;
uint x
,
y
;
ulong z
;
/* allocate the task and stack together */
// 结构体本身的大小+栈大小
t
=
malloc
(
sizeof
*
t
+
stack
)
;
memset
(
t
,
0
,
sizeof
*
t
)
;
// 栈的内存位置
t
->
stk
=
(
uchar
*
)
(
t
+
1
)
;
// 栈大小
t
->
stksize
=
stack
;
// 协程id
t
->
id
=
++
taskidgen
;
// 协程工作函数和参数
t
->
startfn
=
fn
;
t
->
startarg
=
arg
;
/* do a reasonable initialization */
memset
(
&
t
->
context
.
uc
,
0
,
sizeof
t
->
context
.
uc
)
;
sigemptyset
(
&
zero
)
;
// 初始化uc_sigmask字段为空,即不阻塞信号
sigprocmask
(
SIG_BLOCK
,
&
zero
,
&
t
->
context
.
uc
.
uc_sigmask
)
;
/* must initialize with current context */
// 初始化uc字段
getcontext
(
&
t
->
context
.
uc
)
// 设置协程执行时的栈位置和大小
t
->
context
.
uc
.
uc_stack
.
ss_sp
=
t
->
stk
+
8
;
t
->
context
.
uc
.
uc_stack
.
ss_size
=
t
->
stksize
-
64
;
z
=
(
ulong
)
t
;
y
=
z
;
z
>>=
16
;
/* hide undefined 32-bit shift from 32-bit compilers */
x
=
z
>>
16
;
// 保存信息到uc字段
makecontext
(
&
t
->
context
.
uc
,
(
void
(
*
)
(
)
)
taskstart
,
2
,
y
,
x
)
;
return t;
}
taskalloc函数代码看起来很多,但是逻辑不算复杂,就是申请Task结构体所需的内存和执行时栈的内存,然后初始化各个字段。这样,一个协程就诞生了。接着执行taskready把协程加入就绪队列。
// 修改协程的状态为就绪并加入就绪队列
void taskready(Task *t){
t
->
ready
=
1
;
addtask(&taskrunqueue, t);
}
// 把协程插入队列中,如果之前在其他队列,则会被移除
void addtask(Tasklist *l, Task *t){
if
(
l
->
tail
)
{
l
->
tail
->
next
=
t
;
t
->
prev
=
l
->
tail
;
}
else
{
l
->
head
=
t
;
t
->
prev
=
nil
;
}
l
->
tail
=
t
;
t->next = nil;
}
taskrunqueue记录了所有就绪的协程。创建了协程并加入队列后,协程还没有开始执行,就像操作系统的进程和线程一样,需要有一个调度器来调度执行。下面我们看看调度器的实现。
// 协程调度中心
static void taskscheduler(void){
int
i
;
Task
*
t
;
for
(
;
;
)
{
// 没有用户协程了,则退出
if
(
taskcount
==
0
)
exit
(
taskexitval
)
;
// 从就绪队列拿出一个协程
t
=
taskrunqueue
.
head
;
if
(
t
==
nil
)
{
fprint
(
2
,
"no runnable tasks! %d tasks stalled\n"
,
taskcount
)
;
exit
(
1
)
;
}
// 从就绪队列删除该协程
deltask
(
&
taskrunqueue
,
t
)
;
t
->
ready
=
0
;
// 保存正在执行的协程
taskrunning
=
t
;
// 切换次数加一
tasknswitch
++
;
// 切换到t执行,并且保存当前上下文到taskschedcontext(即下面要执行的代码)
contextswitch
(
&
taskschedcontext
,
&
t
->
context
)
;
// 执行到这说明没有协程在执行(t切换回来的),置空
taskrunning
=
nil
;
// 刚才执行的协程t退出了
if
(
t
->
exiting
)
{
// 不是系统协程,则个数减一
if
(
!
t
->
system
)
taskcount
--
;
// 当前协程在alltask的索引
i
=
t
->
alltaskslot
;
// 把最后一个协程换到当前协程的位置,因为他要退出了
alltask
[
i
]
=
alltask
[
--
nalltask
]
;
// 更新被置换协程的索引
alltask
[
i
]
->
alltaskslot
=
i
;
// 释放堆内存
free
(
t
)
;
}
}
}
调度器的代码看起来很多,但是核心逻辑就三个
1 从就绪队列中拿出一个协程t,并把t移出就绪队列
2 通过contextswitch切换到协程t中执行
3 协程t切换回调度中心,如果t已经退出,则修改数据结构,然后回收他占据的内存。继续调度其他协程执行。
至此,协程就开始跑起来了。并且也有了调度系统。这里的调度机制是比较简单的,就是按着先进先出的方式就绪调度,并且是非抢占的。即没有按时间片调度的概念,一个协程的执行时间由自己决定,放弃执行的权力也是自己控制的,当协程不想执行了可以调用taskyield让出cpu。
// 协程主动让出cpu
int taskyield(void){
int
n
;
// 当前切换协程的次数
n
=
tasknswitch
;
// 插入就绪队列,等待后续的调度
taskready
(
taskrunning
)
;
taskstate
(
"yield"
)
;
// 切换协程
taskswitch
(
)
;
// 等于0说明当前只有自己一个协程,调度的时候tasknswitch加一,所以这里减一
return tasknswitch - n - 1;
}
/*
切换协程,taskrunning是正在执行的协程,taskschedcontext是调度协程(主线程)的上下文,
切换到调度中心,并保持当前上下文到taskrunning->context
*/
void taskswitch(void){
needstack
(
0
)
;
contextswitch
(
&
taskrunning
->
context
,
&
taskschedcontext
)
;
}
// 真正切换协程的逻辑
static
void
contextswitch
(
Context
*
from
,
Context
*
to
)
{
if
(
swapcontext
(
&
from
->
uc
,
&
to
->
uc
)
<
0
)
{
fprint
(
2
,
"swapcontext failed: %r\n"
)
;
assert
(
0
)
;
}
}
yield的逻辑也很简单,因为协程在执行的时候,是不处于就绪队列的,当协程准备让出cpu时,协程首先把自己重新加入到就绪队列,等待下次被调度执行。当然我们也可以直接调度contextswitch切换到其他协程。重点在于什么时候应该让出cpu,又什么时候应该被调度执行。接下来会详细讲解。至此,我们已经有了支持协程所需要的底层基础。我们看到这个实现的思路也不是很复杂,首先有一个队列表示待执行的的协程,每一个协程对应一个Task结构体。然后调度中心不断地按照先进先出的方式去调度协程的执行就可以。因为没有抢占机制,所以调度中心是依赖协程本身去驱动的,协程需要主动让出cpu,把上下文切换回调度中心,调度中心才能进行下一轮的调度。接下来我们看看,基于这些底层基础,如果实现一个基于协程的服务器。下面我们通过一个例子进行讲解。
void
taskmain
(
int
argc
,
char
*
*
argv
)
{
// 启动一个tcp服务器
if
(
(
fd
=
netannounce
(
TCP
,
0
,
atoi
(
argv
[
1
]
)
)
)
<
0
)
{
// ...
}
// 改为非阻塞模式
fdnoblock
(
fd
)
;
// accept成功后创建一个客户端协程
while
(
(
cfd
=
netaccept
(
fd
,
remote
,
&
rport
)
)
>=
0
)
{
taskcreate
(
proxytask
,
(
void
*
)
cfd
,
STACK
)
;
}
}
我们刚才讲过taskmain是我们需要实现的函数,首先通过netannounce建立一个tcp服务器。接着把fd改成非阻塞的,这个非常重要,因为在后面调用accept的时候,如果是阻塞的文件描述符,那么就会引起进程挂起,而非阻塞模式下,操作系统会返回EAGAIN的错误码,通过这个错误码我们可以决定下一步做什么。我们看看netaccept的实现。
// 处理(摘下)连接
int netaccept(int fd, char *server, int *port){
int
cfd
,
one
;
struct
sockaddr_in
sa
;
uchar
*
ip
;
socklen_t len
;
// 注册事件到epoll,等待事件触发
fdwait
(
fd
,
'r'
)
;
len
=
sizeof
sa
;
// 触发后说明有连接了,则执行accept
if
(
(
cfd
=
accept
(
fd
,
(
void
*
)
&
sa
,
&
len
)
)
<
0
)
{
return
-
1
;
}
// 和客户端通信的fd也改成非阻塞模式
fdnoblock
(
cfd
)
;
one
=
1
;
setsockopt
(
cfd
,
IPPROTO_TCP
,
TCP_NODELAY
,
(
char
*
)
&
one
,
sizeof
one
)
;
return cfd;
}
netaccept就是通过调用accept逐个处理tcp连接,但是在accept之前,有一个非常重要的操作fdwait。
// 协程因为等待io需要切换
void fdwait(int fd, int rw){
// 是否已经初始化epoll
if
(
!
startedfdtask
)
{
startedfdtask
=
1
;
epfd
=
epoll_create
(
1
)
;
// 没有初始化则创建一个协程,做io管理
taskcreate
(
fdtask
,
0
,
32768
)
;
}
struct
epoll_event
ev
=
{
0
}
;
// 记录事件对应的协程和感兴趣的事件
ev
.
data
.
ptr
=
taskrunning
;
switch
(
rw
)
{
case
'r'
:
ev
.
events
|=
EPOLLIN
|
EPOLLPRI
;
break
;
case
'w'
:
ev
.
events
|=
EPOLLOUT
;
break
;
}
int
r
=
epoll_ctl
(
epfd
,
EPOLL_CTL_ADD
,
fd
,
&
ev
)
;
// 切换到其他协程,等待被唤醒
taskswitch
(
)
;
// 唤醒后函数刚才注册的事件
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ev);
}
fdwait首先把fd注册到epoll中,然后把协程切换到下一个待执行的协程。这里有个细节,当协程X被调度执行的时候,他是脱离了就绪队列的,而taskswitch函数只是实现了切换上下文到调度中心,调度中心会从就绪队列从选择下一个协程执行,那么这时候,脱离就绪队列的协程X就处于孤岛状态,看起来再也无法给调度中心选中执行,这个问题的处理方式是,把协程、fd和感兴趣的事件信息一起注册到epoll中,当epoll监听到某个fd的事件发生时,就会把对应的协程加入就绪队列,这样协程就可以被调度执行了。在fdwait函数一开始那里处理了epoll相关的逻辑。epoll的逻辑也是在一个协程中执行的,但是epoll所在协程和一般协程不一样,类似于操作系统的内核线程一样,epoll所在的协程成为系统协程,即不是用户定义的,而是系统定义的。我们看一下实现
void
fdtask
(
void
*
v
)
{
int
i
,
ms
;
Task
*
t
;
uvlong now
;
// 变成系统协程
tasksystem
(
)
;
struct
epoll_event
events
[
1000
]
;
for
(
;
;
)
{
/* let everyone else run */
// 大于0说明还有其他就绪协程可执行,则先让给他们执行,否则往下执行
while
(
taskyield
(
)
>
0
)
;
/* we're the only one runnable - poll for i/o */
errno
=
0
;
// 没有定时事件则一直阻塞
if
(
(
t
=
sleeping
.
head
)
==
nil
)
ms
=
-
1
;
else
{
/* sleep at most 5s */
now
=
nsec
(
)
;
if
(
now
>=
t
->
alarmtime
)
ms
=
0
;
else
if
(
now
+
5
*
1000
*
1000
*
1000LL
>=
t
->
alarmtime
)
ms
=
(
t
->
alarmtime
-
now
)
/
1000000
;
else
ms
=
5000
;
}
int
nevents
;
// 等待事件发生,ms是等待的超时时间
if
(
(
nevents
=
epoll_wait
(
epfd
,
events
,
1000
,
ms
)
)
<
0
)
{
if
(
errno
==
EINTR
)
continue
;
fprint
(
2
,
"epoll: %s\n"
,
strerror
(
errno
)
)
;
taskexitall
(
0
)
;
}
/* wake up the guys who deserve it */
// 事件触发,把对应协程插入就绪队列
for
(
i
=
0
;
i
<
nevents
;
i
++
)
{
taskready
(
(
Task
*
)
events
[
i
]
.
data
.
ptr
)
;
}
now
=
nsec
(
)
;
// 处理超时事件
while
(
(
t
=
sleeping
.
head
)
&&
now
>=
t
->
alarmtime
)
{
deltask
(
&
sleeping
,
t
)
;
if
(
!
t
->
system
&&
--
sleepingcounted
==
0
)
taskcount
--
;
taskready
(
t
)
;
}
}
}
我们看到epoll的处理逻辑和一般服务器的类似,通过epoll_wait阻塞,然后epoll_wait返回时,处理每一个发生的事件,而且libtask还支持超时事件。另外libtask中当还有其他就绪协程的时候,是不会进入epoll_wait的,它会把cpu让给就绪的协程(通过taskyield函数),当就绪队列只有epoll所在的协程时才会进入epoll的逻辑。至此,我们看到了libtask中如何把异步变成同步的。当用户要调用一个可能会引起进程挂起的接口时,就可以调用libtask提供的一个相应的API,比如我们想读一个文件,我们可以调用libtask的fdread。
int
fdread
(
int
fd
,
void
*
buf
,
int
n
)
{
int
m
;
// 非阻塞读,如果不满足则再注册到epoll,参考fdread1
while
(
(
m
=
read
(
fd
,
buf
,
n
)
)
<
0
&&
errno
==
EAGAIN
)
fdwait
(
fd
,
'r'
)
;
return m;
}
这样就不需要担心进程被挂起,同时也不需要处理epoll相关的逻辑(注册事件,事件触发时的处理等等)。异步转同步,libtask的方式就是通过提供对应的API,先把用户的fd注册到epoll中,然后切换到其他协程,等epoll监听到事件触发时,就会把对应的协程插入就绪队列,当该协程被调度中心选中执行时,就会继续执行剩下的逻辑而不会引起进程挂起,因为这时候所等待的条件已经满足。
总结:libtask的设计思想就是把业务逻辑封装到一个个协程中,由libtask实现协程的调度,在各个业务逻辑中进行切换,从而驱动着系统的运行。另外libtask也提供了一个网络和文件io异步变同步的解决方案。使得我们使用起来更加方便,高效。
拓展阅读:
