skynet跟mq扮演的角色类似,每个skynet进程维护了一个MQ,会dispatch msg到每个skynet_context的私有mq。有skynet就没必要再在自己项目里引入MQ了。
消息队列mq
消息队列是skynet的核心功能之一,它的功能说白了就是入队出队,先进先出,这个数据结构都有讲过。源码实现在skynet_mq.h和skynet_mq.c中。
skynet的消息队列实际上是有两种,一种是全局消息队列,一种是服务消息队列。每个服务都有自己的消息队列,每个服务消息队列中都有服务的handle标识。这个涉及到消息的派发,这里就不展开了。每个服务消息队列被全局消息队列引用。
全局消息队列用的是经典的链表来实现的,而服务的消息队列用的是比较不直观,可能对有些人来说理解起来特别困难的循环数组来实现的。而且数组空间不够的时候,会动态扩展,容量扩展为当前容量的2倍。
消息队列的出队入队函数名都比较简单而且明了,push/pop。这个名字可能会带来一定的误解,如果改成enqueue/dequeue的话,就更符合它的实际功能。
消息的封装
网络库的一个核心功能就是收发网络消息(内部命令也可以看成是一类网络消息),且收发流程都会对原始数据做封装处理,对于收包流程来说,需要网络库把收到的数据封装成服务能够处理的数据结构,同时还需要附加一些额外的数据;而对于发包流程而言,同样需要进行数据封装处理。
消息注册
skynet的消息注册,C服务和lua服务设置回调走的函数是不同的。C的回调可以直接调,但是lua的回调不行,它需要一个默认的回调C函数,将返回参数转换为lua能理解的格式,遵循lua的api协议,传递到lua层。
当服务是lua实现的时候,skynet底层核心框架在处理完消息以后,回调lua层服务的回调函数时,要先经过一次lua api协议的处理,将参数准备好以后,然后调用lua服务中的回调函数。
1 | skynet.dispatch(callback)---------------------->proto[typename].dispach = callback |
消息机制之消息处理
skynet的消息机制准备拆成三个部分来讲,第一部分是收包,第二部分是发包,第三部分是事件的处理。
收包
先总结一下,skynet_context_message_dispatch这个函数实际上就是不停地从全局消息队列里取工作队列,取到了以后呢,就一直处理这个队列里的消息。为了避免某个队列占用太多cpu,当前队列处理到一定的量,就把机会让给全局消息队列里的其它工作队列,把自己又放回全局消息队列。而这个处理的量是根据创建线程时thread_param里的weight权重来判定的,权重越大,流转的就越快,也就是说处理某个队列的消息数量就越少。这就是消息处理的主流程机制。
在主流程之外,还有monitor的触发和取消,每次处理前,触发monitor的检查。处理完了,取消monitor的检查。
我们知道,skynet 是一个 actor 模型的框架,actor 之间使用“消息”进行通讯,且“消息”遵循一个统一的格式,就像信封一样,大家都用一套统一通用的格式,才能相互顺畅通信。
在 skynet 服务间流通的消息被封装为 skynet_message,其结构体定义如下:
1 | struct skynet_message { |
skynet_message 是 skynet 服务能处理消息的唯一格式,即其他模块派发给服务的通知都需要封装成 skynet_message,以便服务能够处理,例如:定时器模块的定时消息、网络模块的内部命令结果和外部网络消息,都转换成 skynet_message,然后发送给对应的服务,可谓“殊途同归”。
在转换过程中,我们需要关注很多细节,包括消息负载数据是否需要拷贝,确定消息类型等等。下图展示了收包过程中数据的封装及数据的流向:
发包
消息的处理实际上就是对工作队列里的消息不停地调回调函数。那么消息是怎么放进消息队列的呢。带着这个疑问,让我们从lua层开始追根溯源。
在lua层有两个api,一个是skynet.send,这个是非阻塞发消息。另一个是skynet.call,这个是阻塞式发完消息等回应。skynet.call使用一个session来实现等待,这个session实际就是一个自增的数字,溢出了以后又从1开始。
skynet.send实际上就是往目标服务的消息队列里增加一条消息。
这里所说的发包是指向一个外部网络连接发送数据,需要注意的是,发包可能需要依赖内部命令,因为需要把在工作线程无法直接发送的数据,透过内部管道,将这部分数据转交给网络线程发送(发送流程我在下面会有更加详细的描述)。
任何需要交由网络模块发送的数据,都会封装为 socket_sendbuffer,其结构体定义如下:
1 |
|
在 sendbuffer 这个结构体中,buffer是一个指向待发送数据的指针,type则用来区分这个指针的类型。类型分为一下三种:
- MEMORY,表示大小已知的内存指针,例如通过 concat_table 得到的字符串数据;
- OBJECT,表示大小未知的数据对象指针,其真实发送的数据(最终往 fd 中写入的数据)需要二次提取,例如 lua 的 lightuserdata;
- RAWPOINTER,特指 lua userdata,当该类型的数据透传给网络库时,需要进行内存数据拷贝;
此外,在发包过程中,skynet 还做了一些优化,会优先在工作线程直接发送,若无法直发,则透传给网络线程发送。下图展示了发包过程中数据封装和流向:
poll 流程
在前面已经介绍过网络线程的主要逻辑,在这一节我将“庖丁解牛”般的拆解网络库的 event poll 流程,即socket_server_poll接口,对其进行梳理后,可以分为三个部分:内部处理、事件捕获、事件处理,以下是经过简化后的代码:
1 | int |
内部命令处理是指在进程内的服务与网络库之间的通讯,例如在 lua 服务中监听一个端口,或发起一个 TCP 连接。这些内部命令,通过网络库提供的管道传递到网络库,最终由网络线程执行命令。网络线程在捕获到事件后,会优先处理所有的内部命令(如果有内部消息的话),具体的命令处理流程可查阅 ctrl_cmd 函数,这里不再详述,需要注意该函数的返回值 type,当 type = -1 表示内部命令还不能返回确切的结果(例如命令’L’),或者这个内部命令不需要返回结果(例如命令’T’),当 type > -1 则表示该命令已经执行成功且能返回明确的结果给服务。
关于返回值 type 的宏定义如下:
1 |
此外,当 type 为SOCKET_CLOSE或SOCKET_ERR(向一个已关闭的fd发送数据)时,表示连接已经关闭,则需要回收这个 socket 。
事件捕获部分就较为简单,对于 epoll 就是 epoll_wait,对于 kqueue 就是 kenvet,唯一需要注意的点是 wait api 使用的无限期阻塞,即没有事件则一直阻塞。
事件处理主要负责网络连接的处理,包括对外的主动连接和外部的被动连接。有事件的 socket 依据其状态有不同的事件处理流程,如对于 CONNECTING 的 socket 连接,则会完成之前发起的主动连接请求并上报给服务;对于 LISTEN 的 socket,则接收新连接(未start)并上报给服务;对于其他已经建立好连接,则对其进行读、写以及错误处理。
内部命令
内部命令是指进程内服务发送给网络库有关网络操作的消息,这些消息经由管道透传到网络库,这里主要介绍各个内部命令的功能及其封装结构。
我们知道,在一个通道上要实现信息通信,那么在这个通道上流通的消息就需要遵循统一格式的消息封装。就像 skynet 中两个服务之间进行通讯,就需要把消息封装成 skynet_message; 再如当和 mysql 数据库进行交互时,就需要消息包遵循 mysql 数据包格式;同样,内部命令也会封装成统一的格式在管道上传递,其封装结构体定义如下:
1 | struct request_package { |
所有的命令内容都封装在一个大小为 256 字节的联合体 u 内,具体的消息长度由 char header[7] 控制,这就是为什么联合体需要一个 char buffer[256] 的字符数组;命令类型由 char header[6] 控制,一个大写字母代表一种内部命令。下面列出了目前支持的命令:
1 | S Start socket |
为什么会使用一个 char header[8] 作为头部,而只使用最后两个字节,这样做的原因是要考虑结构体的内存对齐问题。假如我们只用 char header[2] 来作为头部,那么会在往管道写入数据时,会因为内存对齐的原因,导致写入的命令头部和内容之间存在6个“未初始化”的字节。
下图展示了内部命令的内存结构以及管道的工作流程:
事件处理
网络线程在捕获到事件后,若是管道的事件,则走内部命令处理流程,而剩下的网络事件,则根据不同的 socket 状态(或类型)做不同的处理。
主动连接
其核心逻辑在 open_socket 函数中,这里我抠出关键部分的代码:
1 | // return -1 when connecting |
上面的代码是一个标准的非阻塞 connect,我们需要关注 connect(sock, …) api 的返回值 status,如果返回 0,则表示连接已经建立,这通常是在服务器和客户在同一台主机上时发生;如果返回 -1,则需要关注 errno,若 errno = EINPROGRESS,表示连接建立,建立启动但是尚未完成,则需要把这个 sock 注册到 epoll 中,并关注该描述符的可写事件,当 epoll/kqueue 捕获到可写事件时,且该 socket 的状态为 CONNECTING,就表示连接已经建立。
接收新连接
该事件处理主要为监听 fd 服务,当 listen_fd 收到新连接到来时,则生成一个新的 socket 实例,然后上报给 listen_fd 所绑定的那个服务,处理流程较为简单。可能需要注意的点是,新接收到的 socket 需要服务对其 start,才能把新连接注册到 epoll/kqueue 中。
消息读取
网络库读取网络消息的流程非常简单,它并没有提供组包功能(组包功能由上层自己实现,我会在下一篇文章深入探讨),其处理流程就是从有读事件的 socket 实例中的 fd 读取固定字节长度的数据,然后经过一系列的封装和转换,最终把这些读取到的数据封装成 skynet_message(具体的封装流程在本文消息的封装一节已经有详细的叙述)并插入到与该 socket 实例所绑定服务的消息队列中去。
关于 TCP 连接每次读取数据的容量(即socket结构体的p.size字段)大小有以下规则:每个 socket 的初始的读取容量为 64 字节,在接下来的读取中,若读取到的数据大小等于读取容量,则将读取容量扩大一倍;若读取到的数据大小小于容量的一半且容量大小大于初始容量,则将容量缩小一倍。其具体实现如下:
1 | // n=本次读取到的数据大小;sz=该socket当前的读取容量 |
关于 UDP 连接的数据读取则不需要关心读取容量的变化,接收到的 UDP 包数据会先暂存在 socket_server 的 udpbuffer 字段中,该字段是一个长度为 65535 的字节数组。
消息写入
网络库对于发送网络数据有一系列优化,其中主要有:工作线程直接写入(direct write)和 高低优先级写入队列。
在早前的版本,网络数据的收发都是在网络线程中处理的,后来云风做了优化,在写入数据时(服务发起网络写入请求,而服务又是由工作线程驱动),先尝试直接在工作线程发送,若能直接发送则在工作线程直接向 fd 写入数据,这里有个细节需要注意,若数据无法一次性全部写入,则需要把当前发送的数据整个 clone 并保存到 dw_buffer,同时记录下已经写入的数据大小到 dw_offset,这些未发送完的数据会在网络线程发送数时插入到高优先级队列中,所以不用担心数据丢失或发送乱序。关于直接发送的具体流程,可参考云风文章中的描述:
1 | 当每次要写数据时,先检查一下该 fd 中发送队列是否为空,如果为空的话,就尝试直接在当前工作线程发送(这往往是大多数情况)。发送成功就皆大欢喜,如果失败或部分发送,则把没发送的数据放在 socket 结构中,并开启 epoll 的可写事件。 |
一个优化是将 socket 的发送队列由一个队列拆分成高优先级、低优先级两个队列,消息要插入到哪个队列由上层自行控制。例如:游戏拍卖行的数据、排行榜数据就可以放入低优先级队列中,因为这些消息的实时性不需要那么高;而心跳包、战斗包就需要放入高优先级队列。
1 | /* |
reference
深入理解skynet —— 网络库(二) 详见:https://domicat.me/_posts/2020-05-21-learn-skynet-network2/