cmaster.lua 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. local skynet = require "skynet"
  2. local socket = require "skynet.socket"
  3. --[[
  4. The master manages this data:
  5. 1. the addresses of all slaves : id -> ipaddr:port
  6. 2. all the global names : name -> address
  7. The master holds the connections from the slaves.
  8. protocol slave->master :
  9. package size 1 byte
  10. type 1 byte :
  11. 'H' : HANDSHAKE, report slave id, and address.
  12. 'R' : REGISTER name address
  13. 'Q' : QUERY name
  14. protocol master->slave:
  15. package size 1 byte
  16. type 1 byte :
  17. 'W' : WAIT n
  18. 'C' : CONNECT slave_id slave_address
  19. 'N' : NAME globalname address
  20. 'D' : DISCONNECT slave_id
  21. ]]
  -- slave_node: slave id -> { fd, id, addr } record stored by handshake().
  -- monitor_slave() sets the record's fd to 0 once the connection is down.
  22. local slave_node = {}
  -- global_name: registered global name -> address, filled by 'R' requests
  -- in dispatch_slave() and read back by 'Q' requests.
  23. local global_name = {}
  -- Read one length-prefixed package from a connection.
  -- The first byte read is the payload size; that many bytes are then read
  -- and handed to skynet.unpack.
  -- @param fd socket id to read from
  -- @return every value skynet.unpack produces for the payload
  -- Raises "closed" when either socket.read returns a false value.
  local function read_package(fd)
    local header = assert(socket.read(fd, 1), "closed")
    local payload = assert(socket.read(fd, header:byte()), "closed")
    return skynet.unpack(payload)
  end
  -- Serialize the arguments into one length-prefixed package.
  -- @param ... values passed straight to skynet.packstring
  -- @return a string made of one size byte followed by the packed payload
  -- Raises "too long" when the packed payload is larger than 255 bytes,
  -- because the size has to fit in the single leading byte.
  local function pack_package(...)
    local payload = skynet.packstring(...)
    local len = #payload
    if not (len <= 255) then
      -- level 0 keeps the message free of position info, like assert does
      error("too long", 0)
    end
    return string.char(len) .. payload
  end
  -- Announce a new slave to the slaves that are already registered.
  -- Sends a 'C' package (slave_id, slave_addr) to every slave_node entry
  -- whose fd is not 0, then sends a 'W' package carrying the number of
  -- slaves that were notified to the new connection itself.
  -- @param fd socket id of the new slave
  -- @param slave_id id reported by the new slave
  -- @param slave_addr address reported by the new slave
  local function report_slave(fd, slave_id, slave_addr)
    local connect_msg = pack_package("C", slave_id, slave_addr)
    local notified = 0
    for _, node in pairs(slave_node) do
      local peer_fd = node.fd
      if peer_fd ~= 0 then
        socket.write(peer_fd, connect_msg)
        notified = notified + 1
      end
    end
    local wait_msg = pack_package("W", notified)
    socket.write(fd, wait_msg)
  end
  -- Perform the initial handshake with a freshly connected slave.
  -- Reads one package, which must be an 'H' package carrying the slave id
  -- and its address, announces the slave through report_slave() and records
  -- it in slave_node.
  -- @param fd socket id of the new connection
  -- @return slave_id, slave_addr as reported by the slave
  -- Raises an error (meant to be caught with pcall by the caller) when the
  -- connection closes, the package is not a valid handshake, or the id is
  -- already present in slave_node.
  local function handshake(fd)
    local t, slave_id, slave_addr = read_package(fd)
    -- tostring() keeps the intended message even when t is nil or a table;
    -- a bare concatenation would raise an unrelated error first.
    assert(t == 'H', "Invalid handshake type " .. tostring(t))
    -- The packet comes from the network: reject a non-numeric id here,
    -- before report_slave() broadcasts it to every other slave.
    assert(type(slave_id) == "number", "Invalid slave id " .. tostring(slave_id))
    assert(slave_id ~= 0, "Invalid slave id 0")
    local existing = slave_node[slave_id]
    if existing then
      error(string.format("Slave %d already register on %s", slave_id, tostring(existing.addr)))
    end
    -- Report first, so the new slave is not counted among the peers that
    -- are told about it.
    report_slave(fd, slave_id, slave_addr)
    slave_node[slave_id] = {
      fd = fd,
      id = slave_id,
      addr = slave_addr,
    }
    return slave_id, slave_addr
  end
  -- Read and handle one request package from a slave.
  --   'R' name address : remember the name (first registration is kept in
  --                      global_name) and send an 'N' package to all
  --                      connected slaves.
  --   'Q' name         : reply with an 'N' package if the name is known.
  -- Any other type is logged and ignored.
  -- @param fd socket id of the slave connection
  -- Raises an error when the connection is closed or an 'R' request does
  -- not carry a numeric address.
  local function dispatch_slave(fd)
    local t, name, address = read_package(fd)
    if t == 'R' then
      -- register name
      assert(type(address) == "number", "Invalid request")
      if not global_name[name] then
        global_name[name] = address
      end
      -- NOTE(review): the broadcast carries the address from this request
      -- even when the name was already registered with another address;
      -- confirm this is intended.
      local message = pack_package("N", name, address)
      for _, node in pairs(slave_node) do
        -- fd 0 marks a slave that went down (see monitor_slave); skip it,
        -- exactly as report_slave does.
        if node.fd ~= 0 then
          socket.write(node.fd, message)
        end
      end
    elseif t == 'Q' then
      -- query name
      local known = global_name[name]
      if known then
        socket.write(fd, pack_package("N", name, known))
      end
    else
      -- tostring() so a nil or table type is logged instead of raising a
      -- concatenation error.
      skynet.error("Invalid slave message type " .. tostring(t))
    end
  end
  -- Serve one registered slave until its connection fails.
  -- Dispatches requests in a loop; when dispatch_slave raises, the slave is
  -- marked as down (fd = 0 in slave_node), a 'D' package is sent to the
  -- slaves that are still connected and the socket is closed.
  -- @param slave_id id of a slave already stored in slave_node
  -- @param slave_address address the slave reported, used only for logging
  local function monitor_slave(slave_id, slave_address)
    local fd = slave_node[slave_id].fd
    skynet.error(string.format("Harbor %d (fd=%d) report %s", slave_id, fd, slave_address))
    while true do
      local ok, err = pcall(dispatch_slave, fd)
      if not ok then
        -- Keep the reason visible instead of silently dropping it.
        skynet.error(string.format("slave %s dispatch stopped: %s", tostring(slave_id), tostring(err)))
        break
      end
    end
    skynet.error("slave " .. slave_id .. " is down")
    local message = pack_package("D", slave_id)
    slave_node[slave_id].fd = 0
    for _, node in pairs(slave_node) do
      -- Skip slaves that are down (fd 0), including the one just marked.
      if node.fd ~= 0 then
        socket.write(node.fd, message)
      end
    end
    socket.close(fd)
  end
  -- Service entry point: listen on the address stored in the "standalone"
  -- environment entry and run a handshake for every incoming connection.
  skynet.start(function()
    local master_addr = skynet.getenv "standalone"
    skynet.error("master listen socket " .. tostring(master_addr))

    -- Called for each accepted connection with its socket id and peer address.
    local function on_accept(id, addr)
      skynet.error("connect from " .. addr .. " " .. id)
      socket.start(id)
      local ok, slave, slave_addr = pcall(handshake, id)
      if not ok then
        -- On failure the second pcall result is the error value.
        skynet.error(string.format("disconnect fd = %d, error = %s", id, slave))
        socket.close(id)
        return
      end
      skynet.fork(monitor_slave, slave, slave_addr)
    end

    local listen_fd = socket.listen(master_addr)
    socket.start(listen_fd, on_accept)
  end)