skynet socket

网络部分是一个服务器最基础最核心的部分,这个技术也已经是非常成熟了,现在已经很少有人自己实现一个网络相关的库了。

skynet的网络库是自己实现的。
skynet 网络层的设计目的是,把操作系统层面的 socket 数据从系统内核复制到用户空间,然后再把用户空间的数据地址交给各个不同的服务使用,同时也把用户空间需要发送的数据转移到系统内核中。
只不过一个优秀的网络框架会提供更多的功能和细节优化,例如:socket 的管理、网络读写分离、读写数据缓存等等。

还需要特别注意一点的是,因为 skynet 是一个消息驱动的模型,所以 skynet 的网络模块使用了一种叫 self pipe trick 1的技巧来唤醒异步调用,实现细节是利用了 pipe() 的“半双工”管道 + select,为了解决可在网络线程在即时处理网络事件的同时,也能即时处理来自非网络(比如内部消息队列)的其它消息。

121575990-8d372180-ca5a-11eb-88ce-6125a229cfff

epoll和kqueue

实际上云风只实现了epoll和kqueue,windows上的变种请自行搜索吧。

epoll和kqueue的实现分别在socket_epoll.h和socket_kqueue.h当中。epoll的函数其实就是epoll_create/epoll_ctl/epoll_del/epoll_wait这几个,要注意的是skynet中的epoll_create的参数是1024。所以连接数上不去的话很可能就是这里限制了。
skynet在socket_poll.h中根据平台的不同包含了不同的头文件,屏蔽了平台相关性。然后在socket_server.c中实现了网络服务的逻辑。
然后skynet在skynet_socket.c中对socket_server.c中的逻辑再次做了一个封装,还添加了socket客户端相关的函数,就是connect/send/close之类的函数。
为了方便lua层使用socket,在lua-socket.c中再将对skynet_socket.c进行了一次封装。这个封装就是c语言层和lua语言层的相互转换。目前只支持tcp和udp,基于tcp上的http/websocket之类统统是不支持的。

为什么操作一个网络要费这么大的劲呢,绕来绕去非常的不直观。因为skynet是基于消息的,而且每个服务都有一个monitor,每个消息处理的时候要尽可能的短,这样才不会阻塞服务里其它的请求。而connect这种明显是阻塞的,当然也可以写成非阻塞的,但是非阻塞的话,你需要不断地挂起,因为非阻塞实际上是基于select技术来实现的。而不断地挂起,这个就很麻烦,写起来很痛苦而且很容易出错。因此云风把这些都放到网络线程中来做,这样就不会影响工作线程。但是这样做也有它的缺点,那就是网络线程可能会被阻塞,网络线程被阻塞就会导致服务无响应。或者导致大量的数据包积累,引起波峰。

epoll封装层

./skynet-src/socket_poll.h
网络服务模块通常会有一个大的循环来读取网络消息,skynet也不例外,socket_server_poll函数就是来干这事的。在这个循环中将会有两个不同来源的消息系统,一个是管道消息,另一个则是网络消息了。管道消息后面会提到。网络消息是通过epoll模型的epoll_wait来读取的,采用默认的水平触发模式,这样连续读取数据较为简单。

运行流程:

1.在skynet_start() 中 调用 skynet_socket_init() 初始化socket服务
2.每个socket 服务都有 写缓存队列,所以框架会异步的实现读写。
3.socket 的open close listen apect 等操作是通过给 socket_server 的管道写入请求信息,在server_poll循环中再去处理他。
4.socket 在发送数据时 会尝试的直接发送数据!如果不能直接发送数据 才会把数据写入 socket 对应的写缓存 。
1425134-20190404141706446-2018988792

以 PTYPE_SOCKET 类型的消息发送给发起请求服务

skynet 的 C API 采用异步读写,你可以使用 C 调用,监听一个端口,或发起一个 TCP 连接。但具体的操作结果要等待 skynet 的事件回调。skynet 会把结果以 PTYPE_SOCKET 类型的消息发送给发起请求的服务。(参考skynet_socket.h)
在处理实际业务中,这样的 API 很难使用,所以又提供了一组阻塞模式的 lua API 用于 TCP socket 的读写。它是对 C API 的封装。
所谓阻塞模式,实际上是利用了 lua 的 coroutine 机制。当你调用 socket api 时,服务有可能被挂起(时间片被让给其他业务处理),待结果通过 socket 消息返回,coroutine 将延续执行。

网络管理器

管理器对应了网络库中的 socket_server 实例(或称对象),这里只简单阐述下它的作用,我会在后面分别做详细的剖析。

