skynet mq

skynet跟mq扮演的角色类似,每个skynet进程维护了一个MQ,会dispatch msg到每个skynet_context的私有mq。有skynet就没必要再在自己项目里引入MQ了。

消息队列mq

消息队列是skynet的核心功能之一,它的功能说白了就是入队出队,先进先出,这个数据结构都有讲过。源码实现在skynet_mq.h和skynet_mq.c中。

skynet的消息队列实际上是有两种,一种是全局消息队列,一种是服务消息队列。每个服务都有自己的消息队列,每个服务消息队列中都有服务的handle标识。这个涉及到消息的派发,这里就不展开了。每个服务消息队列被全局消息队列引用。
全局消息队列用的是经典的链表来实现的,而服务的消息队列用的是比较不直观,可能对有些人来说理解起来特别困难的循环数组来实现的。而且数组空间不够的时候,会动态扩展,容量扩展为当前容量的2倍。
消息队列的出队入队函数名都比较简单而且明了,push/pop。这个名字可能会带来一定的误解,如果改成enqueue/dequeue的话,就更符合它的实际功能。

消息的封装

网络库的一个核心功能就是收发网络消息(内部命令也可以看成是一类网络消息),且收发流程都会对原始数据做封装处理,对于收包流程来说,需要网络库把收到的数据封装成服务能够处理的数据结构,同时还需要附加一些额外的数据;而对于发包流程而言,同样需要进行数据封装处理。

消息注册

skynet的消息注册,C服务和lua服务设置回调走的函数是不同的。C的回调可以直接调,但是lua的回调不行,它需要一个默认的回调C函数,将返回参数转换为lua能理解的格式,遵循lua的api协议,传递到lua层。
当服务是lua实现的时候,skynet底层核心框架在处理完消息以后,回调lua层服务的回调函数时,要先经过一次lua api协议的处理,将参数准备好以后,然后调用lua服务中的回调函数。

1
2
3
4
5
6
skynet.dispatch(callback)---------------------->proto[typename].dispach = callback
|
skynet.core.call(skynet.dispatch_message)---->tbl[k] = skynet.dispatch_message
|
|
C dispatch_message->_cb---------------------------------------------------|

消息机制之消息处理

skynet的消息机制准备拆成三个部分来讲,第一部分是收包,第二部分是发包,第三部分是事件的处理。

收包

先总结一下,skynet_context_message_dispatch这个函数实际上就是不停地从全局消息队列里取工作队列,取到了以后呢,就一直处理这个队列里的消息。为了避免某个队列占用太多cpu,当前队列处理到一定的量,就把机会让给全局消息队列里的其它工作队列,把自己又放回全局消息队列。而这个处理的量是根据创建线程时thread_param里的weight权重来判定的,权重越大,流转的就越快,也就是说处理某个队列的消息数量就越少。这就是消息处理的主流程机制。
在主流程之外,还有monitor的触发和取消,每次处理前,触发monitor的检查。处理完了,取消monitor的检查。

我们知道,skynet 是一个 actor 模型的框架,actor 之间使用“消息”进行通讯,且“消息”遵循一个统一的格式,就像信封一样,大家都用一套统一通用的格式,才能相互顺畅通信。
在 skynet 服务间流通的消息被封装为 skynet_message,其结构体定义如下:

1
2
3
4
5
6
struct skynet_message {
uint32_t source; // 消息源的服务id
int session; // 消息的session id
void * data; // 消息的payload(指针)
size_t sz; // payload 大小
};

skynet_message 是 skynet 服务能处理消息的唯一格式,即其他模块派发给服务的通知都需要封装成 skynet_message,以便服务能够处理,例如:定时器模块的定时消息、网络模块的内部命令结果和外部网络消息,都转换成 skynet_message,然后发送给对应的服务,可谓“殊途同归”。

在转换过程中,我们需要关注很多细节,包括消息负载数据是否需要拷贝,确定消息类型等等。下图展示了收包过程中数据的封装及数据的流向:
ay1aq-inlky

发包

消息的处理实际上就是对工作队列里的消息不停地调回调函数。那么消息是怎么放进消息队列的呢。带着这个疑问,让我们从lua层开始追根溯源。
在lua层有两个api,一个是skynet.send,这个是非阻塞发消息。另一个是skynet.call,这个是阻塞式发完消息等回应。skynet.call使用一个session来实现等待,这个session实际就是一个自增的数字,溢出了以后又从1开始。
skynet.send实际上就是往目标服务的消息队列里增加一条消息。

