skynet服务

首先,要了解 skynet 内的一个非常重要的概念 ———『服务』。所谓的服务,本质就是一个接收消息然后处理消息的过程,它跟我们日常生活中所说的服务(例如:外卖服务、快递服务等等)是一样的,它不是一个真实可见的物体,而是一个抽象的逻辑过程,它需要一个执行者来执行并完成它。这其实就是skynet实现的Actor模型。

了解了“服务”后,现在,我们把上面的银行场景套用在 skynet 框架上,可以做出下面的转换关系:

服务 = 银行提供的业务(个人业务、对公业务…)
工作线程 = 办事窗口(负责处理客户业务需求并完成需求)
CPU 核心 = 银行柜员(需要在办事窗口上工作)
网络线程 = 大堂接待(也相当于一个办事窗口)
定时器线程 = 银行的排队叫号系统

有了这一层转换后,我们再转入到 skynet 的代码框架中,就清晰很多了。比如,对于工作线程的抢占执行,就类似银行柜员工作人员不足,导致有一些办事窗口没人工作,从而导致银行等待的人增多;在办理某一个业务时,可能会涉及到其他业务,就需要多个窗口沟通合作,这就类似 skynet 的服务间消息传递。

服务的本质

每个skynet服务都是一个lua state,也就是一个lua虚拟机实例。而且,每个服务都是隔离的,各自使用自己独立的内存空间,服务之间通过发消息来完成数据交换。

lua state本身没有多线程支持的,为了实现cpu的摊分,skynet实现上在一个线程运行多个lua state实例。而同一时间下,调度线程只运行一个服务实例。为了提高系统的并发性,skynet会启动一定数量的调度线程。同时,为了提高服务的并发性,就利用lua协程并发处理。

所以,skynet的并发性有3点:
1、多个调度线程并发
2、lua协程并发处理
3、服务调度的切换

skynet服务的设计基于Actor模型。有两个特点:

  1. 每个Actor依次处理收到的消息
  2. 不同的Actor可同时处理各自的消息
    实现上,cpu会按照一定规则分摊给每个Actor,每个Actor不会独占cpu,在处理一定数量消息后主动让出cpu,给其他进程处理消息。

skynet刚开始是单进程多线程的,它是由一个一个的服务组成的。在skynet上做开发,实际上就是在写服务。服务与服务之间通过消息队列进行通信。
一个服务,默认不会执行任何逻辑,需要别人向它发出请求时,才会执行对应的逻辑(定时器也是通过消息队列,告诉指定服务,要执行定时事件),并在需要时返回结果给请求者。请求者往往也是其他服务。服务间的请求、响应和推送,并不是直接调用对方的api来执行,而是通过一个消息队列,也就是说,不论是请求、回应还是推送,都需要通过这个消息队列转发到另一个服务中。skynet的消息队列,分为两级,一个全局消息队列,他包含一个头尾指针,分别指向两个隶属于指定服务的次级消息队列。skynet中的每一个服务,都有一个唯一的、专属的次级消息队列。

服务是怎么调用的

调用\lualib\skynet.lua 文件内的函数:

1
2
3
simpledb.lua: skynet.register “SIMPLEDB” 向skynet里注册一个服务 
agent.lua: skynet.call(“SIMPLEDB”, “text”, text) 调用相应的服务
main.lua: skynet.newservice(“simpledb”) 启动一个服务

以下是几个写服务时经常要用到的函数

newservice(name, …) 启动一个名为 name 的新服务。
uniqueservice(name, …) 启动一个唯一服务,如果服务该服务已经启动,则返回已启动的服务地址。
queryservice(name) 查询一个由 uniqueservice 启动的唯一服务的地址,若该服务尚未启动则等待。
localname(name) 返回同一进程内,用 register 注册的具名服务的地址。

