-- multicastd.lua (5.5 KB)
  1. local skynet = require "skynet"
  2. local mc = require "skynet.multicast.core"
  3. local datacenter = require "skynet.datacenter"
  4. local harbor_id = skynet.harbor(skynet.self())
  5. local command = {}
  6. local channel = {}
  7. local channel_n = {}
  8. local channel_remote = {}
  9. local channel_id = harbor_id
  10. local NORET = {}
  11. local function get_address(t, id)
  12. local v = assert(datacenter.get("multicast", id))
  13. t[id] = v
  14. return v
  15. end
  16. local node_address = setmetatable({}, { __index = get_address })
  17. -- new LOCAL channel , The low 8bit is the same with harbor_id
  18. function command.NEW()
  19. while channel[channel_id] do
  20. channel_id = mc.nextid(channel_id)
  21. end
  22. channel[channel_id] = {}
  23. channel_n[channel_id] = 0
  24. local ret = channel_id
  25. channel_id = mc.nextid(channel_id)
  26. return ret
  27. end
  28. -- MUST call by the owner node of channel, delete a remote channel
  29. function command.DELR(source, c)
  30. channel[c] = nil
  31. channel_n[c] = nil
  32. return NORET
  33. end
  34. -- delete a channel, if the channel is remote, forward the command to the owner node
  35. -- otherwise, delete the channel, and call all the remote node, DELR
  36. function command.DEL(source, c)
  37. local node = c % 256
  38. if node ~= harbor_id then
  39. skynet.send(node_address[node], "lua", "DEL", c)
  40. return NORET
  41. end
  42. local remote = channel_remote[c]
  43. channel[c] = nil
  44. channel_n[c] = nil
  45. channel_remote[c] = nil
  46. if remote then
  47. for node in pairs(remote) do
  48. skynet.send(node_address[node], "lua", "DELR", c)
  49. end
  50. end
  51. return NORET
  52. end
  53. -- forward multicast message to a node (channel id use the session field)
  54. local function remote_publish(node, channel, source, ...)
  55. skynet.redirect(node_address[node], source, "multicast", channel, ...)
  56. end
  57. -- publish a message, for local node, use the message pointer (call mc.bind to add the reference)
  58. -- for remote node, call remote_publish. (call mc.unpack and skynet.tostring to convert message pointer to string)
  59. local function publish(c , source, pack, size)
  60. local remote = channel_remote[c]
  61. if remote then
  62. -- remote publish should unpack the pack, because we should not publish the pointer out.
  63. local _, msg, sz = mc.unpack(pack, size)
  64. local msg = skynet.tostring(msg,sz)
  65. for node in pairs(remote) do
  66. remote_publish(node, c, source, msg)
  67. end
  68. end
  69. local group = channel[c]
  70. if group == nil or next(group) == nil then
  71. -- dead channel, delete the pack. mc.bind returns the pointer in pack and free the pack (struct mc_package **)
  72. local pack = mc.bind(pack, 1)
  73. mc.close(pack)
  74. return
  75. end
  76. local msg = skynet.tostring(pack, size) -- copy (pack,size) to a string
  77. mc.bind(pack, channel_n[c]) -- mc.bind will free the pack(struct mc_package **)
  78. for k in pairs(group) do
  79. -- the msg is a pointer to the real message, publish pointer in local is ok.
  80. skynet.redirect(k, source, "multicast", c , msg)
  81. end
  82. end
  83. skynet.register_protocol {
  84. name = "multicast",
  85. id = skynet.PTYPE_MULTICAST,
  86. unpack = function(msg, sz)
  87. return mc.packremote(msg, sz)
  88. end,
  89. dispatch = function (...)
  90. skynet.ignoreret()
  91. publish(...)
  92. end,
  93. }
  94. -- publish a message, if the caller is remote, forward the message to the owner node (by remote_publish)
  95. -- If the caller is local, call publish
  96. function command.PUB(source, c, pack, size)
  97. assert(skynet.harbor(source) == harbor_id)
  98. local node = c % 256
  99. if node ~= harbor_id then
  100. -- remote publish
  101. remote_publish(node, c, source, mc.remote(pack))
  102. else
  103. publish(c, source, pack,size)
  104. end
  105. end
  106. -- the node (source) subscribe a channel
  107. -- MUST call by channel owner node (assert source is not local and channel is create by self)
  108. -- If channel is not exist, return true
  109. -- Else set channel_remote[channel] true
  110. function command.SUBR(source, c)
  111. local node = skynet.harbor(source)
  112. if not channel[c] then
  113. -- channel none exist
  114. return true
  115. end
  116. assert(node ~= harbor_id and c % 256 == harbor_id)
  117. local group = channel_remote[c]
  118. if group == nil then
  119. group = {}
  120. channel_remote[c] = group
  121. end
  122. group[node] = true
  123. end
  124. -- the service (source) subscribe a channel
  125. -- If the channel is remote, node subscribe it by send a SUBR to the owner .
  126. function command.SUB(source, c)
  127. local node = c % 256
  128. if node ~= harbor_id then
  129. -- remote group
  130. if channel[c] == nil then
  131. if skynet.call(node_address[node], "lua", "SUBR", c) then
  132. return
  133. end
  134. if channel[c] == nil then
  135. -- double check, because skynet.call whould yield, other SUB may occur.
  136. channel[c] = {}
  137. channel_n[c] = 0
  138. end
  139. end
  140. end
  141. local group = channel[c]
  142. if group and not group[source] then
  143. channel_n[c] = channel_n[c] + 1
  144. group[source] = true
  145. end
  146. end
  147. -- MUST call by a node, unsubscribe a channel
  148. function command.USUBR(source, c)
  149. local node = skynet.harbor(source)
  150. assert(node ~= harbor_id)
  151. local group = assert(channel_remote[c])
  152. group[node] = nil
  153. return NORET
  154. end
  155. -- Unsubscribe a channel, if the subscriber is empty and the channel is remote, send USUBR to the channel owner
  156. function command.USUB(source, c)
  157. local group = assert(channel[c])
  158. if group[source] then
  159. group[source] = nil
  160. channel_n[c] = channel_n[c] - 1
  161. if channel_n[c] == 0 then
  162. local node = c % 256
  163. if node ~= harbor_id then
  164. -- remote group
  165. channel[c] = nil
  166. channel_n[c] = nil
  167. skynet.send(node_address[node], "lua", "USUBR", c)
  168. end
  169. end
  170. end
  171. return NORET
  172. end
  173. skynet.start(function()
  174. skynet.dispatch("lua", function(_,source, cmd, ...)
  175. local f = assert(command[cmd])
  176. local result = f(source, ...)
  177. if result ~= NORET then
  178. skynet.ret(skynet.pack(result))
  179. end
  180. end)
  181. local self = skynet.self()
  182. local id = skynet.harbor(self)
  183. assert(datacenter.set("multicast", id, self) == nil)
  184. end)