消息
服务内部采用消息通信,通常是传递一个message指针,这样比进程间通信效率高得多,底层消息结构:
class message
{
uint8_t type;
uint32_t sender;
uint32_t receiver;
int64_t session = 0;
std::shared_ptr<buffer> data;
}
type
消息类型用于区分不同来源的消息,定制不同的处理逻辑, 内置的消息类型有:
moon.PTYPE_SYSTEM = 1
moon.PTYPE_TEXT = 2
moon.PTYPE_LUA = 3
moon.PTYPE_ERROR = 4
moon.PTYPE_DEBUG = 5
moon.PTYPE_SHUTDOWN = 6
moon.PTYPE_TIMER = 7
moon.PTYPE_SOCKET_TCP = 8
moon.PTYPE_SOCKET_UDP = 9
moon.PTYPE_SOCKET_WS = 10
moon.PTYPE_SOCKET_MOON = 11
moon.PTYPE_INTEGER = 12
-
PTYPE_LUA
类型的消息,采用了一种lua
对象序列化的方式,最常用的消息类型。
-
sender
消息的发送者 -
- 对于
PTYPE_LUA
类型的消息sender
表示发送者的服务ID
- 对于
-
- 对于
PTYPE_SOCKET_*
类型的消息sender
表示socket fd
,
- 对于
-
- 对于
PTYPE_TIMER
类型的消息sender
表示timerid
,
- 对于
-
- 其它类型的消息, 是为底层框架服务的, 对于使用者基本不需要关系
-
receiver
消息的接受者,主要用于服务间通信,表示接收者服务id -
sessionid
用于请求回应模式的lua协程封装, 有时我们发送一条消息,并希望得到消息的处理结果:发送消息时附带一个sessionid,并绑定一个lua协程,对方收到后把sessionid发送回来,触发协程resume
。如下面代码,假设other_service_id
所代表的服务提供了func_add
函数:
moon.async(function()
local res,err = moon.call("lua",other_service_id,"func_add",1,2)
if not res then
print(err)
else
print(res)--output 3
end
end)
data
根据不同type
消息类型编码后的数据。使用共享智能指针是为了发送数据给多个目标时减少拷贝。
消息类型注册
信息
内置的消息类型都已经自动注册过, 正常情况下无需再次注册
注册消息类型的API:
moon.register_protocol(t)
- 示例: 注册
PTYPE_LUA
类型的消息
moon.register_protocol {
name = "lua", --关联字符串名字,方便记忆
PTYPE = moon.PTYPE_LUA,
pack = moon.pack, --消息编码函数
unpack = moon.unpack, --消息解码函数
dispatch = function() --消息处理函数, 如果要处理该类型的消息,逻辑层一般需要调用 moon.dispatch 自定义消息处理函数
error("PTYPE_LUA dispatch not implemented")
end
}
- 注册自定义的消息类型
local PTYPE_CLIENT = 100 --建议大于100(1-255), 避免与内置定义冲突
moon.register_protocol({
name = "client",
PTYPE = PTYPE_CLIENT,
pack = function(...) --可选,发送消息时会调用(moon.send, moon.call)
-- body
end,
unpack = function(sz, len) --可选,收到消息时会调用
-- body
end,
dispatch = function(sender, session, ...)
end
})
---如果需要自定义解析,需要设置israw = true, 这样就可以得到message指针, 常用于性能相关场合
moon.register_protocol({
name = "client",
PTYPE = PTYPE_CLIENT,
israw = true,
dispatch = function(msg)
---获取message相关信息
---'S' message:sender()
---
---'R' message:receiver()
---
---'E' message:sessionid()
---
---'Z' message:bytes()
---
---'N' message:size()
---
---'B' message:buffer()
---
---'C' C-Pointer + size
---根据需求获取
print(moon.decode(msg,"C"))
end
})
---或者使用moon.raw_dispatch重写已经注册的解析协议
moon.raw_dispatch("lua", function (m)
---根据需求获取
print(moon.decode(msg,"C"))
end)
定义消息处理函数
moon推荐使用命令模式(第一个参数表示函数名字)处理服务间的消息通信。每个服务可以注册多个消息处理函数来响应不同类型的消息。
基础用法
---设置指定协议类型的消息处理函数
---@param PTYPE string 协议类型
---@param fn fun(sender:integer, session:integer, ...) 处理函数
moon.dispatch(PTYPE, fn)
命令模式实现
- 定义
PTYPE_LUA
消息类型的处理函数
-- 定义命令处理表
local command = {}
-- 注册命令处理函数
command.HELLO = function()
return "world"
end
command.ADD = function(a, b)
return a + b
end
command.ECHO = function(msg)
return msg
end
-- 注册消息分发器
moon.dispatch("lua", function(sender, session, cmd, ...)
local f = command[cmd]
if f then
local res = {xpcall(f, debug.traceback, ...)}
if res[1] then
moon.response("lua", sender, session, table.unpack(res,2))
else
error(string.format("Execute command %s failed: %s", cmd, res[2]))
end
else
error(string.format("Unknown command %s", tostring(cmd)))
end
end)
--- 其他服务调用注册的函数
-- 异步调用
moon.async(function()
-- 简单调用
local result = moon.call("lua", service_id, "HELLO")
print(result) -- 输出: world
-- 带参数调用
local sum = moon.call("lua", service_id, "ADD", 1, 2)
print(sum) -- 输出: 3
-- 发送消息(不需要返回值)
moon.send("lua", service_id, "ECHO", "no response needed")
end)
用于发送消息的API
moon.send
向服务发送消息
---
---向指定服务发送消息,消息内容会根据`PTYPE`类型调用对应的`pack`函数。
---@param PTYPE string @protocol type. e. "lua"
---@param receiver integer @receiver's service id
moon.send(PTYPE, receiver, ...)
moon.raw_send
向服务发送原始消息
---向指定服务发送消息, 不会调用对应的`pack`函数。
---@param PTYPE string @协议类型
---@param receiver integer @接收者服务id
---@param data? string|buffer_ptr @消息内容
---@param sessionid? integer
moon.raw_send(PTYPE, receiver, data, sessionid)
moon.call
向某个服务发起调用并获得返回值
--- 向目标服务发送消息, 然后等待返回值, 接收方必须调用`moon.response`返回结果
--- - 如果请求成功, 返回值为`moon.response(id, response, params...)`中`params`部分。
--- - 如果请求失败, 返回false和错误消息字符串
---@async
---@param PTYPE string @protocol type
---@param receiver integer @receiver service's id
---@return ...
---@nodiscard
moon.call(PTYPE, receiver, ...)
moon.response
响应moon.call
调用
--- 用来响应moon.call的请求, 如果sessionid==0,该调用不会产生任何影响
---@param PTYPE string @protocol type
---@param receiver integer @receiver service's id
---@param sessionid integer
moon.response(PTYPE, receiver, sessionid, ...)
消息的发送和接收
Moon提供两种消息通信模式:回调式和协程式。
回调式通信
完整示例:
-- filepath: sender.lua
local moon = require("moon")
local command = {}
-- 注册回调处理
command.ADDRESULT = function(sender, result)
print("Got result:", result)
end
moon.dispatch("lua", function(sender, session, cmd, ...)
local f = command[cmd]
if f then
f(sender, ...)
end
end)
-- 发送计算请求
moon.send('lua', service_id, "ADD", 1, 2)
-- filepath: receiver.lua
local moon = require("moon")
local command = {}
command.ADD = function(sender, a, b)
local result = a + b
-- 发送计算结果
moon.send('lua', sender, 'ADDRESULT', result)
end
moon.dispatch("lua", function(sender, session, cmd, ...)
local f = command[cmd]
if f then
f(sender, ...)
end
end)
协程式通信
moon.call
和 moon.response
组合
- 发送者:生成一个唯一ID,创建一个协程,并保存ID-协程的映射。 发送消息,把协程ID同时发送给接收者,然后挂起协程。
- 接收者:收到消息处理完逻辑,把 处理结果 和收到的协程ID 返回给发送者。
- 发送者:注册一个总的回调函数, 收到结果数据,根据协程ID唤醒协程。
完整示例:
-- filepath: sender.lua
local moon = require("moon")
moon.async(function()
local res, err = moon.call('lua', service_id, "ADD", 1, 2)
if res then
print("Result:", res)
else
print("Error:", err)
end
end)
-- filepath: receiver.lua
local moon = require("moon")
local command = {}
command.ADD = function(sender, session, a, b)
return a+b
end
moon.dispatch("lua", function(sender, session, cmd, ...)
local f = command[cmd]
if f then
local res = {xpcall(f, debug.traceback, ...)}
if res[1] then
moon.response("lua", sender, session, table.unpack(res,2))
else
moon.response("lua", sender, session, table.unpack(res)) -- 失败 返回false和错误信息
end
else
error(string.format("Unknown command %s", tostring(cmd)))
end
end)