这里所说的发包是指向一个外部网络连接发送数据,需要注意的是,发包可能需要依赖内部命令,因为需要把在工作线程无法直接发送的数据,透过内部管道,将这部分数据转交给网络线程发送(发送流程我在下面会有更加详细的描述)。
任何需要交由网络模块发送的数据,都会封装为 socket_sendbuffer,其结构体定义如下:

1
2
3
4
5
6
7
8
9
10
#define SOCKET_BUFFER_MEMORY 0 		// 内存块,能明确知道内存大小
#define SOCKET_BUFFER_OBJECT 1 // 内存指针,需要做相应处理(send_object_init)才能知道数据的真实大小
#define SOCKET_BUFFER_RAWPOINTER 2 // 原始内存,对应lua userdata

struct socket_sendbuffer {
int id; // socket id
int type; // 要发送的数据类型(参见上面的宏定义)
const void *buffer; // 数据指针(这里并非是真实要发送的数据的内存指针)
size_t sz; // 数据大小
};

在 sendbuffer 这个结构体中,buffer是一个指向待发送数据的指针,type则用来区分这个指针的类型。类型分为一下三种:

  • MEMORY,表示大小已知的内存指针,例如通过 concat_table 得到的字符串数据;
  • OBJECT,表示大小未知的数据对象指针,其真实发送的数据(最终往 fd 中写入的数据)需要二次提取,例如 lua 的 lightuserdata;
  • RAWPOINTER,特指 lua userdata,当该类型的数据透传给网络库时,需要进行内存数据拷贝;

此外,在发包过程中,skynet 还做了一些优化,会优先在工作线程直接发送,若无法直发,则透传给网络线程发送。下图展示了发包过程中数据封装和流向:
ac0pt-9dpgn

poll 流程

在前面已经介绍过网络线程的主要逻辑,在这一节我将“庖丁解牛”般的拆解网络库的 event poll 流程,即socket_server_poll接口,对其进行梳理后,可以分为三个部分:内部处理、事件捕获、事件处理,以下是经过简化后的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
int 
socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
for (;;) {
if (ss->checkctrl) {
// 内部命令处理
if (has_cmd(ss)) {
int type = ctrl_cmd(ss, result);
...
} else {
ss->checkctrl = 0;
}
}

if (ss->event_index == ss->event_n) {
// 事件wait
ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
ss->checkctrl = 1;
...
}

...

// 事件处理
switch (s->type) {
case SOCKET_TYPE_CONNECTING:
// 主动连接
return report_connect(ss, s, &l, result);
case SOCKET_TYPE_LISTEN: {
// 被动连接
int ok = report_accept(ss, s, result);
...
}
case SOCKET_TYPE_INVALID:
// 非法socket
fprintf(stderr, "socket-server: invalid socket\n");
break;
default:
if (e->read) {
// 读取处理
...
}

if (e->write) {
// 写入处理
...
}

if (e->error) {
// 错误处理
...
}

if(e->eof) {
// 文件尾(for kqueue)
...
}
break;
}
}
}

内部命令处理是指在进程内的服务与网络库之间的通讯,例如在 lua 服务中监听一个端口,或发起一个 TCP 连接。这些内部命令,通过网络库提供的管道传递到网络库,最终由网络线程执行命令。网络线程在捕获到事件后,会优先处理所有的内部命令(如果有内部消息的话),具体的命令处理流程可查阅 ctrl_cmd 函数,这里不再详述,需要注意该函数的返回值 type,当 type = -1 表示内部命令还不能返回确切的结果(例如命令’L’),或者这个内部命令不需要返回结果(例如命令’T’),当 type > -1 则表示该命令已经执行成功且能返回明确的结果给服务。

关于返回值 type 的宏定义如下:

1
2
3
4
5
6
7
8
#define SOCKET_DATA 0   		// socket 接收到了数据
#define SOCKET_CLOSE 1 // socket 被关闭
#define SOCKET_OPEN 2 // socket 连接成功(主动连接、被动连接等),返回此值也就表示连接已经真正可用
#define SOCKET_ACCEPT 3 // 接收到新的连接(需要后续 start 才能使用)
#define SOCKET_ERR 4 // socket 错误,需要关闭
#define SOCKET_EXIT 5 // 退出网络线程
#define SOCKET_UDP 6 // 收到 UDP 包
#define SOCKET_WARNING 7 // socket 报警(待发送的数据过大)

此外,当 type 为SOCKET_CLOSE或SOCKET_ERR(向一个已关闭的fd发送数据)时,表示连接已经关闭,则需要回收这个 socket 。

事件捕获部分就较为简单,对于 epoll 就是 epoll_wait,对于 kqueue 就是 kenvet,唯一需要注意的点是 wait api 使用的无限期阻塞,即没有事件则一直阻塞。

