|
- local skynet = require "skynet"
- local mc = require "skynet.multicast.core"
- local datacenter = require "skynet.datacenter"
- local harbor_id = skynet.harbor(skynet.self())
- local command = {}
- local channel = {}
- local channel_n = {}
- local channel_remote = {}
- local channel_id = harbor_id
- local NORET = {}
- local function get_address(t, id)
- local v = assert(datacenter.get("multicast", id))
- t[id] = v
- return v
- end
- local node_address = setmetatable({}, { __index = get_address })
- -- new LOCAL channel , The low 8bit is the same with harbor_id
- function command.NEW()
- while channel[channel_id] do
- channel_id = mc.nextid(channel_id)
- end
- channel[channel_id] = {}
- channel_n[channel_id] = 0
- local ret = channel_id
- channel_id = mc.nextid(channel_id)
- return ret
- end
- -- MUST call by the owner node of channel, delete a remote channel
- function command.DELR(source, c)
- channel[c] = nil
- channel_n[c] = nil
- return NORET
- end
- -- delete a channel, if the channel is remote, forward the command to the owner node
- -- otherwise, delete the channel, and call all the remote node, DELR
- function command.DEL(source, c)
- local node = c % 256
- if node ~= harbor_id then
- skynet.send(node_address[node], "lua", "DEL", c)
- return NORET
- end
- local remote = channel_remote[c]
- channel[c] = nil
- channel_n[c] = nil
- channel_remote[c] = nil
- if remote then
- for node in pairs(remote) do
- skynet.send(node_address[node], "lua", "DELR", c)
- end
- end
- return NORET
- end
- -- forward multicast message to a node (channel id use the session field)
- local function remote_publish(node, channel, source, ...)
- skynet.redirect(node_address[node], source, "multicast", channel, ...)
- end
- -- publish a message, for local node, use the message pointer (call mc.bind to add the reference)
- -- for remote node, call remote_publish. (call mc.unpack and skynet.tostring to convert message pointer to string)
- local function publish(c , source, pack, size)
- local remote = channel_remote[c]
- if remote then
- -- remote publish should unpack the pack, because we should not publish the pointer out.
- local _, msg, sz = mc.unpack(pack, size)
- local msg = skynet.tostring(msg,sz)
- for node in pairs(remote) do
- remote_publish(node, c, source, msg)
- end
- end
- local group = channel[c]
- if group == nil or next(group) == nil then
- -- dead channel, delete the pack. mc.bind returns the pointer in pack and free the pack (struct mc_package **)
- local pack = mc.bind(pack, 1)
- mc.close(pack)
- return
- end
- local msg = skynet.tostring(pack, size) -- copy (pack,size) to a string
- mc.bind(pack, channel_n[c]) -- mc.bind will free the pack(struct mc_package **)
- for k in pairs(group) do
- -- the msg is a pointer to the real message, publish pointer in local is ok.
- skynet.redirect(k, source, "multicast", c , msg)
- end
- end
- skynet.register_protocol {
- name = "multicast",
- id = skynet.PTYPE_MULTICAST,
- unpack = function(msg, sz)
- return mc.packremote(msg, sz)
- end,
- dispatch = function (...)
- skynet.ignoreret()
- publish(...)
- end,
- }
- -- publish a message, if the caller is remote, forward the message to the owner node (by remote_publish)
- -- If the caller is local, call publish
- function command.PUB(source, c, pack, size)
- assert(skynet.harbor(source) == harbor_id)
- local node = c % 256
- if node ~= harbor_id then
- -- remote publish
- remote_publish(node, c, source, mc.remote(pack))
- else
- publish(c, source, pack,size)
- end
- end
- -- the node (source) subscribe a channel
- -- MUST call by channel owner node (assert source is not local and channel is create by self)
- -- If channel is not exist, return true
- -- Else set channel_remote[channel] true
- function command.SUBR(source, c)
- local node = skynet.harbor(source)
- if not channel[c] then
- -- channel none exist
- return true
- end
- assert(node ~= harbor_id and c % 256 == harbor_id)
- local group = channel_remote[c]
- if group == nil then
- group = {}
- channel_remote[c] = group
- end
- group[node] = true
- end
- -- the service (source) subscribe a channel
- -- If the channel is remote, node subscribe it by send a SUBR to the owner .
- function command.SUB(source, c)
- local node = c % 256
- if node ~= harbor_id then
- -- remote group
- if channel[c] == nil then
- if skynet.call(node_address[node], "lua", "SUBR", c) then
- return
- end
- if channel[c] == nil then
- -- double check, because skynet.call whould yield, other SUB may occur.
- channel[c] = {}
- channel_n[c] = 0
- end
- end
- end
- local group = channel[c]
- if group and not group[source] then
- channel_n[c] = channel_n[c] + 1
- group[source] = true
- end
- end
- -- MUST call by a node, unsubscribe a channel
- function command.USUBR(source, c)
- local node = skynet.harbor(source)
- assert(node ~= harbor_id)
- local group = assert(channel_remote[c])
- group[node] = nil
- return NORET
- end
- -- Unsubscribe a channel, if the subscriber is empty and the channel is remote, send USUBR to the channel owner
- function command.USUB(source, c)
- local group = assert(channel[c])
- if group[source] then
- group[source] = nil
- channel_n[c] = channel_n[c] - 1
- if channel_n[c] == 0 then
- local node = c % 256
- if node ~= harbor_id then
- -- remote group
- channel[c] = nil
- channel_n[c] = nil
- skynet.send(node_address[node], "lua", "USUBR", c)
- end
- end
- end
- return NORET
- end
- skynet.start(function()
- skynet.dispatch("lua", function(_,source, cmd, ...)
- local f = assert(command[cmd])
- local result = f(source, ...)
- if result ~= NORET then
- skynet.ret(skynet.pack(result))
- end
- end)
- local self = skynet.self()
- local id = skynet.harbor(self)
- assert(datacenter.set("multicast", id, self) == nil)
- end)
|