newservice可以在一个进程里启动多个服务,这适用于无状态的服务。
uniqueservice则是类似于设计模式中的单件(singleton),这适用于需要唯一性的服务。举个例子,比如写日志,只想写一份。或者是全局共享的数据。

服务的实现

接下来,我会分成服务的创建、消息的处理以及服务的回收三个部分来深入服务模块。

服务创建流程

服务上下文与actor的关系密切,具体看skynet与actor文中的Actor参与者部分。

创建上下文

接下来看下服务上下文的创建和初始化流程,下图是一个服务上下文的详细创建过程:
a4knd-2myjp

整个流程都封装在 skynet_context_new 这个函数中,函数实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
struct skynet_context * 
skynet_context_new(const char * name, const char *param) {
struct skynet_module * mod = skynet_module_query(name);

if (mod == NULL)
return NULL;

void *inst = skynet_module_instance_create(mod);
if (inst == NULL)
return NULL;
struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
CHECKCALLING_INIT(ctx)

ctx->mod = mod;
ctx->instance = inst;
ctx->ref = 2; // 思考为什么是 2
ctx->cb = NULL;
ctx->cb_ud = NULL;
ctx->session_id = 0;
ctx->logfile = NULL;

ctx->init = false;
ctx->endless = false;

ctx->cpu_cost = 0;
ctx->cpu_start = 0;
ctx->message_count = 0;
ctx->profile = G_NODE.profile;
// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
ctx->handle = 0;
ctx->handle = skynet_handle_register(ctx);
struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
// init function maybe use ctx->handle, so it must init at last
context_inc();

CHECKCALLING_BEGIN(ctx)
int r = skynet_module_instance_init(mod, inst, ctx, param);
CHECKCALLING_END(ctx)
if (r == 0) {
// 初始化成功
struct skynet_context * ret = skynet_context_release(ctx);
if (ret) {
ctx->init = true;
}
skynet_globalmq_push(queue);
if (ret) {
skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
}
return ret;
} else {
// 初始化失败,回收 ctx
skynet_error(ctx, "FAILED launch %s", name);
uint32_t handle = ctx->handle;
skynet_context_release(ctx);
skynet_handle_retire(handle);
struct drop_t d = { handle };
skynet_mq_release(queue, drop_message, &d);
return NULL;
}
}

此函数一共有三个地方调用,其中有两处是在 skynet 启动过程中(skynet_start)调用,这两个函数调用会创建两个重要的服务:

logger 服务,用来输出 skynet 的日志信息到文件(由配置字段 logger 指定)或标准输出(stdout),当然,我们可以重载 skynet 自带的 logger 服务,修改配置字段 logservice 即可。
bootstrap 服务,是一个 lua 服务,它负责引导基础服务,例如 launcher 服务等,它的作用类似于电脑开机时的引导程序一样。
前面两处调用都是用来初始化 skynet 进程所需的核心服务,而第三处调用是为了支持在已经启动的服务实例中能方便的启动其他服务,skynet 底层进一步对上面的流程进行封装,然后把封装好的接口暴露个上层使用,类似于 linux 内核暴露出的系统调用。
封装的服务启动接口原型如下:

1
2
static const char *
cmd_launch(struct skynet_context * context, const char * param);

这个接口接收一个 const char *param 的参数,它用来控制skynet 应该启动一个什么服务,以及服务启动时的参数,服务类型名称和服务参数用空格分隔。例如,传递参数是 “snlua launcher”,则表示启动一个 launcher 的 lua 服务(这个服务专门负责 lua 层的服务启动,是一个非常重要的服务)。

