cslave.lua 6.5 KB


  1. local skynet = require "skynet"
  2. local socket = require "skynet.socket"
  3. local socketdriver = require "skynet.socketdriver"
  4. require "skynet.manager" -- import skynet.launch, ...
  local table = table

  -- Per-harbor connection state, keyed by harbor id: the socket fd while the
  -- link is up, false once the harbor has been reported down, nil if the id
  -- has not been seen yet.
  local slaves = {}
  -- Harbor id -> address pairs received from the master before the handshake
  -- is finished; ready() drains this table and sets the variable to nil.
  local connect_queue = {}
  -- Result of launching the "harbor" service; assigned during startup.
  local harbor_service

  -- Global name -> address.
  local globalname = {}
  -- Global name -> list of response closures waiting for that name.
  local queryname = {}

  -- Harbor id -> list of response closures queued by LINK / CONNECT.
  local monitor = {}
  -- Response closures queued by LINKMASTER.
  local monitor_master_set = {}

  -- Handlers for "lua" protocol commands, looked up by command name.
  local harbor = {}
  --- Read one length-prefixed package from a socket and decode it.
  -- The package is a single length byte followed by that many payload bytes,
  -- which are handed to skynet.unpack.
  -- @param fd socket id to read from
  -- @return whatever skynet.unpack yields for the payload
  -- Raises "closed" if either read returns a false value.
  local function read_package(fd)
    local header = socket.read(fd, 1)
    assert(header, "closed")
    local size = string.byte(header)
    local payload = assert(socket.read(fd, size), "closed")
    return skynet.unpack(payload)
  end
  --- Serialize the arguments into one length-prefixed package.
  -- The result is a single length byte followed by the skynet.packstring
  -- output, so the serialized form must fit in 255 bytes.
  -- @param ... values to serialize
  -- @return string ready to be written to a socket
  -- Raises "too long" when the serialized form exceeds 255 bytes.
  local function pack_package(...)
    local body = skynet.packstring(...)
    local length = #body
    assert(length <= 255 , "too long")
    local prefix = string.char(length)
    return prefix .. body
  end
  --- Release every waiter queued for a harbor id.
  -- The waiter list is detached from `monitor` before the closures run, and
  -- each closure is invoked with `true`.
  -- @param id harbor id whose waiters should be answered
  local function monitor_clear(id)
    local waiters = monitor[id]
    if not waiters then
      return
    end
    monitor[id] = nil
    for _, respond in ipairs(waiters) do
      respond(true)
    end
  end
  --- Open an outgoing connection to another harbor, unless one is recorded.
  -- Nothing happens when `slaves[slave_id]` is already non-nil (connected, or
  -- marked down with false). Any error raised while connecting is caught and
  -- logged through skynet.error instead of propagating.
  -- @param slave_id harbor id of the peer
  -- @param address address passed to socket.open
  local function connect_slave(slave_id, address)
    local function attempt()
      if slaves[slave_id] ~= nil then
        return
      end
      local fd = assert(socket.open(address), "Can't connect to "..address)
      socketdriver.nodelay(fd)
      skynet.error(string.format("Connect to harbor %d (fd=%d), %s", slave_id, fd, address))
      slaves[slave_id] = fd
      -- The peer is now reachable: answer everyone waiting on this id.
      monitor_clear(slave_id)
      -- Abandon the fd here and report it to the harbor service ("S fd id").
      socket.abandon(fd)
      skynet.send(harbor_service, "harbor", string.format("S %d %d",fd,slave_id))
    end

    local ok, err = pcall(attempt)
    if not ok then
      skynet.error(err)
    end
  end
  --- Finish the handshake phase.
  -- Takes the queued connect requests, sets `connect_queue` to nil so later
  -- requests are handled immediately, connects to every queued harbor, and
  -- then sends an "N <name>" redirect to the harbor service for every known
  -- global name.
  local function ready()
    local pending = connect_queue
    connect_queue = nil
    for slave_id, address in pairs(pending) do
      connect_slave(slave_id, address)
    end
    for name, address in pairs(globalname) do
      skynet.redirect(harbor_service, address, "harbor", 0, "N " .. name)
    end
  end
  --- Answer every pending query for a global name.
  -- The waiter list is removed from `queryname` first; each closure is then
  -- called with `true` and the address currently stored in `globalname`.
  -- @param name global name that has just been resolved
  local function response_name(name)
    local waiters = queryname[name]
    if not waiters then
      return
    end
    local address = globalname[name]
    queryname[name] = nil
    for _, respond in ipairs(waiters) do
      respond(true, address)
    end
  end
  --- Read and handle packages from the master until the read fails.
  -- Handled package types:
  --   'C' id, address   : connect to a harbor (queued while `connect_queue`
  --                       still exists, otherwise done immediately)
  --   'N' name, address : record a global name, answer pending queries and,
  --                       once `connect_queue` is nil, notify the harbor service
  --   'D' id            : mark a harbor as down and close its fd if one was
  --                       recorded
  -- When reading fails, every LINKMASTER waiter is answered with `true`, the
  -- master socket is closed and the loop ends.
  -- @param master_fd socket id of the connection to the master
  local function monitor_master(master_fd)
    while true do
      local ok, t, id_name, address = pcall(read_package,master_fd)
      if not ok then
        skynet.error("Master disconnect")
        for _, respond in ipairs(monitor_master_set) do
          respond(true)
        end
        socket.close(master_fd)
        return
      end

      if t == 'C' then
        if connect_queue == nil then
          connect_slave(id_name, address)
        else
          connect_queue[id_name] = address
        end
      elseif t == 'N' then
        globalname[id_name] = address
        response_name(id_name)
        if connect_queue == nil then
          skynet.redirect(harbor_service, address, "harbor", 0, "N " .. id_name)
        end
      elseif t == 'D' then
        local fd = slaves[id_name]
        -- false (not nil) keeps the id marked as known-but-down.
        slaves[id_name] = false
        if fd then
          monitor_clear(id_name)
          socket.close(fd)
        end
      end
    end
  end
  --- Handle an incoming connection from another harbor.
  -- The peer sends its harbor id as a single byte. The connection is closed
  -- if that byte cannot be read or if `slaves` already has an entry for the
  -- id; otherwise the fd is recorded, waiters on the id are answered, and the
  -- harbor service is told about the fd with an "A fd id" message.
  -- @param fd socket id of the accepted connection
  local function accept_slave(fd)
    -- Log the reason and drop the connection.
    local function reject(message)
      skynet.error(message)
      socket.close(fd)
    end

    socket.start(fd)
    local first = socket.read(fd, 1)
    if not first then
      return reject(string.format("Connection (fd =%d) closed", fd))
    end

    local id = string.byte(first)
    if slaves[id] ~= nil then
      return reject(string.format("Slave %d exist (fd =%d)", id, fd))
    end

    slaves[id] = fd
    monitor_clear(id)
    socket.abandon(fd)
    skynet.error(string.format("Harbor %d connected (fd = %d)", id, fd))
    skynet.send(harbor_service, "harbor", string.format("A %d %d", fd, id))
  end
  -- Register the "harbor" and "text" protocols. Both pass outgoing arguments
  -- through unchanged and decode incoming messages with skynet.tostring.
  do
    local function passthrough(...)
      return ...
    end

    skynet.register_protocol {
      name = "harbor",
      id = skynet.PTYPE_HARBOR,
      pack = passthrough,
      unpack = skynet.tostring,
    }

    skynet.register_protocol {
      name = "text",
      id = skynet.PTYPE_TEXT,
      pack = passthrough,
      unpack = skynet.tostring,
    }
  end
  --- Build the dispatcher for "text" protocol messages.
  -- A command is one letter, a separator character, then the argument:
  --   "Q <name>" : query a global name; answered with an "N <name>" redirect
  --                to the harbor service when the name is known, otherwise
  --                forwarded to the master as a ("Q", name) package
  --   "D <id>"   : harbor down; waiters on the id are answered if it was
  --                connected, and the id is marked false in `slaves`
  -- Anything else is logged as an unknown command.
  -- @param master_fd socket id of the connection to the master
  -- @return function(session, source, command) suitable for skynet.dispatch
  local function monitor_harbor(master_fd)
    return function(session, source, command)
      local kind = string.sub(command, 1, 1)
      local arg = string.sub(command, 3)

      if kind == 'Q' then
        local address = globalname[arg]
        if address then
          skynet.redirect(harbor_service, address, "harbor", 0, "N " .. arg)
        else
          socket.write(master_fd, pack_package("Q", arg))
        end
        return
      end

      if kind == 'D' then
        local id = tonumber(arg)
        if slaves[id] then
          monitor_clear(id)
        end
        slaves[id] = false
        return
      end

      skynet.error("Unknown command ", command)
    end
  end
  --- Register a new global name for a handle.
  -- Records the name, answers pending queries for it, sends an
  -- ("R", name, handle) package on `fd`, and notifies the harbor service with
  -- an "N <name>" redirect.
  -- @param fd socket id to write the registration to (the dispatcher passes
  --           the master connection)
  -- @param name global name to register; must not be registered already
  -- @param handle address the name resolves to
  function harbor.REGISTER(fd, name, handle)
    assert(globalname[name] == nil)
    globalname[name] = handle
    response_name(name)
    local packet = pack_package("R", name, handle)
    socket.write(fd, packet)
    skynet.redirect(harbor_service, handle, "harbor", 0, "N " .. name)
  end
  --- Wait until a connected harbor goes away.
  -- Replies immediately when `slaves[id]` is nil or false; otherwise the
  -- response closure is queued in `monitor[id]` until monitor_clear runs.
  -- @param fd unused
  -- @param id harbor id to watch
  function harbor.LINK(fd, id)
    if not slaves[id] then
      skynet.ret()
      return
    end
    local waiters = monitor[id]
    if waiters == nil then
      waiters = {}
      monitor[id] = waiters
    end
    waiters[#waiters + 1] = skynet.response()
  end
  --- Wait until the master connection is lost.
  -- The response closure is queued in `monitor_master_set`; monitor_master
  -- answers it when reading from the master fails.
  function harbor.LINKMASTER()
    monitor_master_set[#monitor_master_set + 1] = skynet.response()
  end
  --- Wait until a harbor is connected.
  -- Replies immediately when `slaves[id]` holds an fd; otherwise the response
  -- closure is queued in `monitor[id]` until monitor_clear runs.
  -- @param fd unused
  -- @param id harbor id to wait for
  function harbor.CONNECT(fd, id)
    if slaves[id] then
      skynet.ret()
      return
    end
    local waiters = monitor[id]
    if waiters == nil then
      waiters = {}
      monitor[id] = waiters
    end
    waiters[#waiters + 1] = skynet.response()
  end
  --- Resolve a name to an address.
  -- Names starting with "." are answered from skynet.localname. Known global
  -- names are answered from `globalname`. Otherwise the caller is queued in
  -- `queryname`; the first waiter for a name also sends a ("Q", name) package
  -- on `fd`.
  -- @param fd socket id to send the query on (the dispatcher passes the
  --           master connection)
  -- @param name name to resolve
  function harbor.QUERYNAME(fd, name)
    if name:byte() == 46 then -- "." , local name
      skynet.ret(skynet.pack(skynet.localname(name)))
      return
    end

    local known = globalname[name]
    if known then
      skynet.ret(skynet.pack(known))
      return
    end

    local waiters = queryname[name]
    if waiters ~= nil then
      -- A query for this name is already pending; just join the queue.
      waiters[#waiters + 1] = skynet.response()
      return
    end

    socket.write(fd, pack_package("Q", name))
    queryname[name] = { skynet.response() }
  end
  -- Service entry point.
  --  1. Listen on the configured slave address and connect to the master.
  --  2. Install the "lua" and "text" dispatchers and launch the harbor service.
  --  3. Send ("H", harbor_id, address) to the master and expect ("W", n) back,
  --     where n is the number of harbors to wait for.
  --  4. If n > 0, accept incoming harbor connections until `slaves` has at
  --     least n entries.
  --  5. Release the listening socket and run ready() in a new coroutine.
  skynet.start(function()
    local master_addr = skynet.getenv "master"
    local harbor_id = tonumber(skynet.getenv "harbor")
    local slave_address = assert(skynet.getenv "address")
    local slave_fd = socket.listen(slave_address)
    skynet.error("slave connect to master " .. tostring(master_addr))
    local master_fd = assert(socket.open(master_addr), "Can't connect to master")

    -- Every "lua" command handler receives the master fd as first argument.
    skynet.dispatch("lua", function (_,_,command,...)
      local f = assert(harbor[command])
      f(master_fd, ...)
    end)
    skynet.dispatch("text", monitor_harbor(master_fd))

    harbor_service = assert(skynet.launch("harbor", harbor_id, skynet.self()))

    local hs_message = pack_package("H", harbor_id, slave_address)
    socket.write(master_fd, hs_message)
    local t, n = read_package(master_fd)
    assert(t == "W" and type(n) == "number", "slave shakehand failed")
    skynet.error(string.format("Waiting for %d harbors", n))
    skynet.fork(monitor_master, master_fd)

    if n > 0 then
      local co = coroutine.running()
      socket.start(slave_fd, function(fd, addr)
        skynet.error(string.format("New connection (fd = %d, %s)",fd, addr))
        socketdriver.nodelay(fd)
        if pcall(accept_slave,fd) then
          -- Count every entry in `slaves` (including ids marked false).
          local s = 0
          for _ in pairs(slaves) do
            s = s + 1
          end
          if s >= n then
            skynet.wakeup(co)
          end
        end
      end)
      skynet.wait()
      socket.close(slave_fd)
    else
      -- slave_fd was never passed to socket.start in this branch, so
      -- socket.close would not release it and the listening socket would
      -- leak; close the unstarted fd directly instead.
      socket.close_fd(slave_fd)
    end

    skynet.error("Shakehand ready")
    skynet.fork(ready)
  end)