clusterd.lua 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. local skynet = require "skynet"
  2. require "skynet.manager"
  3. local cluster = require "skynet.cluster.core"
  4. local config_name = skynet.getenv "cluster"
  5. local node_address = {}
  6. local node_sender = {}
  7. local node_sender_closed = {}
  8. local command = {}
  9. local config = {}
  10. local nodename = cluster.nodename()
  11. local connecting = {}
  12. local function open_channel(t, key)
  13. local ct = connecting[key]
  14. if ct then
  15. local co = coroutine.running()
  16. local channel
  17. while ct do
  18. table.insert(ct, co)
  19. skynet.wait(co)
  20. channel = ct.channel
  21. ct = connecting[key]
  22. -- reload again if ct ~= nil
  23. end
  24. return assert(node_address[key] and channel)
  25. end
  26. ct = {}
  27. connecting[key] = ct
  28. local address = node_address[key]
  29. if address == nil and not config.nowaiting then
  30. local co = coroutine.running()
  31. assert(ct.namequery == nil)
  32. ct.namequery = co
  33. skynet.error("Waiting for cluster node [".. key.."]")
  34. skynet.wait(co)
  35. address = node_address[key]
  36. end
  37. local succ, err, c
  38. if address then
  39. local host, port = string.match(address, "([^:]+):(.*)$")
  40. c = node_sender[key]
  41. if c == nil then
  42. c = skynet.newservice("clustersender", key, nodename, host, port)
  43. if node_sender[key] then
  44. -- double check
  45. skynet.kill(c)
  46. c = node_sender[key]
  47. else
  48. node_sender[key] = c
  49. end
  50. end
  51. succ = pcall(skynet.call, c, "lua", "changenode", host, port)
  52. if succ then
  53. t[key] = c
  54. ct.channel = c
  55. node_sender_closed[key] = nil
  56. else
  57. err = string.format("changenode [%s] (%s:%s) failed", key, host, port)
  58. end
  59. elseif address == false then
  60. c = node_sender[key]
  61. if c == nil or node_sender_closed[key] then
  62. -- no sender or closed, always succ
  63. succ = true
  64. else
  65. -- trun off the sender
  66. succ, err = pcall(skynet.call, c, "lua", "changenode", false)
  67. if succ then --trun off failed, wait next index todo turn off
  68. node_sender_closed[key] = true
  69. end
  70. end
  71. else
  72. err = string.format("cluster node [%s] is absent.", key)
  73. end
  74. connecting[key] = nil
  75. for _, co in ipairs(ct) do
  76. skynet.wakeup(co)
  77. end
  78. if node_address[key] ~= address then
  79. return open_channel(t,key)
  80. end
  81. assert(succ, err)
  82. return c
  83. end
  84. local node_channel = setmetatable({}, { __index = open_channel })
  85. local function loadconfig(tmp)
  86. if tmp == nil then
  87. tmp = {}
  88. if config_name then
  89. local f = assert(io.open(config_name))
  90. local source = f:read "*a"
  91. f:close()
  92. assert(load(source, "@"..config_name, "t", tmp))()
  93. end
  94. end
  95. local reload = {}
  96. for name,address in pairs(tmp) do
  97. if name:sub(1,2) == "__" then
  98. name = name:sub(3)
  99. config[name] = address
  100. skynet.error(string.format("Config %s = %s", name, address))
  101. else
  102. assert(address == false or type(address) == "string")
  103. if node_address[name] ~= address then
  104. -- address changed
  105. if node_sender[name] then
  106. -- reset connection if node_sender[name] exist
  107. node_channel[name] = nil
  108. table.insert(reload, name)
  109. end
  110. node_address[name] = address
  111. end
  112. local ct = connecting[name]
  113. if ct and ct.namequery and not config.nowaiting then
  114. skynet.error(string.format("Cluster node [%s] resloved : %s", name, address))
  115. skynet.wakeup(ct.namequery)
  116. end
  117. end
  118. end
  119. if config.nowaiting then
  120. -- wakeup all connecting request
  121. for name, ct in pairs(connecting) do
  122. if ct.namequery then
  123. skynet.wakeup(ct.namequery)
  124. end
  125. end
  126. end
  127. for _, name in ipairs(reload) do
  128. -- open_channel would block
  129. skynet.fork(open_channel, node_channel, name)
  130. end
  131. end
  132. function command.reload(source, config)
  133. loadconfig(config)
  134. skynet.ret(skynet.pack(nil))
  135. end
  136. function command.listen(source, addr, port, maxclient)
  137. local gate = skynet.newservice("gate")
  138. if port == nil then
  139. local address = assert(node_address[addr], addr .. " is down")
  140. addr, port = string.match(address, "(.+):([^:]+)$")
  141. port = tonumber(port)
  142. assert(port ~= 0)
  143. skynet.call(gate, "lua", "open", { address = addr, port = port, maxclient = maxclient })
  144. skynet.ret(skynet.pack(addr, port))
  145. else
  146. local realaddr, realport = skynet.call(gate, "lua", "open", { address = addr, port = port, maxclient = maxclient })
  147. skynet.ret(skynet.pack(realaddr, realport))
  148. end
  149. end
  150. function command.sender(source, node)
  151. skynet.ret(skynet.pack(node_channel[node]))
  152. end
  153. function command.senders(source)
  154. skynet.retpack(node_sender)
  155. end
  156. local proxy = {}
  157. function command.proxy(source, node, name)
  158. if name == nil then
  159. node, name = node:match "^([^@.]+)([@.].+)"
  160. if name == nil then
  161. error ("Invalid name " .. tostring(node))
  162. end
  163. end
  164. local fullname = node .. "." .. name
  165. local p = proxy[fullname]
  166. if p == nil then
  167. p = skynet.newservice("clusterproxy", node, name)
  168. -- double check
  169. if proxy[fullname] then
  170. skynet.kill(p)
  171. p = proxy[fullname]
  172. else
  173. proxy[fullname] = p
  174. end
  175. end
  176. skynet.ret(skynet.pack(p))
  177. end
  178. local cluster_agent = {} -- fd:service
  179. local register_name = {}
  180. local function clearnamecache()
  181. for fd, service in pairs(cluster_agent) do
  182. if type(service) == "number" then
  183. skynet.send(service, "lua", "namechange")
  184. end
  185. end
  186. end
  187. function command.register(source, name, addr)
  188. assert(register_name[name] == nil)
  189. addr = addr or source
  190. local old_name = register_name[addr]
  191. if old_name then
  192. register_name[old_name] = nil
  193. clearnamecache()
  194. end
  195. register_name[addr] = name
  196. register_name[name] = addr
  197. skynet.ret(nil)
  198. skynet.error(string.format("Register [%s] :%08x", name, addr))
  199. end
  200. function command.unregister(_, name)
  201. if not register_name[name] then
  202. return skynet.ret(nil)
  203. end
  204. local addr = register_name[name]
  205. register_name[addr] = nil
  206. register_name[name] = nil
  207. clearnamecache()
  208. skynet.ret(nil)
  209. skynet.error(string.format("Unregister [%s] :%08x", name, addr))
  210. end
  211. function command.queryname(source, name)
  212. skynet.ret(skynet.pack(register_name[name]))
  213. end
  214. function command.socket(source, subcmd, fd, msg)
  215. if subcmd == "open" then
  216. skynet.error(string.format("socket accept from %s", msg))
  217. -- new cluster agent
  218. cluster_agent[fd] = false
  219. local agent = skynet.newservice("clusteragent", skynet.self(), source, fd)
  220. local closed = cluster_agent[fd]
  221. cluster_agent[fd] = agent
  222. if closed then
  223. skynet.send(agent, "lua", "exit")
  224. cluster_agent[fd] = nil
  225. end
  226. else
  227. if subcmd == "close" or subcmd == "error" then
  228. -- close cluster agent
  229. local agent = cluster_agent[fd]
  230. if type(agent) == "boolean" then
  231. cluster_agent[fd] = true
  232. elseif agent then
  233. skynet.send(agent, "lua", "exit")
  234. cluster_agent[fd] = nil
  235. end
  236. else
  237. skynet.error(string.format("socket %s %d %s", subcmd, fd, msg or ""))
  238. end
  239. end
  240. end
  241. skynet.start(function()
  242. loadconfig()
  243. skynet.dispatch("lua", function(session , source, cmd, ...)
  244. local f = assert(command[cmd])
  245. f(source, ...)
  246. end)
  247. end)