- Published on
ltask 笔记
- Authors
- Name
- Hanchin Hsieh
- @_yuchanns_
ltask 是一个用 Lua C API 编写的 lua 异步协程运行时
目前主要被用于 ejoy 的 ant 游戏引擎,集合了很多方面的使用
我比较感兴趣的是在网络方面对 ltask 的应用。虽然 wiki 中表示
高效处理网络连接不是 Ant 引擎的设计重点。
但是 Ant 引擎实现了一个简单的 WebServer 向我们揭示了用法
Ant 中的 WebServer 应用
通过源码阅读我发现 WebServer 主要包含以及存在这样的依赖关系
ant/pkg/ant.webserver -> ant/pkg/ant.net -> bee.lua/{lua_select, lua_socket}
其中 ltask 主要参与了 ant.webserver
和 ant.net
部分的运行时工作
就像许多其他语言对网络服务端的处理一样, 在 webserver 包中 ltask 起一个服务的流程简单说也是
创建一个监听给定地址的文件描述符 listen_fd ,然后在协程中循环等待 accept 这个 listen_fd 获取请求的 fd 接着通过
ltask.fork
创建一个协程来处理这个请求
local listen_fd = assert(net.listen(addr, port))
ltask.fork(function()
while true do
local fd = net.accept(listen_fd)
print("Accept", fd)
if fd then
ltask.fork(function()
-- 处理请求
end)
end
end)
首先 net 包封装了 bee.lua 的使用细节,包括:
listen
API- 使用
bee.socket
创建一个 tcp 协议的 **socket **文件描述符
- 指定
reuseaddr
并通过 socket 的bind
方法绑定到给定的 addr 和 port 中
- 然后使用 socket 的
listen
方法进行监听,并为这个 socket 分配一个全局 id 以一个 obj 表的形式保存到 listen_fds 表中
- 接着使用事件循环(这里是
bee.select
)注册一个读事件 (<u>SELECT_READ</u>) 回调
- 在事件唤醒后的回调中调用 socket 的
accept
方法,将拿到的 newfd 追加插入到 obj 中,并通过ltask.wakeup
方法唤醒协程
- 这里有一个我一开始无法理解的点就是
obj.co
是从哪里来的,为什么要唤醒,后面发现和接下来要说的accept
API 有关
- 将全局 id 返回,也就是上面创建流程中的 listen_fd
luafunction api.listen(addr, port) local fd, err = socket "tcp" if not fd then return false, err end fd:option("reuseaddr", 1) local ok, err = fd:bind(addr, port) if not ok then fd:close() return false, err end local ok, err = fd:listen() if not ok then fd:close() return false, err end local obj = { fd = fd } local id = fd_id; fd_id = fd_id + 1 listen_fds[id] = obj local function accept_fd() local newfd, err = fd:accept() if not newfd then selector:event_del(fd) fd:close() table.insert(obj, err) else table.insert(obj, newfd) if obj.co then wakeup(obj.co) obj.co = nil end end end selector:event_add(fd, SELECT_READ, accept_fd) return id end
- 使用
accept
API- 前面我们已经知道这个 API 是在协程循环中调用
- 它做的事情其实相当单纯,就是把自己所在的协程附加到
listen
API 创建的 listen_fd 对应 listen_fds 的 obj 上,然后使用ltask.wait
挂起让出 cpu
- 等
listen
API 中注册的读事件回调时,回调函数中的ltask.wakeup
唤醒的就是accept
被中断的协程
- 被唤醒后它通过 obj 取出回调事件插入的 newfd ,然后将 newfd 分配全局 id 和保存到 listen_fds 中,并事件循环中注册一个触发事件,根据事件类型唤醒相应的读(rd)和写(wt)协程。最后将全局 id 返回
- 也就是说,accept 并不实际调用 socket 的 accept 方法,它只是将自身协程挂起暂停,等待事件循环通过 ltask 唤醒后继续向下执行
lualocal function new_fd(fd) local id = fd_id; fd_id = fd_id + 1 local obj = { fd = fd, flag = 0, update = nil, } function obj.update(event) if event & SELECT_READ then wakeup(obj.rd) obj.rd = nil elseif event & SELECT_WRITE then wakeup(obj.wt) obj.wt = nil end obj.flag = obj.flag & ~event selector:event_mod(obj.fd, obj.flag) end selector:event_add(fd, 0, obj.update) fds[id] = obj return id end local function get_fd(obj) local fd = table.remove(obj, 1) if type(fd) == "string" then return false, fd end return new_fd(fd) end
recv
API- 尝试读取,没有就绪则注册一个读事件
- 然后同样用
ltask.wait
将自身协程挂起,等待accept
中注册的触发事件唤醒
- 不同的地方在于数据的读取确实是在该 API 被唤醒后的执行流程中进行的
- 将读取的内容返回
luafunction api.recv(fd) local obj = assert(fds[fd]) local content, err = obj.fd:recv() if content == false then -- block assert(obj.rd == nil) obj.rd = coroutine.running() add_event(obj, SELECT_READ) local rd = obj.rd ltask.wait(rd) content, err = obj.fd:recv() end return content, err end local function add_event(obj, flag) obj.flag = obj.flag | flag selector:event_mod(obj.fd, obj.flag) end
send
API- 检查连接是否被关闭
- 尝试写入,没有就绪则注册一个写事件
- 然后同样用
ltask
将自身协程挂起,等待accept
中注册的触发事件唤醒
- 数据的写入也在该 API 被唤醒后进行
- 返回截断后的内容
lualocal function send(fd, content) local obj = fds[fd] if not obj then return nil, "Closed" end local n, err = obj.fd:send(content) if n == false then -- block assert(obj.wt == nil) obj.wt = coroutine.running() add_event(obj, SELECT_WRITE) ltask.wait(obj.wt) n, err = obj.fd.send(content) end if not n then if n == false then n = 0 else return nil, err end end return content:sub(n+1) end function api.send(fd, content) local err repeat content, err = send(fd, content) until not content or content == "" return err end
- 其余略过不提(...
connect
,close
)
当然 ant 并不是直接调用 net 的 API 而实际上是通过 io 服务进行的操作(可在 ant.net/main.lua
确认),不过这并不是我目前关心的
那么为什么说 ant 没有设计高效处理网络的功能呢?经过进一步的深究,我们可以发现最底层所依赖的事件循环 bee.select
的实现只是一个简单的 select poll
(对外暴露 wait
, close
, event_{add, mod, del}
)
static int wait(lua_State* L) {
auto& ctx = lua::checkudata<select_ctx>(L, 1);
lua_Number timeo = luaL_optnumber(L, 2, -1);
if (ctx.readset.empty() && ctx.writeset.empty()) {
if (timeo < 0) {
return luaL_error(L, "no open sockets to check and no timeout set");
}
else {
thread_sleep(static_cast<int>(timeo * 1000));
lua_getiuservalue(L, 1, 4);
return 1;
}
}
struct timeval timeout, *timeop = &timeout;
if (timeo < 0) {
timeop = NULL;
}
else {
timeout.tv_sec = (long)timeo;
timeout.tv_usec = (long)((timeo - timeout.tv_sec) * 1000000);
}
ctx.i = 1;
ctx.maxfd = 0;
FD_ZERO(&ctx.readfds);
for (auto fd : ctx.readset) {
FD_SET(fd, &ctx.readfds);
ctx.maxfd = std::max(ctx.maxfd, fd);
}
FD_ZERO(&ctx.writefds);
for (auto fd : ctx.writeset) {
FD_SET(fd, &ctx.writefds);
ctx.maxfd = std::max(ctx.maxfd, fd);
}
int ok;
if (timeop == NULL) {
do
ok = ::select(ctx.maxfd + 1, &ctx.readfds, &ctx.writefds, NULL, NULL);
while (ok == -1 && errno == EINTR);
}
else {
ok = ::select(ctx.maxfd + 1, &ctx.readfds, &ctx.writefds, NULL, timeop);
if (ok == -1 && errno == EINTR) {
ok = 0;
}
}
if (ok < 0) {
push_neterror(L, "select");
return lua_error(L);
}
lua_getiuservalue(L, 1, 3);
return 1;
}
其中并没有使用常见的 epoll/kqeue 等高性能事件循环,因此要直接用作处理海量请求的 webserver 还有欠缺
但是我们可以先简单写一个 demo 玩具体验一下具体的使用
阶段性总结
这里简单绘制一张图表明 ltask 和 bee.{select, socket} 的协作关系
这将有助于接下来写一个独立的 demo 玩具
ltask 的调度流程
简易 demo
由于仓库本身说明极少,我只能自己摸索出一个使用 ltask 的最小 demo
怎么得出这个最小 demo 的过程我就懒得赘述了,总之就是通过不断删除代码测试出来的
--- main.lua
local ltask = require("ltask")
local boot = require("ltask.bootstrap")
local service_path = "./?.lua"
boot.init({})
local SERVICE_ROOT = 1
local MESSSAGE_SYSTEM = 0
boot.new_service("root", "@" .. package.searchpath("service", service_path), SERVICE_ROOT)
local init_msg, sz = ltask.pack("init", {
name = "root",
})
boot.post_message({
from = SERVICE_ROOT,
to = SERVICE_ROOT,
session = 0, -- 0 for root init
type = MESSSAGE_SYSTEM,
message = init_msg,
size = sz,
})
print("ltask Start")
boot.run()
--- service.lua
local i = 0
while true do
print("service" .. i)
i = i + 1
coroutine.yield()
if i > 5 then
return
end
end
该 demo 在执行 lua main.lua
命令后打印如下
ltask Start
service0
service1
service2
service3
service4
service5
假如你将 boot.post_message
这一部分代码注释掉会发现 demo 卡住
于是我对它的调度原理感到十分好奇,下面尝试分析源码总结出它的调度流程
调度流程解析
在上面的 demo 中,除开初始化相关的代码,最核心的动作其实就只有三行
boot.new_service(...)
boot.post_message(...)
boot.run()
对此我们可以结合 ltask 仓库的 c 源码研究这三个方法做了什么事情
通过全局搜索可以找到这三个方法在 c 源码中具有如下对应
luaL_Reg l[] = {
// ...
{ "run", ltask_run },
{ "post_message", lpost_message },
{ "new_service", ltask_newservice },
// ...
};
ltask_newservice
创建一个新的服务,根据仓库的描述
Each lua service (an indepentent lua VM) works in request/response mode, they use message channels to inter-communicate.
可知该方法就是创建一个 lua 虚拟机
下面我们具体看方法体的实现
static int
ltask_newservice(lua_State *L) {
struct ltask *task = (struct ltask *)get_ptr(L, "LTASK_GLOBAL");
const char *label = luaL_checkstring(L, 1);
const char *filename_source = luaL_checkstring(L, 2);
unsigned int sid = luaL_optinteger(L, 3, 0);
service_id id = service_new(task->services, sid);
newservice(L, task, id, label, filename_source, NULL);
lua_pushinteger(L, id.id);
return 1;
}
其中 service_new
的功能是为新的服务分配内存空间和绑定通讯地址(即 id),这个阶段服务状态还是未初始化状态(SERVICE_STATUS_UNINITIALIZED)
其中 id 为 1 的是特殊的根服务
然后使用 newservice
对服务进行初始化,实现如下
static void
newservice(lua_State *L, struct ltask *task, service_id id, const char *label, const char *filename_source, struct preload_thread *preinit) {
struct service_ud ud;
ud.task = task;
ud.id = id;
struct service_pool *S = task->services;
struct service *preS = NULL;
if (preinit) {
atomic_int_store(&preinit->term, 1);
thread_wait(preinit->thread);
preS = preinit->service;
free(preinit);
}
if (service_init(S, id, (void *)&ud, sizeof(ud), L, preS) || service_requiref(S, id, "ltask", luaopen_ltask, L)) {
service_delete(S, id);
luaL_error(L, "New service fail : %s", get_error_message(L));
return;
}
if (service_setlabel(task->services, id, label)) {
service_delete(S, id);
luaL_error(L, "set label fail");
return;
}
if (filename_source) {
const char * err = NULL;
if (filename_source[0] == '@') {
err = service_loadfile(S, id, filename_source+1);
} else {
err = service_loadstring(S, id, filename_source);
}
if (err) {
lua_pushstring(L, err);
service_delete(S, id);
lua_error(L);
}
}
}
大体逻辑是初始化服务,然后将传入的 lua 代码文件(filename_source)加载到服务的虚拟机上
- 其中方法
service_init
用于创建新的虚拟机实例
- 加载代码文件的方法
service_loadfile
和service_loadstring
将服务状态设置为空闲状态(SERVICE_STATUS_IDLE)
至此,服务的创建告一段落
lpost_message
最初我认为初始化服务完成后应该直接跳到 ltask_run
阅读具体的调度逻辑,结果发现如果没有对空闲状态的服务发送消息,调度器不会调度该服务,其实可以从仓库的描述中窥见端倪
Each lua service (an indepentent lua VM) works in request/response mode, they use message channels to inter-communicate.
static int
lpost_message(lua_State *L) {
luaL_checktype(L, 1, LUA_TTABLE);
struct message msg;
msg.from.id = checkfield(L, 1, "from");
msg.to.id = checkfield(L, 1, "to");
msg.session = (session_t)checkfield(L, 1, "session");
msg.type = checkfield(L, 1, "type");
int t = lua_getfield(L, 1, "message");
if (t == LUA_TNIL) {
msg.msg = NULL;
msg.sz = 0;
} else {
if (t != LUA_TLIGHTUSERDATA) {
return luaL_error(L, ".message should be a pointer");
}
msg.msg = lua_touserdata(L, -1);
lua_pop(L, 1);
msg.sz = checkfield(L, 1, "size");
}
struct ltask *task = (struct ltask *)get_ptr(L, "LTASK_GLOBAL");
struct message * m = message_new(&msg);
if (service_push_message(task->services, msg.to, m)) {
message_delete(m);
return luaL_error(L, "push message failed");
}
check_message_to(task, msg.to);
return 0;
}
该方法首先对消息的格式进行校验,然后装填到一个 message
结构中,使用 service_push_message
将消息追加到对应服务实例的消息队列中,后续 lua 服务代码可以通过 recv_message
API 获取到该消息
我们重点看 check_message_to
这个方法
static void
check_message_to(struct ltask *task, service_id to) {
struct service_pool *P = task->services;
int status = service_status_get(P, to);
if (status == SERVICE_STATUS_IDLE) {
debug_printf(task->logger, "Service %x is in schedule", to.id);
service_status_set(P, to, SERVICE_STATUS_SCHEDULE);
schedule_back(task, to);
} else if (status == SERVICE_STATUS_EXCLUSIVE) {
debug_printf(task->logger, "Message to exclusive service %d", to.id);
int ethread = service_thread_id(task->services, to);
struct exclusive_thread *thr = get_exclusive_thread(task, ethread);
assert(thr);
sockevent_trigger(&thr->event);
}
}
这里我们只关注非独占的普通服务,即处于空闲状态的服务,它会被转变成调度状态(SERVICE_STATUS_SCHEDULE),然后通过 schedule_back
方法将服务推到 ltask 的调度队列中
static inline void
schedule_back(struct ltask *task, service_id id) {
int r = queue_push_int(task->schedule, (int)id.id);
// Must succ because task->schedule is large enough.
assert(r == 0);
}
这里就解释了为什么不对服务发送消息不会触发调度,因为只有发送消息才会调用 schedule_back
将服务加入到调度队列中
ltask_run
从这里开始,调度器将会进入工作状态。该方法按照配置实例化了多个线程,分别负责独占服务和普通服务的工作,并挂起等待线程执行完成或者信号打断从而终止
这里比较有趣的一个设计就是任何线程都可以承担调度器的工作,前提是抢夺到调度权
在这些线程中,我们主要关心负责普通服务的方法 thread_worker
static int
ltask_run(lua_State *L) {
int logthread = 0;
struct ltask *task = (struct ltask *)get_ptr(L, "LTASK_GLOBAL");
int ecount = exclusive_count(task);
int threads_count = task->config->worker + ecount + logthread;
struct thread * t = (struct thread *)lua_newuserdatauv(L, threads_count * sizeof(struct thread), 0);
int i;
for (i=0;i<ecount;i++) {
t[i].func = thread_exclusive;
t[i].ud = (void *)&task->exclusives[i];
}
for (i=0;i<task->config->worker;i++) {
t[ecount + i].func = thread_worker;
t[ecount + i].ud = (void *)&task->workers[i];
}
atomic_int_store(&task->thread_count, threads_count-logthread);
if (logthread) {
t[threads_count-1].func = thread_logger;
t[threads_count-1].ud = (void *)task;
}
sig_init();
thread_join(t, threads_count);
if (!logthread) {
close_logger(task);
}
logqueue_delete(task->lqueue);
return 0;
}
该方法实现较长,是一个循环,总体分为两个分支。
static void
thread_worker(void *ud) {
struct worker_thread * w = (struct worker_thread *)ud;
struct service_pool * P = w->task->services;
worker_timelog_init(w);
atomic_int_inc(&w->task->active_worker);
thread_setnamef("ltask!worker-%02d", w->worker_id);
int thread_id = THREAD_WORKER(w->worker_id);
sig_register(crash_log_worker, w);
for (;;) {
if (w->term_signal) {
// quit
break;
}
service_id id = worker_get_job(w);
if (id.id) {
// 第一个分支,获取到就绪的服务
} else {
// 第二个分支,未获取到就绪的服务
int nojob = 1;
if (!acquire_scheduler(w)) {
nojob = schedule_dispatch_worker(w);
release_scheduler(w);
}
if (nojob) {
// go to sleep
atomic_int_dec(&w->task->active_worker);
debug_printf(w->logger, "Sleeping (%d)", atomic_int_load(&w->task->active_worker));
worker_timelog(w, -1);
worker_sleep(w);
worker_timelog(w, -1);
atomic_int_inc(&w->task->active_worker);
debug_printf(w->logger, "Wakeup");
}
}
}
worker_quit(w);
atomic_int_dec(&w->task->thread_count);
debug_printf(w->logger, "Quit");
}
线程首先通过 worker_get_job
检查自己的 service_ready
slot 是否包含就绪的服务,由于刚开始工作,线程上当然是找不到就绪的服务,于是它会走第二个分支。
第二个分支是通过 acquire_scheduler
尝试获取调度权。获取调度权的过程是一个 CAS(CompareAndSwap),即指令级别的原子交换行为,可以确保只有始终只有一个线程可以抢夺到调度权
在成功获取到调度权后,线程通过 schedule_dispatch
(其中的第三部分逻辑)读取调度队列里的服务(注意这里呼应了前面提到的只有对服务发送了消息,服务才会被推入调度队列中的设计),并通过 worker_assign_job
将服务分配到线程的 service_ready
slot 上。
- 如果无法分配到任何线程上,则重新放入调度队列
- 如果没有任何服务,则终止分配行为
- 如果服务较少,可能存在部分线程没有被分配到服务
- 被分配到服务的线程会被标记为唤醒
最终的效果是尽量实现服务:线程的1:1分配。
static int
schedule_dispatch_worker(struct worker_thread *worker) {
schedule_dispatch(worker->task);
if (!worker_has_job(worker)) {
service_id job = steal_job(worker);
if (job.id) {
debug_printf(worker->logger, "Steal service %x", job.id);
worker_assign_job(worker, job);
} else {
return 1;
}
}
return 0;
}
static int
schedule_dispatch(struct ltask *task) {
// Step 3: Assign task to workers
int assign_job = 0;
int job = 0;
for (i=0;i<task->config->worker;i++) {
if (job == 0) {
job = queue_pop_int(task->schedule);
if (job == 0) {
// No job in the queue
break;
}
}
service_id id = { job };
if (!worker_assign_job(&task->workers[i], id)) {
debug_printf(task->logger, "Assign %x to worker %d", id.id, i);
worker_wakeup(&task->workers[i]);
++assign_job;
job = 0;
}
}
if (job != 0) {
// Push unassigned job back
queue_push_int(task->schedule, job);
} else {
wakeup_sleeping_workers(task, assign_job);
}
return assign_job;
}
static inline int
worker_assign_job(struct worker_thread *worker, service_id id) {
if (atomic_int_load(&worker->service_ready) == 0) {
// only one producer (Woker) except itself (worker_steal_job), so don't need use CAS to set
atomic_int_store(&worker->service_ready, id.id);
return 0;
} else {
// Already has a job
return 1;
}
}
接着线程再次检查自己是否被分配到了服务,如果没有,则通过 steal_job
窃取其他线程 slot 上未被处理的服务。窃取的过程也是一个 CAS
无论当前线程最终是否成功为自己分配到服务,都会将调度权释放
如果没有分配到服务,线程被标记为睡眠
进入下一轮循环,这一次线程检查到自己的 service_ready
slot 中包含就绪的服务,进入第一个分支
第一个分支首先检查服务的状态是否为调度状态,并将其标记为运行状态(SERVICE_STATUS_RUNNING),然后使用 service_resume
方法恢复服务中的 lua 代码的执行流程(假如我们在服务代码中使用了 coroutine.yield
让出 cpu)
static void
thread_worker(void *ud) {
struct worker_thread * w = (struct worker_thread *)ud;
struct service_pool * P = w->task->services;
worker_timelog_init(w);
atomic_int_inc(&w->task->active_worker);
thread_setnamef("ltask!worker-%02d", w->worker_id);
int thread_id = THREAD_WORKER(w->worker_id);
sig_register(crash_log_worker, w);
for (;;) {
if (w->term_signal) {
// quit
break;
}
service_id id = worker_get_job(w);
if (id.id) {
// 第一个分支
w->running = id;
int status = service_status_get(P, id);
if (status != SERVICE_STATUS_DEAD) {
assert(status == SERVICE_STATUS_SCHEDULE);
debug_printf(w->logger, "Run service %x", id.id);
service_status_set(P, id, SERVICE_STATUS_RUNNING);
worker_timelog(w, id.id);
if (service_resume(P, id, thread_id)) {
worker_timelog(w, id.id);
debug_printf(w->logger, "Service %x quit", id.id);
service_status_set(P, id, SERVICE_STATUS_DEAD);
if (id.id == SERVICE_ID_ROOT) {
debug_printf(w->logger, "Root quit");
// root quit, wakeup others
quit_all_workers(w->task);
quit_all_exclusives(w->task);
wakeup_all_workers(w->task);
break;
} else {
service_send_signal(P, id);
}
} else {
worker_timelog(w, id.id);
service_status_set(P, id, SERVICE_STATUS_DONE);
}
} else {
debug_printf(w->logger, "Service %x is dead", id.id);
}
while (worker_complete_job(w)) {
// Can't complete (running -> done)
if (!acquire_scheduler(w)) {
if (worker_complete_job(w)) {
// Do it self
schedule_dispatch(w->task);
while (worker_complete_job(w)) {} // CAS may fail spuriously
}
schedule_dispatch_worker(w);
release_scheduler(w);
break;
}
}
} else {
// 第二个分支
}
}
worker_quit(w);
atomic_int_dec(&w->task->thread_count);
debug_printf(w->logger, "Quit");
}
int
service_resume(struct service_pool *p, service_id id, int thread_id) {
struct service *S= get_service(p, id);
if (S == NULL)
return 1;
S->thread_id = thread_id;
lua_State *L = S->L;
if (L == NULL)
return 1;
int nresults = 0;
uint64_t start = systime_thread();
S->clock = start;
int r = lua_resume(L, NULL, 0, &nresults);
uint64_t end = systime_thread();
S->cpucost += end - start;
if (r == LUA_YIELD) {
lua_pop(L, nresults);
return 0;
}
if (r == LUA_OK) {
return 1;
}
if (!lua_checkstack(L, LUA_MINSTACK)) {
lua_writestringerror("%s\n", lua_tostring(L, -1));
lua_pop(L, 1);
return 1;
}
lua_pushfstring(L, "Service %d error: %s", id.id, lua_tostring(L, -1));
luaL_traceback(L, L, lua_tostring(L, -1), 0);
lua_writestringerror("%s\n", lua_tostring(L, -1));
lua_pop(L, 3);
return 1;
}
service_resume
实际上是通过 lua_resume
方法唤醒服务中的 lua 虚拟机执行,并通过返回值来判断执行结果是服务执行结束还是协程再次主动让出 cpu
如果服务执行结果是协程让出,则标记为执行完成,并由 schedule_dispatch
方法的第一部分逻辑收集服务完成的信息,第二部分逻辑决定服务状态标记为空闲或者调度(如果接收到消息)
static int
schedule_dispatch(struct ltask *task) {
// Step 1 : Collect service_done
int done_job_n = 0;
service_id done_job[MAX_WORKER];
int i;
for (i=0;i<task->config->worker;i++) {
service_id job = worker_done_job(&task->workers[i]);
if (job.id) {
debug_printf(task->logger, "Service %x is done", job.id);
done_job[done_job_n++] = job;
}
}
// Step 2: Dispatch out message by service_done
struct service_pool *P = task->services;
for (i=0;i<done_job_n;i++) {
service_id id = done_job[i];
int status = service_status_get(P, id);
if (status == SERVICE_STATUS_DEAD) {
struct message *msg = service_message_out(P, id);
assert(msg && msg->to.id == SERVICE_ID_ROOT && msg->type == MESSAGE_SIGNAL);
switch (service_push_message(P, msg->to, msg)) {
case 0 :
// succ
debug_printf(task->logger, "Signal %x dead to root", id.id);
check_message_to(task, msg->to);
break;
case 1 :
debug_printf(task->logger, "Root service is blocked, Service %x tries to signal it later", id.id);
schedule_back(task, id);
break;
default:
debug_printf(task->logger, "Root service is missing");
service_delete(P, id);
break;
}
} else {
struct message *msg = service_message_out(P, id);
if (msg) {
dispatch_out_message(task, id, msg);
}
assert(status == SERVICE_STATUS_DONE);
if (!service_has_message(P, id)) {
debug_printf(task->logger, "Service %x is idle", id.id);
service_status_set(P, id, SERVICE_STATUS_IDLE);
} else {
debug_printf(task->logger, "Service %x back to schedule", id.id);
service_status_set(P, id, SERVICE_STATUS_SCHEDULE);
schedule_back(task, id);
}
}
}
}
如果服务的执行结果是执行结束,这里有一个特别的判断
- 当服务的通讯地址是 1 也就是根服务时,所有线程(包括独占线程)都会被标记为终止和唤醒,等到线程再次执行时就会退出
- 如果是其他服务,则会发送一个消息给根服务。并等到
schedule_dispatch
方法的第二部分逻辑收集和发送此消息
阶段性总结
通过上面的调度流程,我们可以得出 ltask 的运行特点
- 消息驱动调度
- 调度能力模块化,多线程竞争调度权
- 空闲线程窃取繁忙线程提高处理效率
- 根服务决定运行时存活状态
实现一个基于 epoll 的 lua 事件循环
由于个人对 Rust 的偏好,我决定使用 mlua + mio 实现一个基于高性能实现的事件循环模块