它主要作用大致可分为两个部分:

  • socket 的管理,包括 socket的存储、socket 的id分配以及socket 状态的维护等等
  • 网络事件的处理,包括 efd 的创建、管道的创建与读写处理(用于内部命令)、外部网络事件的捕获与读写处理等。关于网络(内部和外部)的读写处理封装在 socket_server_poll 接口中,供网络线程调用

管道

在 skynet 中,管道配合 select 使用,管道为“服务”操控网络模块提供了支持。那我们要思考一下,为什么在已经有了 epoll/kqueue 后,还需要配置使用 pipe + select?

这里就涉及到了我之前文章提到的 self pipe trick 技巧,也就是 pipe 的异步使用场景。以 epoll 举例(本文后面都以epoll为例),一般我们使用 epoll 做网络开发时,基本遵循以下套路:

1
2
3
4
5
6
7
8
9
10
11
12
events = epoll_wait(efd, timeout);
for fd, event in events {
if event & EPOLLIN {
读取数据
}
if event & EPOLLOUT {
写入数据
}
if event & EPOLLERR {
错误处理
}
}

在单线程模式下,以上的处理流程是完全没有问题的,但是在多线程的模式下,问题就出现了。就拿 skynet 来说,当一个工作线程中的 lua 服务需要对网络模块执行一些操作时,例如关闭某个socket,在工作线程发出请求后,网络线程却正处于 epoll wait 状态,也就是网络线程阻塞了,epoll 没有网络事件的话,只能等待 wait 超时后,才能有机会处理工作线程发来的请求,而且若 timeout = -1 时,则会导致 epoll_wait 无限期阻塞,这显然很有可能会导致工作线程的请求长时间不被处理,“不幸的是”,skynet 就是将 timeout 设为 -1。
为了解决上面的问题,管道就派上用场,它是半双工的,一端写入,另一端读取,我们只需要把读端 fd 注册到 epoll 中,这就解决了内部请求的唤醒问题,倘如外部网络连接一直没有收据到达,epoll 则会阻塞 wait,此时工作线程写入数据到管道中,就可以“激活” epoll 并处理内部命令。

事件

一般来说,在使用多路复用IO模型时,会对事件进行二次封装或者二次处理。例如 skynet 中 epoll wait 流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static int 
sp_wait(int efd, struct event *e, int max) {
struct epoll_event ev[max];
int n = epoll_wait(efd , ev, max, -1);
int i;
for (i=0;i<n;i++) {
e[i].s = ev[i].data.ptr;
unsigned flag = ev[i].events;
e[i].write = (flag & EPOLLOUT) != 0;
e[i].read = (flag & (EPOLLIN | EPOLLHUP)) != 0; // 需关注EPOLLHUP
e[i].error = (flag & EPOLLERR) != 0;
e[i].eof = false;
}

return n;
}

由 epoll 捕获的事件,会被封装成 struct event,然后把这些封装过的事件交由网络线程处理。它的结构体定义如下:

1
2
3
4
5
6
7
struct event {
void * s; // socket 实例指针,若s为null,表示是管道事件
bool read; // 可读事件标识
bool write; // 可写事件标识
bool error; // 错误事件标识
bool eof; // 文件尾(end-of-file)标识,kqueue专用
};

这里需注意两个小细节:

  • 事件的用户数据指针s。当把一个 fd 注册到 event poll 中时,若该 fd 是一个外部网络连接则 s 指向一个 socket 实例;若该 fd 是管道的读端则 *s 为null,这样在线程调用socket_server_poll时就能根据 *s 的值把内部消息流程和外部消息流程分隔,即管道的内部命令处理都使用 select,而外部网络消息使用 epoll/kqueue**。
  • read 标识需要关注 EPOLLHUP。该事件是一个“不可标记的事件”,也就是说不需要手动注册事件,该事件被触发的核心原则是收到了对端发送过来的 RST 包,例如:向一个对端已经close的 fd 写入数据。实验代码贴在了gist。

socket

skynet 把网络连接都封装成 struct socket,其结构体定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
struct socket {
// 基础信息
int fd; // 网络 fd
int id; // 自增,范围在 0x01 ~ 0x7fffffff
uint8_t protocol; // 协议类型,支持TCP和UDP
uint8_t type; // socket类型(称为“状态”更合适)
uintptr_t opaque; // 与之关联的服务id
union {
int size;
uint8_t udp_address[UDP_ADDRESS_SIZE];
} p; // 联合体,tcp使用size表示每次读取数据的字节数;udp使用udp_address表示远端地址信息

// 写入队列
struct wb_list high; // 高优先级写入队列
struct wb_list low; // 低优先级写入队列
int64_t wb_size; // 待写入的字节数,0表示没有数据需要写入,过大则会发出报警

// 统计和警告
struct socket_stat stat; // 读写统计信息(读写的字节总数、最后读写时间)
int64_t warn_size; // 待写入数据过大时报警的阈值

// 直接写入
struct spinlock dw_lock; // 直接写入的锁(后面会详细解释)
int dw_offset; // 已经直接写入的数据大小
const void * dw_buffer; // 直接写入的数据指针
size_t dw_size; // 直接写入的数据总量
volatile uint32_t sending; // 是否正在发送数据
uint16_t udpconnecting; // udp 正在连接标识
};