下图是 launcher 服务创建一个新服务的调用图:
awgiu-8r86o
创建好的 context 会被注册到服务仓库 handle_storage *H 中,如果注册成功,会给 context 分配一个 handle (即服务上下文的id,类似与文件描述符id),handle 是一个无符号 32 位整型,高 8 位用来表示 harbor_id,低 24 位则表示 context 的索引号 index,即:handle=(harbor_id<<24)|index。需要注意的是,这两部分值都需要大于 0,即:harbor_id>0 && index>01;然后创建服务的消息队列 queue,至此,服务已经创建完成,接下来便是服务的初始化过程。
服务初始化过程,其实是一个服务差异化的过程,每个创建好的服务实例通过传入参数的不同,会有不同的设置,这一部分会在接下来的汽车工厂的例子有更多详细的讲解。 然后设置服务的回调,最后把服务的消息队列 push 到全局消息队列中,以被工作线程消费、执行。

服务模板

所谓的服务模板,有点像一个工厂类,一个服务模板可以创建和初始化一个或多个服务实例,同时在创建服务实例时传递一些参数,对其进行一些定制操作。
套用现实生活场景中,一个汽车工厂可以生产很多辆汽车,每一辆生产出来的汽车就相当于一个服务实例,当然,工厂可以在生产汽车时,为了满足客户需求,提供一些可控的定制化服务,例如:车身颜色定制、配件增加等,而其中用来生产汽车的流水线就是一个服务模板。
在 skynet 中,这些模板是以动态库的形式存在,这些服务模板动态库编译后存放在 cservice 目录,目前,skynet 已有四个服务模板,它们分别是:

logger,日志服务模板,在上面已经有提到;
gate,网关服务模板,最新版的 skynet 已经有另外一套 lua gate 实现方案;
harbor,提供 master/slave 模式的集群方案;
snlua,所有的 lua 服务都由它负责。

当然,所有的服务模板都需要遵守一些约定,它们需要提供以下 4 个 api:

create,必须,此 api 用来创建对应的服务实例;
init,必须,此 api 用来对服务实例进行初始化,同时还会设置 context 的回调实例和回调函数;
release,必须,用来释放服务实例,做一些清理工作;
signal,可选,用来跳出 lua 服务的死循环2。

以上的这些接口定义可以在 skynet_module.h 头文件中找到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// api 原型
typedef void * (*skynet_dl_create)(void);
typedef int (*skynet_dl_init)(void * inst, struct skynet_context *, const char * parm);
typedef void (*skynet_dl_release)(void * inst);
typedef void (*skynet_dl_signal)(void * inst, int signal);

// 服务模板结构
struct skynet_module {
const char * name;
void * module;
skynet_dl_create create;
skynet_dl_init init;
skynet_dl_release release;
skynet_dl_signal signal;
};

消息队列

一个服务实例一定有一个服务消息队列(很多人称之为次级消息队列)与之对应,它的实现方式是一个环形队列。
65436345
消息队列的容量默认是 64,若容量不足则以 2 倍的方式进行增长,另外,当消息队列中堆积的消息过载,则每次达到 1024 的整数倍时,由监控线程发出警报。

消息处理流程

注意,先停一停,从 skynet 的代码世界中跳出来,先思考一个现实生活中快递站点的场景:
有一个快递配送点,每天会把要派送的包裹分成多堆存放在不同仓库或角落,每一堆表示这个站点配送范围内的一个片区,且每一堆包裹由一个快递小哥负责用三轮车进行配送,假如你是这个配送站的负责人,你当然是希望快递尽可能快速、合理的被配送,那应该采用什么方案呢?

最直接的办法是让一个快递员固定负责一个片区,但是如果负责的片区过多,需要的快递员就会增多,这样人力成本就会上升。改进的办法是让快递员不固定在一个片区内,而是在送完一个片区的包裹后,如果其他片区还没有人在负责配送,则动态分配到其他片区,不过这里还是会存在一个问题,如果某个快递员在派送其中一个片区的快递时,又有新的包裹添加进来,这时问题产生了:这个片区一直在进行包裹的配送,而其他片区因为快递员不足导致包裹滞留(即“线程饿死”)。