事件处理主要负责网络连接的处理,包括对外的主动连接和外部的被动连接。有事件的 socket 依据其状态有不同的事件处理流程,如对于 CONNECTING 的 socket 连接,则会完成之前发起的主动连接请求并上报给服务;对于 LISTEN 的 socket,则接收新连接(未start)并上报给服务;对于其他已经建立好连接,则对其进行读、写以及错误处理。

内部命令

内部命令是指进程内服务发送给网络库有关网络操作的消息,这些消息经由管道透传到网络库,这里主要介绍各个内部命令的功能及其封装结构。
我们知道,在一个通道上要实现信息通信,那么在这个通道上流通的消息就需要遵循统一格式的消息封装。就像 skynet 中两个服务之间进行通讯,就需要把消息封装成 skynet_message; 再如当和 mysql 数据库进行交互时,就需要消息包遵循 mysql 数据包格式;同样,内部命令也会封装成统一的格式在管道上传递,其封装结构体定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
struct request_package {
uint8_t header[8]; // 6 bytes dummy,头部,前6个字节预留,第7个字节表示命令类型,第8个字节表示命令内容的长度
union {
char buffer[256];
struct request_open open;
struct request_send send;
struct request_send_udp send_udp;
struct request_close close;
struct request_listen listen;
struct request_bind bind;
struct request_start start;
struct request_setopt setopt;
struct request_udp udp;
struct request_setudp set_udp;
} u;
uint8_t dummy[256]; // 预留256字节
};

所有的命令内容都封装在一个大小为 256 字节的联合体 u 内,具体的消息长度由 char header[7] 控制,这就是为什么联合体需要一个 char buffer[256] 的字符数组;命令类型由 char header[6] 控制,一个大写字母代表一种内部命令。下面列出了目前支持的命令:

1
2
3
4
5
6
7
8
9
10
11
12
S Start socket
B Bind socket
L Listen socket
K Close socket
O Connect to (Open)
X Exit
D Send package (high)
P Send package (low)
A Send UDP package
T Set opt
U Create UDP socket
C set udp address

为什么会使用一个 char header[8] 作为头部,而只使用最后两个字节,这样做的原因是要考虑结构体的内存对齐问题。假如我们只用 char header[2] 来作为头部,那么会在往管道写入数据时,会因为内存对齐的原因,导致写入的命令头部和内容之间存在6个“未初始化”的字节。
下图展示了内部命令的内存结构以及管道的工作流程:
a2vwr-4myvo

事件处理

网络线程在捕获到事件后,若是管道的事件,则走内部命令处理流程,而剩下的网络事件,则根据不同的 socket 状态(或类型)做不同的处理。

主动连接

其核心逻辑在 open_socket 函数中,这里我抠出关键部分的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// return -1 when connecting
static int
open_socket(struct socket_server *ss, struct request_open * request, struct socket_message *result) {
...

int sock= -1;
for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next ) {
sock = socket( ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol );
if ( sock < 0 ) {
continue;
}
socket_keepalive(sock);
sp_nonblocking(sock); // 设置为非阻塞
status = connect( sock, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
if ( status != 0 && errno != EINPROGRESS) {
close(sock);
sock = -1;
continue;
}
break;
}

...

if(status == 0) {
ns->type = SOCKET_TYPE_CONNECTED;
...
return SOCKET_OPEN;
} else {
ns->type = SOCKET_TYPE_CONNECTING;
sp_write(ss->event_fd, ns->fd, ns, true);
}

...
}

上面的代码是一个标准的非阻塞 connect,我们需要关注 connect(sock, …) api 的返回值 status,如果返回 0,则表示连接已经建立,这通常是在服务器和客户在同一台主机上时发生;如果返回 -1,则需要关注 errno,若 errno = EINPROGRESS,表示连接建立,建立启动但是尚未完成,则需要把这个 sock 注册到 epoll 中,并关注该描述符的可写事件,当 epoll/kqueue 捕获到可写事件时,且该 socket 的状态为 CONNECTING,就表示连接已经建立。

接收新连接

该事件处理主要为监听 fd 服务,当 listen_fd 收到新连接到来时,则生成一个新的 socket 实例,然后上报给 listen_fd 所绑定的那个服务,处理流程较为简单。可能需要注意的点是,新接收到的 socket 需要服务对其 start,才能把新连接注册到 epoll/kqueue 中。

消息读取

网络库读取网络消息的流程非常简单,它并没有提供组包功能(组包功能由上层自己实现,我会在下一篇文章深入探讨),其处理流程就是从有读事件的 socket 实例中的 fd 读取固定字节长度的数据,然后经过一系列的封装和转换,最终把这些读取到的数据封装成 skynet_message(具体的封装流程在本文消息的封装一节已经有详细的叙述)并插入到与该 socket 实例所绑定服务的消息队列中去。