可以看到一个 socket 携带了很多信息,这些字段整理后可以分为四部分:

  • 基础信息,例如 socket fd、socket id、协议类型等信息
  • 写入队列相关,包括高优先级和低优先级两种写入队列
  • 直接写入(direct write),包括锁(dw_lock)、直接写入的数据指针、大小和偏移量等等
  • 统计和警告,包括读写数据、读写时间的统计以及待写入数据过载时的警告

每当 accept 函数获得一个新的 socket id 后,并不会立即收到这个 socket 上的数据。这是因为,我们有时会希望把这个 socket 的操作权转让给别的服务去处理。
socket 的 id 对于整个 skynet 节点都是公开的。也就是说,你可以把 id 这个数字通过消息发送给其它服务,其他服务也可以去操作它。任何一个服务只有在调用 socket.start(id) 之后,才可以收到这个 socket 上的数据。skynet 框架是根据调用 start 这个 api 的位置来决定把对应 socket 上的数据转发到哪里去的。
向一个 socket id 写数据也需要先调用 start ,但写数据不限制在调用 start 的同一个服务中。也就是说,你可以在一个服务中调用 start ,然后在另一个服务中向其写入数据。skynet 可以保证一次 write 调用的原子性。即,如果你有多个服务同时向一个 socket id 写数据,每个写操作的串不会被分割开。

接下来我会对我认为需要深入的部分做更仔细的剖析,它们包括:socket id 的生成规则、socket 的状态以及状态的切换、socket 数据发送(直接发送和队列发送)。

ID 生成

通常,应用层不会直接使用内核返回的 fd 作为一个 socket 连接的唯一id,因为内核会复用 fd,因此内核的 fd 并不具有唯一性,例如服务器收到两个了连接,fd 分别是 1 和 2,当连接 2 被关闭后,此时新的连接到来,内核可能复用 fd = 2 分配给新的连接,这就引发一个问题,若应用层还需要对之前关闭的连接 2 做一些收尾操作,而新的连接又复用的连接 2 的 fd,新旧连接的冲突就发生了。
为了解决以上问题,一般服务器都有一套自己的 socket id 生成规则,大部分都是自增 id 作为一个连接的唯一 id。skynet 采用的也是类似方式,且 skynet 把 socket 实例的 id 与 socket 实例所在的数组下标做了哈希映射,其哈希算法也非常简单,就是对 id 做取模运算,取模后的值即为 socket 实例的数组下标。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 哈希映射
#define HASH_ID(id) (((unsigned)id) % MAX_SOCKET)

// 生成socket id
static int
reserve_id(struct socket_server *ss) {
int i;
for (i=0;i<MAX_SOCKET;i++) {
int id = ATOM_INC(&(ss->alloc_id));
if (id < 0) {
id = ATOM_AND(&(ss->alloc_id), 0x7fffffff);
}
struct socket *s = &ss->slot[HASH_ID(id)];
if (s->type == SOCKET_TYPE_INVALID) {
if (ATOM_CAS(&s->type, SOCKET_TYPE_INVALID, SOCKET_TYPE_RESERVE)) {
s->id = id;
s->protocol = PROTOCOL_UNKNOWN;
// socket_server_udp_connect may inc s->udpconncting directly (from other thread, before new_fd),
// so reset it to 0 here rather than in new_fd.
s->udpconnecting = 0;
s->fd = -1;
return id;
} else {
// retry
--i;
}
}
}
return -1;
}

代码很简单易懂,其核心逻辑:循环查找 slot 数组中的 socket 实例,同时自增 alloc_id 并通过 HASH_ID 计算得到 slot 数组下标,直到找到一个处于空闲状态的的 socket 实例。该算法计算得到的 id 分部如下图所示:
捕获

socket状态

socket 的状态目前一共有 9 种,其宏定义如下:

1
2
3
4
5
6
7
8
9
#define SOCKET_TYPE_INVALID 0		// 初始状态
#define SOCKET_TYPE_RESERVE 1 // 保留状态
#define SOCKET_TYPE_PLISTEN 2 // 监听前状态
#define SOCKET_TYPE_LISTEN 3 // 监听中状态
#define SOCKET_TYPE_CONNECTING 4 // 连接中状态
#define SOCKET_TYPE_CONNECTED 5 // 已连接状态
#define SOCKET_TYPE_HALFCLOSE 6 // 半关闭状态(能写入但会丢弃读取的数据)
#define SOCKET_TYPE_PACCEPT 7 // 已接收,但是未添加到epoll(需要start)
#define SOCKET_TYPE_BIND 8 // 绑定系统fd(stdin、stdout、stderr)

这些状态有的属于公共状态,每个 socket 实例都能适用,例如 INVALID 和 RESERVE;有的状态则只适用于与之对应的网络操作,例如 PLISTEN 和 LISTEN 属于与监听(listen)关联的状态。
首先介绍下两个公共状态的作用和意义:

  • INVALID 是一个 socket 的初始状态,表示这个 socket 实例暂时未被使用,即可用状态,可用参考上面的 ID 生成 那一节内容
  • RESERVE 保留状态,它表示一个 socket 实例正处于某些网络操作(例如listen、connect)的开始和正式结束的“中间”状态

这里我们思考一个问题,为什么需要 RESERVE 中间状态呢?
这是因为 skynet 是一个多线程框架,为了保证工作线程尽量少的出现阻塞调用(服务都是由工作线程驱动),就需要把网络操作中的阻塞部分交由网络线程处理,当网络线程处理完阻塞逻辑后,抛出消息异步通知给服务,而 RESERVE 状态则确保了工作线程在发起网络调用后能立即返回一个可用的 socket 实例并保留住,以便异步消息的回调。正如云风所说:

目前的设计是,所有网络请求,都通过把指令写到一个进程内的 pipe ,串行化到网络处理线程,依次处理,然后再把结果投> 递到 skynet 的其它服务中。

除了公共状态,其余的状态与网络调用相对应,这些网络调用包括:监听连接、被动连接、主动连接、关闭连接。

  • 监听,skynet 把监听操作分成两个步骤:建立监听socket、绑定并监听(有阻塞,如 getaddrinfo);手动将监听 fd 注册到 epoll/kqueue 中
    adc3x-9j2im
  • 被动连接,同样分为两个步骤:发起主动连接请求;注册连接 fd 到 epoll/kqueue 中
    a2gj4-3mpt0
  • 主动连接,同样分为两个步骤:发起主动连接请求;注册连接 fd 到 epoll/kqueue 中。
    aky9g-917zw
  • 关闭连接,分为正常关闭(close)和强制停止(shutdown)
    ajhg1-e8j3q
    从上面这些网络操作的时序图中,我们可以看到它们都采用了异步消息通知的机制,网络调用由服务发起,通过内部命令管道转发给网络库,最终由网络线程执行具体的逻辑。这也印证了RESERVE状态的必要性。

需要注意的是,skynet 网络库使用的系统网络 API 基本都是非阻塞的,除了 getaddrinfo,它是一个阻塞 API,linux 提供的各DNS API函数都是阻塞式的,无法设置超时时间等(关于此 api,官方的wiki也有说明);另外,由 socket() 创建的 fd 也基本会设置成非阻塞(nonblocking)模式,包括 connect 的 fd,只有 listen fd 为阻塞。

写入队列

一个 socket 实例有两个发送队列:高优先级队列、低优先级队列。它们的结构相同,只是在网络线程发送数据时有优先顺序,高优先级队列中的数据会优先发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct write_buffer {
struct write_buffer * next;
const void *buffer;
char *ptr;
size_t sz;
bool userobject;
uint8_t udp_address[UDP_ADDRESS_SIZE];
};

// 发送队列
struct wb_list {
struct write_buffer * head;
struct write_buffer * tail;
};

关于这两个队列的处理规则,可参考云风《对 skynet 的 gate 服务的重构》文中所述:
socket 发送规则如下:

如果 socket 可写,且两个队列都为空,立即发送。
如果上一次有一个数据包没有发送完,无论它在哪个队列里,都确保先将其发送完。
如果高优先级队列有数据包,先保证发送高优先级队列上的数据。
如果高优先级队列为空,且低优先级队列不为空,发送一个低优先级队列上的数据包。

reference

深入理解skynet —— 网络库(一) 详见:https://domicat.me/_posts/2020-05-20-learn-skynet-network1/
pipe异步使用场景 详见:https://www.zhihu.com/question/39752285/answer/82906915


En la vida hay dos palabras que te abrirán muchas puertas…?? ???TIRE y EMPUJE!!!