我们对上面的方案再次进行改进:每个片区按照先后顺序排列,每个快递员按照顺序取出片区队列中的第一个片区并派送固定数量的包裹,配送完一批包裹后,如果这个片区还有未配送的包裹,就把这个片区重新插入到片区队列尾部,以此循环配送,每次的配送数量可以按照一定的规则来决定,例如三轮车大一点、动力好一点就配送多一些包裹。以这样的方式进行配送,既能保证每个片区的包裹都能得到派送,不至于长时间滞留,又能节约人力成本。

好,现在再回到 skynet 中的消息处理流程,它其实就是采用了类似快递配送的最后的改良方案,我对其进行一次转换,就一目了然。

1
2
3
4
片区队列 = 全局消息队列
片区仓库 = 服务消息队列
配送数量 = 一次派发的消息数量
分配派送流程 = 工作线程

要注意的是:消息并不存放在全局消息队列中,类似于包裹是被堆积在不同的片区仓库内,只是这个片区被插入到片区队列内,真正的消息是存放在服务消息队列(或称次级消息队列)中;另外,关于派发消息数量,skynet 用一套权重 weight 规则来计算,后面会详细讲解

工作线程

skynet 有四类线程,其中只有工作线程创建多个,它由配置中的 thread 字段控制,如果不配置默认为 8。此线程负责的逻辑非常简单,就是从全局消息队列中 pop 出一个服务消息队列,然后派发一定数量的消息。它的大致流程如下图所示:
aqj54-9j2is
具体的代码实现在函数 skynet_context_message_dispatch 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
struct message_queue * 
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
if (q == NULL) {
q = skynet_globalmq_pop();
if (q==NULL)
return NULL;
}

uint32_t handle = skynet_mq_handle(q);

struct skynet_context * ctx = skynet_handle_grab(handle);
if (ctx == NULL) {
struct drop_t d = { handle };
skynet_mq_release(q, drop_message, &d);
return skynet_globalmq_pop();
}

int i,n=1;
struct skynet_message msg;

for (i=0;i<n;i++) {
if (skynet_mq_pop(q,&msg)) {
skynet_context_release(ctx);
return skynet_globalmq_pop();
} else if (i==0 && weight >= 0) {
n = skynet_mq_length(q);
n >>= weight;
}
int overload = skynet_mq_overload(q);
if (overload) {
skynet_error(ctx, "May overload, message queue length = %d", overload);
}

skynet_monitor_trigger(sm, msg.source , handle);

if (ctx->cb == NULL) {
skynet_free(msg.data);
} else {
dispatch_message(ctx, &msg);
}

skynet_monitor_trigger(sm, 0,0);
}

assert(q == ctx->queue);
struct message_queue *nq = skynet_globalmq_pop();
if (nq) {
// If global mq is not empty , push q back, and return next queue (nq)
// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
skynet_globalmq_push(q);
q = nq;
}
skynet_context_release(ctx);

return q;
}

虽然整体逻辑较为简单,不过在其实现过程中还是有几个点可以拿出来研究一番。
动态唤醒
首先要知道一个原则:全局消息队列内是存放的诸多不为空的服务消息队列,也就是说没有消息的服务是不会把它的消息队列 push 进全局队列中的3(除了服务创建时的第一次 push)。那我们思考一个问题:因为工作线程数量有限,而活跃的服务(有消息且在全局队列中)数量是不定的,当它们的比例关系 m(工作线程):n(活跃服务) 小于等于 1 时,表示每个线程都处于工作中;而如果这个比例大于 1 时,则表示工作线程有空余,套用上面快递员的例子,就是存在一些快递员没有包裹需要派送,处于休息状态。
此时,如果不对这些空闲线程做挂起操作,就会浪费 CPU 资源,进而浪费电(→_→)。skynet 采取的策略是在全局消息队列为空时,使用条件变量(pthread_cond_t)来挂起工作线程,而对于线程的唤醒,一共有两处地方:

  • 网络线程唤醒,采用“懒惰唤醒”,即只有在所有工作线程都挂起时,才会唤醒一个。举个栗子,如果有 8 个工作线程,其中 7 个都处于挂起状态,那么在网络线程收到网络消息后,也不会进行唤醒,除非 8 个工作线程都被挂起;
  • 定时器线程唤醒,采用“贪婪唤醒”,即只要有工作线程处于挂起状态,就会唤醒一次;另外在定时器线程结束后,还会广播唤醒所有挂起的工作线程;