关于 TCP 连接每次读取数据的容量(即socket结构体的p.size字段)大小有以下规则:每个 socket 的初始的读取容量为 64 字节,在接下来的读取中,若读取到的数据大小等于读取容量,则将读取容量扩大一倍;若读取到的数据大小小于容量的一半且容量大小大于初始容量,则将容量缩小一倍。其具体实现如下:

1
2
3
4
5
6
// n=本次读取到的数据大小;sz=该socket当前的读取容量
if (n == sz) {
s->p.size *= 2;
} else if (sz > MIN_READ_BUFFER && n*2 < sz) {
s->p.size /= 2;
}

关于 UDP 连接的数据读取则不需要关心读取容量的变化,接收到的 UDP 包数据会先暂存在 socket_server 的 udpbuffer 字段中,该字段是一个长度为 65535 的字节数组。

消息写入

网络库对于发送网络数据有一系列优化,其中主要有:工作线程直接写入(direct write)和 高低优先级写入队列。
在早前的版本,网络数据的收发都是在网络线程中处理的,后来云风做了优化,在写入数据时(服务发起网络写入请求,而服务又是由工作线程驱动),先尝试直接在工作线程发送,若能直接发送则在工作线程直接向 fd 写入数据,这里有个细节需要注意,若数据无法一次性全部写入,则需要把当前发送的数据整个 clone 并保存到 dw_buffer,同时记录下已经写入的数据大小到 dw_offset,这些未发送完的数据会在网络线程发送数时插入到高优先级队列中,所以不用担心数据丢失或发送乱序。关于直接发送的具体流程,可参考云风文章中的描述:

1
2
3
4
5
当每次要写数据时,先检查一下该 fd 中发送队列是否为空,如果为空的话,就尝试直接在当前工作线程发送(这往往是大多数情况)。发送成功就皆大欢喜,如果失败或部分发送,则把没发送的数据放在 socket 结构中,并开启 epoll 的可写事件。

网络线程每次发送待发队列前,需要先检查有没有直接发送剩下的部分,有则加到队列头,然后再依次发送。

当然 udp 会更简单一些,一是 udp 包没有部分发送的可能,二是 udp 不需要保证次序。所以 udp 立即发送失败后,可以直接按原流程扔到发送队列尾即可。

一个优化是将 socket 的发送队列由一个队列拆分成高优先级、低优先级两个队列,消息要插入到哪个队列由上层自行控制。例如:游戏拍卖行的数据、排行榜数据就可以放入低优先级队列中,因为这些消息的实时性不需要那么高;而心跳包、战斗包就需要放入高优先级队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/*
Each socket has two write buffer list, high priority and low priority.

1. send high list as far as possible.
2. If high list is empty, try to send low list.
3. If low list head is uncomplete (send a part before), move the head of low list to empty high list (call raise_uncomplete) .
4. If two lists are both empty, turn off the event. (call check_close)
*/
static int
send_buffer_(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
assert(!list_uncomplete(&s->low));
// step 1,优先发送高优先级队列
if (send_list(ss,s,&s->high,l,result) == SOCKET_CLOSE) {
return SOCKET_CLOSE;
}
if (s->high.head == NULL) {
// step 2,高优先级队列为空,则发送低优先级队列
if (s->low.head != NULL) {
if (send_list(ss,s,&s->low,l,result) == SOCKET_CLOSE) {
return SOCKET_CLOSE;
}
// step 3,若低优先级队列未发送完,则提升一个头部的包到高优先级队列
if (list_uncomplete(&s->low)) {
raise_uncomplete(s);
return -1;
}
if (s->low.head)
return -1;
}
// step 4,两个队列都发送完,则关闭写事件
assert(send_buffer_empty(s) && s->wb_size == 0);
sp_write(ss->event_fd, s->fd, s, false);

if (s->type == SOCKET_TYPE_HALFCLOSE) {
// 若 socket 为半关闭状态,发送完数据后完全关闭并回收该 socket
force_close(ss, s, l, result);
return SOCKET_CLOSE;
}
if(s->warn_size > 0){
s->warn_size = 0;
result->opaque = s->opaque;
result->id = s->id;
result->ud = 0;
result->data = NULL;
return SOCKET_WARNING;
}
}

return -1;
}

reference

深入理解skynet —— 网络库(二) 详见:https://domicat.me/_posts/2020-05-21-learn-skynet-network2/


Todos te dan un consejo, cuando lo que necesitas es guita.