clusteragent.lua 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. local skynet = require "skynet"
  2. local socket = require "skynet.socket"
  3. local cluster = require "skynet.cluster.core"
  4. local ignoreret = skynet.ignoreret
  -- Service start arguments: the clusterd service, the gate service and the
  -- connection fd this agent answers on. They are converted with tonumber
  -- before use.
  local clusterd, gate, fd = ...
  clusterd, gate, fd = tonumber(clusterd), tonumber(gate), tonumber(fd)

  -- session -> request record for large requests that are still being
  -- received part by part.
  local large_request = {}
  -- name -> list of coroutines parked while a "queryname" call for that name
  -- is in flight.
  local inquery_name = {}
  -- Metatable for the name -> address cache. A miss on `cache[name]` asks
  -- clusterd ("queryname") for the address and stores a truthy answer in the
  -- cache table. While a query for a name is in flight, other coroutines asking
  -- for the same name park on it instead of issuing a second query; they read
  -- the cache with rawget once they are woken.
  --
  -- If the query itself raises, the in-flight marker is still removed and the
  -- parked coroutines are still woken (they will see nil) before the error is
  -- re-raised to the coroutine that issued the query. Without that cleanup the
  -- marker would stay behind and every later lookup of the name would park
  -- forever.
  local register_name_mt = { __index =
    function(self, name)
      local waitco = inquery_name[name]
      if waitco then
        -- someone else is already querying this name: wait for the result
        local co = coroutine.running()
        table.insert(waitco, co)
        skynet.wait(co)
        return rawget(self, name)
      end

      waitco = {}
      inquery_name[name] = waitco
      -- name must be '@xxxx'; the leading character is stripped for clusterd
      local ok, addr = pcall(skynet.call, clusterd, "lua", "queryname", name:sub(2))
      if ok and addr then
        self[name] = addr
      end
      -- always release the marker and the waiters, even when the call failed
      inquery_name[name] = nil
      for _, co in ipairs(waitco) do
        skynet.wakeup(co)
      end
      if not ok then
        -- level 0: pass the original error value through unchanged
        error(addr, 0)
      end
      return addr
    end
  }
  -- Build an empty name cache whose misses are resolved through
  -- register_name_mt.
  local function new_register_name()
    local cache = {}
    setmetatable(cache, register_name_mt)
    return cache
  end

  -- Current name cache; it is replaced wholesale on "namechange".
  local register_name = new_register_name()
  -- Trace tag delivered by a trace message, waiting for the request it belongs to.
  local tracetag
  --- Handle one message of the "client" protocol for this connection.
  -- The arguments after the two ignored slots are whatever the protocol's
  -- unpack function produced.
  -- @param addr     target of the request: 0 for a name query, a name accepted
  --                 by cluster.isname, or a service address. For a trace
  --                 message (session == nil) it carries the trace tag instead.
  -- @param session  request session, echoed back in the packed response;
  --                 nil marks a trace message.
  -- @param msg      payload
  -- @param sz       payload size
  -- @param padding  truthy when further parts of a large request follow
  -- @param is_push  truthy when no response must be written
  local function dispatch_request(_, _, addr, session, msg, sz, padding, is_push)
    ignoreret()  -- session is fd, don't call skynet.ret
    if session == nil then
      -- trace
      tracetag = addr
      return
    end

    -- Claim the pending trace tag for THIS request and clear the shared slot
    -- right away. Name lookups and calls below may yield; leaving the tag in
    -- the shared variable would let it leak into an unrelated request, and
    -- clearing it only after the call returns could wipe a newer tag.
    local tag = tracetag
    tracetag = nil

    if padding then
      -- one more part of a large request: remember it and wait for the rest
      local req = large_request[session]
      if not req then
        req = { addr = addr, is_push = is_push, tracetag = tag }
        large_request[session] = req
      end
      cluster.append(req, msg, sz)
      return
    end

    local req = large_request[session]
    if req then
      -- last part of a large request: restore what the first part recorded
      tag = req.tracetag
      large_request[session] = nil
      cluster.append(req, msg, sz)
      msg, sz = cluster.concat(req)
      addr = req.addr
      is_push = req.is_push
    end
    if not msg then
      local response = cluster.packresponse(session, false, "Invalid large req")
      socket.write(fd, response)
      return
    end

    local ok
    if addr == 0 then
      -- name query: the payload is the name, the answer is its packed address
      local name = skynet.unpack(msg, sz)
      skynet.trash(msg, sz)
      local found = register_name["@" .. name]
      if found then
        ok = true
        msg = skynet.packstring(found)
      else
        ok = false
        msg = "name not found"
      end
      sz = nil
    else
      if cluster.isname(addr) then
        addr = register_name[addr]
      end
      if not addr then
        -- Nobody will consume the payload on this path, so release it here,
        -- as the name-query branch does, before msg is overwritten.
        skynet.trash(msg, sz)
        ok = false
        msg = "Invalid name"
      elseif is_push then
        skynet.rawsend(addr, "lua", msg, sz)
        return  -- no response
      elseif tag then
        ok, msg, sz = pcall(skynet.tracecall, tag, addr, "lua", msg, sz)
      else
        ok, msg, sz = pcall(skynet.rawcall, addr, "lua", msg, sz)
      end
    end

    if ok then
      local response = cluster.packresponse(session, true, msg, sz)
      if type(response) == "table" then
        -- multi-part response: each part goes out through lwrite
        for _, part in ipairs(response) do
          socket.lwrite(fd, part)
        end
      else
        socket.write(fd, response)
      end
    else
      socket.write(fd, cluster.packresponse(session, false, msg))
    end
  end
  skynet.start(function()
    -- Requests reach this service as "client" messages: they are decoded by
    -- cluster.unpackrequest and then handed to dispatch_request.
    skynet.register_protocol {
      id = skynet.PTYPE_CLIENT,
      name = "client",
      dispatch = dispatch_request,
      unpack = cluster.unpackrequest,
    }
    -- fd is only written to here, never read: the gate forwards the incoming
    -- data packages through the client protocol.
    skynet.call(gate, "lua", "forward", fd)

    -- Control commands accepted over the "lua" protocol.
    local command = {}

    -- Close the connection and terminate this service.
    function command.exit()
      socket.close_fd(fd)
      skynet.exit()
    end

    -- Drop every cached name by starting over with a fresh cache.
    function command.namechange()
      register_name = new_register_name()
    end

    skynet.dispatch("lua", function(_, source, cmd, ...)
      local handler = command[cmd]
      if handler == nil then
        skynet.error(string.format("Invalid command %s from %s", cmd, skynet.address(source)))
        return
      end
      handler()
    end)
  end)