均衡派发
均衡派发是指在处理多个服务的消息队列时,尽量做到“雨露均沾”,以此来解决上面提到的“线程饿死”的情况。skynet 会给每个工作线程一个权重值 weight,根据这个权重值计算出工作线程每次应该处理的消息数量 n,计算方式如下:

1
2
3
4
5
6
7
if weight > 0 then
n = 消息队列当前容量 >> weight
elseif weight == 0 then
n = 消息队列当前容量
elseif weight < 0 then
n = 1
end

也就是说,当 weight<0时,每次处理一个消息,当 weight==0 时,每次会处理完“当前队列”中的所有消息,而当 weight>0 时,每次处理“当前队列”容量的 1/(2^weight),权重值越大,每次处理的消息越少。注意:这里所说的“当前队列”容量是指在处理第一个消息时,该时刻服务消息队列的消息容量,本质上是一个“过去时”的值,这也是为什么“当前队列”要加引号的原因。

插入消息

在 skynet 中,服务之间传递的消息都被封装成统一的格式,不论是网络消息还是定时器消息,消息结构体如下:

1
2
3
4
5
6
struct skynet_message {
uint32_t source;//表示发出这个消息的服务 handle id,如果一个服务收到了一个 source=0 的消息,则表示这个消息是不是从一个服务实例中发出的,例如定时器消息和网络消息,或者发出这个消息的服务实例已经被销毁;
int session;//表示这个消息的序列号,类似 TCP 数据报(segment)的 SEQ,需要注意的是每一个请求包才需要生成一个新的 session,而返回包是不需要生成新的 session,只需要把请求包的 session 在返回时赋值给返回包即可,当然,需要标识这个包是一个返回包,接下来就会说到消息包的类型,一个消息包session的生成规则会涉及到 ctx 的 session_id 字段
void * data;它是一个指针,指向了消息携带数据真正的内存地址,可以为空指针,即没有消息数据,对于一个从 lua 服务中传递过来的消息数据(在 lua 中使用了 c.send),可以是 LUA_TSTRING 和 LUA_TLIGHTUSERDATA,前者必须做一次内存拷贝(原因请查阅 lua_tolstring 的说明文档),而后者则不需要。
size_t sz;//sz 字面意思是消息数据的长度,但其实该字段除了包含了数据的长度,还携带了另外一个信息,就是这个消息的类型,消息类型的定义可以在 skynet.h 头文件中找到,它用 sz 的高 8 位(1 byte)来表示,例如在 64 位系统下 sz = 消息类型<<56 | 数据长度。
};

上面已经对 skynet 的消息结构有了一个全方位的了解,对于如何把一个消息插入到目的服务的消息队列中,已经没有太多需要深入的细节。可能唯一需要注意的是,当一个消息插入到服务的消息队列中时,如果这个服务处于“非活跃”状态(即没有加入到全局消息队列),那么会将该服务重新触发为“活跃”状态,实现细节在 skynet_mq_push。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void 
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
assert(message);
SPIN_LOCK(q)

q->queue[q->tail] = *message;
if (++ q->tail >= q->cap) {
q->tail = 0;
}

if (q->head == q->tail) {
expand_queue(q);
}

if (q->in_global == 0) {
q->in_global = MQ_IN_GLOBAL;
skynet_globalmq_push(q);
}

SPIN_UNLOCK(q)
}

消息消费

