agent.lua 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. local skynet = require "skynet"
  2. -- local socket = require "skynet.socket"
  3. -- local sprotoloader = require "sprotoloader"
  4. local logger = require "logger"
  5. local timer = require "timer"
  6. local player = require "model.player"
  7. local cjson = require "cjson"
  8. -- local JsSProto = require "JsSProto"
  9. local websocket = require "http.websocket"
  10. local traceback = debug.traceback
  11. local skynet_retpack = skynet.retpack
  12. local skynet_call = skynet.call
  13. local skynet_send = skynet.send
  14. local string_pack = string.pack
  15. local string_format = string.format
  16. local websocket_write = websocket.write
  17. local trace = logger.trace
  18. local warn = logger.warn
  19. local handle_request = player.request
  20. local handle_response = player.response
  21. local handle_command = player.command
  22. local handle_disconnect = player.disconnect
  23. local pbSproto = require "pbSproto"
  24. local cjson_decode = cjson.decode -- 解码
  25. local cjson_encode = cjson.encode -- 编码
  26. -- local mydispatch = JsSProto.decode
  27. -- local mypack = JsSProto.encode
  28. local mydispatch = pbSproto.decode
  29. local mypack = pbSproto.encode
  30. local fd
  31. -- local host = sprotoloader.load(1):host( "package")
  32. -- local pack = host:attach(sprotoloader.load(2))
  33. local abandon
  34. local watchdog
  35. local protocol
  36. local window = 0
  37. local tracker = {}
  38. local function write(data)
  39. return websocket_write(fd, data, "binary")
  40. end
  41. local function send(pname, args, callback)
  42. return write(mypack(pname, args))
  43. end
  44. local function kick(reason, ...)
  45. if abandon then return end
  46. if reason then
  47. logger.error(reason, ...)
  48. end
  49. abandon = true
  50. skynet_call(watchdog, "lua", "close", fd)
  51. end
  52. local kill_timer
  53. local function throw_exception(type, brief, linger)
  54. assert(type)
  55. brief = brief or "unknown"
  56. if abandon then
  57. return
  58. end
  59. -- send('exception', { type=type, brief=brief })
  60. send('exception_nty', { errno=brief })
  61. linger = linger or 500
  62. kill_timer = timer(linger, function()
  63. kick("exception: { type=%s, brief=%s }", type, brief)
  64. end)
  65. end
  66. local function dispatch_request(pname, args, response)
  67. if abandon then
  68. trace("drop a request message: %s", pname)
  69. return
  70. end
  71. local r = handle_request(pname, args)
  72. if response and r then
  73. write(response(r))
  74. end
  75. end
  76. local function dispatch_response(session, args)
  77. local ctx = assert(tracker[session])
  78. local pname = assert(ctx.pname)
  79. local callback = assert(ctx.callback)
  80. tracker[session] = nil
  81. if abandon then
  82. trace("drop a response message: %s", pname)
  83. return
  84. end
  85. handle_response(pname, args, callback)
  86. end
  87. skynet.register_protocol {
  88. name = "client",
  89. id = skynet.PTYPE_CLIENT,
  90. unpack = function (msg, sz)
  91. return mydispatch(msg, sz)
  92. end,
  93. dispatch = function(_, _, type, pname, ...)
  94. skynet.ignoreret()
  95. local f = (type == "REQUEST") and dispatch_request
  96. if f then
  97. local ok, msg = xpcall(f, traceback, pname, ...)
  98. if not ok then
  99. kick("pname: %s, err: %s", pname, msg)
  100. end
  101. end
  102. end
  103. }
  104. local CMD = {}
  105. function CMD.start(ctx)
  106. local args = assert(ctx.args)
  107. local gate = assert(ctx.gate)
  108. local ipaddr = assert(ctx.ipaddr)
  109. fd = assert(ctx.fd)
  110. watchdog = assert(ctx.watchdog)
  111. protocol = ctx.protocol
  112. websocket.forward(fd, protocol, ipaddr)
  113. local resp = player.start({
  114. args = args,
  115. ipaddr = ipaddr,
  116. send = send,
  117. kick = kick,
  118. throw_exception = throw_exception,
  119. pbSproto = pbSproto,
  120. })
  121. skynet_call(gate, "lua", "forward", fd)
  122. local skynet_sleep = skynet.sleep
  123. skynet.fork(function()
  124. collectgarbage "collect"
  125. while true do
  126. skynet_sleep(60*100)
  127. local kb, bytes = collectgarbage "count"
  128. if kb >= 4096 then
  129. logger.trace("内存超过4M,将启动垃圾回收")
  130. collectgarbage "collect"
  131. end
  132. end
  133. end)
  134. return resp
  135. end
  136. local suspend = nil
  137. local dispose = nil
  138. local function coroutine_yield()
  139. local co = coroutine.running()
  140. suspend = suspend or {}
  141. table.insert(suspend, co)
  142. skynet.wait(co)
  143. if dispose then
  144. dispose()
  145. end
  146. end
  147. local function coroutine_resume()
  148. suspend = suspend or {}
  149. local ref = 0
  150. for _, co in ipairs(suspend) do
  151. ref = ref + 1
  152. skynet.wakeup(co)
  153. end
  154. if ref > 0 then
  155. local co = coroutine.running()
  156. dispose = function()
  157. ref = ref - 1
  158. if ref == 0 then
  159. skynet.wakeup(co)
  160. end
  161. end
  162. skynet.wait(co)
  163. end
  164. end
  165. function CMD.disconnect()
  166. -- todo: do something before exit
  167. -- skynet.sleep(1000)
  168. if kill_timer then
  169. kill_timer()
  170. end
  171. handle_disconnect()
  172. websocket.clear_pool(fd)
  173. coroutine_resume()
  174. skynet.exit()
  175. end
  176. -- Reference: watchdog.command.hotfix
  177. function CMD.exit()
  178. skynet.exit()
  179. end
  180. -- Reference: usercenter.command.maintain
  181. function CMD.maintain(...)
  182. local args=select(1, ...)
  183. local key
  184. if select("#",...)==0 then
  185. key=STD_ERR.SERVER_START_MAINTENANCE --"服务器开始维护"
  186. else
  187. key=args
  188. end
  189. throw_exception("maintain",key, 300)
  190. coroutine_yield()
  191. end
  192. -- Reference: usercenter.command.login
  193. function CMD.multilogin()
  194. throw_exception("multilogin", STD_ERR.ACCOUNT_LOGGING_ANOTHER_DEVICE, 100) --"帐号正在其它设备登录"
  195. coroutine_yield()
  196. end
  197. function CMD.broad_cast_msg(pname, args)
  198. assert(pname)
  199. trace("CMD.broad_cast_msg: %s", pname)
  200. send(pname, args or {})
  201. end
  202. function CMD.kick(reason, ...)
  203. -- send('exception',{ type="kick", brief=reason })
  204. kick(reason, ...)
  205. end
  206. skynet.start(function()
  207. logger.label("<Agent>")
  208. skynet.dispatch("lua", function(session,_, cmd, ...)
  209. trace("handle command: '%s'", cmd)
  210. local f = CMD[cmd]
  211. if f then
  212. if session == 0 then
  213. f(...)
  214. else
  215. skynet_retpack(f(...))
  216. end
  217. else
  218. handle_command(session,cmd, ...)
  219. end
  220. end)
  221. end)