消息处理的过程已经在前面讲解的差不多了,其过程也较为简单,即服务通过注册的回调函数来处理收到的消息,回调函数(skynet_cb)的定义可以在 skynet.h 中找到(在前面也已经提到),它接收 7 个参数:
context,表示处理该消息的服务上下文;
ud,表示真正处理该消息的服务实体指针(不一定是服务实例,也可以是服务实例内的其他元素,例如 snlua 服务实例的 lua 虚拟机指针);
type,消息的类型(上文已提到);
session,消息的序列号;
source,发生消息的源服务 handle id;
msg,消息数据的指针;
sz,消息数据的长度;
关于消息处理的一个原则是:一个消息的数据必须由最后处理该消息的服务进行回收处理。这是什么意思呢?下面通过一个例子来解释这条原则。
假如你的朋友小明给你送一盒橘子,你会有两种方式收到这一盒橘子并吃掉它们:

  • 小明直接亲自送到你手上
  • 小明用快递邮寄,通过快递员送到你的手上

不管通过哪一种方式,最终你会收到一盒橘子,然后吃掉它们,那吃完后剩下的橘子皮肯定是由你自己负责清扫,不可能让朋友小明或者快递员来给你处理(除非你想被打),在这一个流程中,你就是最后处理消息的人,你就需要负责最后的数据回收,快递员虽然也处理过这条消息,但是他是不能吃掉盒子内的橘子(即消息包中携带的数据)。
理解了上面的例子后,现在回头看看 skynet 的消息处理流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
assert(ctx->init);
CHECKCALLING_BEGIN(ctx)
pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
int type = msg->sz >> MESSAGE_TYPE_SHIFT;
size_t sz = msg->sz & MESSAGE_TYPE_MASK;
if (ctx->logfile) {
skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
}
++ctx->message_count;

/*
reserve_msg : 回调函数的返回值
reserve_msg = 1,表示不能释放消息数据 msg->data,只是做转发处理,在 clusterproxy 中有使用,lua 接口为 skynet.forward_type()
reserve_msg = 0,表示需要释放消息数据 msg->data,也说明这次是消息的终点
*/
int reserve_msg;
if (ctx->profile) {
ctx->cpu_start = skynet_thread_time();
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
ctx->cpu_cost += cost_time;
} else {
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
}
if (!reserve_msg) {
skynet_free(msg->data);
}
CHECKCALLING_END(ctx)
}

这里要注意回调函数 ctx->cb 的返回值,当 reserve_msg = 1 时,表示不需要 free 消息数据,只是负责转发这个消息;当 reserve_msg = 0 时,在回调结束后,会 free 消息数据。而对于 snlua 服务来说,是否开启转发是由 api c.callback 的第二个参数决定的,例如 c.callback(func, true) 则表示开启转发(具体参见 skynet.forward_type)。

这里还涉及到了一个多线程的知识点:线程局部存储(Thread-Local Storage),用于在工作线程中获取当前正在处理的服务 handle:

1
2
3
4
5
6
7
8
9
10
uint32_t 
skynet_current_handle(void) {
if (G_NODE.init) {
void * handle = pthread_getspecific(G_NODE.handle_key);
return (uint32_t)(uintptr_t)handle;
} else {
uint32_t v = (uint32_t)(-THREAD_MAIN);
return v;
}
}

服务回收流程

服务的回收过程可以分为三个部分:服务实例的回收、服务消息队列的回收以及服务上下文(ctx)的回收,整个回收过程在 delete_context 函数中实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void 
delete_context(struct skynet_context *ctx) {
// 如果服务有日志文件,先关闭该日志文件
if (ctx->logfile) {
fclose(ctx->logfile);
}

// 回收服务实例
skynet_module_instance_release(ctx->mod, ctx->instance);

// 标记服务消息队列为“可回收(release)”
skynet_mq_mark_release(ctx->queue);
CHECKCALLING_DESTROY(ctx)

// 回收ctx
skynet_free(ctx);
context_dec();
}

需要指出的是,服务消息队列的回收方式稍微特殊一点,消息队列的内存并不会立即被 free 掉,还需要处理回收前遗留在队列中的消息,我会在接下来章节详细展开。

引用计数

skynet 使用引用计数的方式来决定是否销毁一个服务上下文,这有点类似 C++ 的智能指针,当 ctx->ref = 0 时,则会触发服务 ctx 的销毁流程。在代码中会发现 skynet_handle_grab 和 skynet_context_release 基本都是成对出现,前者引用一次 ctx(引用次数 +1),后者释放一次引用(引用次数 -1)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct skynet_context * 
skynet_handle_grab(uint32_t handle) {
struct handle_storage *s = H;
struct skynet_context * result = NULL;

rwlock_rlock(&s->lock);

uint32_t hash = handle & (s->slot_size-1);
struct skynet_context * ctx = s->slot[hash];
if (ctx && skynet_context_handle(ctx) == handle) {
result = ctx;
skynet_context_grab(result); // ctx 引用次数 +1
}

rwlock_runlock(&s->lock);
return result;
}

struct skynet_context *
skynet_context_release(struct skynet_context *ctx) {
if (ATOM_DEC(&ctx->ref) == 0) {
delete_context(ctx); // ctx 引用次数等于0,则触发回收流程
return NULL;
}
return ctx;
}

这里回顾一下前面提到的服务 ctx 创建过程,思考一下为什么初始的 ctx->ref 要设置为 2,为什么不是 0 或者 1 呢?
首先,若 ref 设置为 0,肯定是错误的,因为在 ctx 注册到服务仓库 handler_storage *H中时,ctx 其实就已经被引用一次了,也就是说创建并注册成功的 ctx,其引用计数至少是 1。那为什么不直接设置 ctx 的 ref 为 1,而要设置为 2 呢?原因是 skynet 需要在 ctx 初始化后再次确保其是否是真实可用状态,举个例子:一个新的 ctx 在工作线程 A 中被创建,在 ctx 执行初始化操作后,若它在工作线程 B 中意外被减少一次引用(例如被执行了 kill),此时就会在线程 A 中出现一个已经被回收的 ctx 被 push 到全局消息队列的情况。

回收消息队列

服务消息队列的回收过程与服务实例回收和服务 ctx 回收相比会稍微复杂一点,不仅仅需要回收消息队列的内存,还需要处理队列中遗留的消息,要给这些遗留的消息的发送服务一个错误反馈,这就像你从一家公司离职后,当有之前的老客户和你联系时,你需要告知别人“我已经离职了”,而不是没有任何反馈信息。

消息队列的回收过程分为两个步骤:

标记要回收的消息队列(skynet_mq_mark_release),并把它 push 到全局消息队列中;
工作线程执行 skynet_mq_release,使用指定的丢弃函数(drop_message)来处理队列中遗留的消息包;

回收函数 skynet_mq_release 和丢弃函数 drop_message 的函数定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void 
skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) {
SPIN_LOCK(q)

if (q->release) {
SPIN_UNLOCK(q)
// 循环pop剩余的消息,并drop掉
_drop_queue(q, drop_func, ud);
} else {
skynet_globalmq_push(q);
SPIN_UNLOCK(q)
}
}

static void
drop_message(struct skynet_message *msg, void *ud) {
struct drop_t *d = ud;
skynet_free(msg->data);
uint32_t source = d->handle;
assert(source);
// report error to the message source
skynet_send(NULL, source, msg->source, PTYPE_ERROR, 0, NULL, 0);
}

可以看到,整个回收过程就是一个把遗留的消息一个个 pop 出来,再由丢弃函数向消息源服务发送一个错误消息,最后释放掉消息队列的内存。

reference

深入理解skynet —— 服务 详见:https://domicat.me/_posts/2020-05-10-learn-skynet-service/