ws_watchdog.lua 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. local skynet = require "skynet"
  2. require "skynet.manager"
  3. local socket = require "skynet.socket"
  4. -- local sprotoloader = require "sprotoloader"
  5. local logger = require "logger"
  6. local stringify = require "stringify"
  7. local util = require "util"
  8. local cjson = require "cjson"
  9. -- local JsSProto = require "JsSProto"
  10. local pbSproto = require "pbSproto"
  11. local websocket = require "http.websocket"
  12. local skynet_call = skynet.call
  13. local skynet_send = skynet.send
  14. local skynet_now = skynet.now
  15. local skynet_newservice = skynet.newservice
  16. local skynet_sleep = skynet.sleep
  17. local skynet_fork = skynet.fork
  18. local skynet_wakeup = skynet.wakeup
  19. local skynet_wait = skynet.wait
  20. local skynet_yield = skynet.yield
  21. local skynet_retpack = skynet.retpack
  22. local websocket_write = websocket.write
  23. local string_pack = string.pack
  24. local traceback = debug.traceback
  25. local table_concat = table.concat
  26. local table_insert = table.insert
  27. local table_remove = table.remove
  28. local warn = logger.warn
  29. local trace = logger.trace
  30. local info = logger.info
  31. local cjson_decode = cjson.decode -- 解码
  32. local cjson_encode = cjson.encode -- 编码
  33. -- local mydispatch = JsSProto.decode
  34. -- local mypack = JsSProto.encode
  35. local mydispatch = pbSproto.decode
  36. local mypack = pbSproto.encode
  37. local MAX_ONLINE_NUM = 3000
  38. local MAX_CONCURRENT_NUM = 32 -- 最大并发数
  39. -- local host = sprotoloader.load(1):host( "package")
  40. -- local pack = host:attach(sprotoloader.load(2))
  41. local gate
  42. local watchdog
  43. local loginserver
  44. local protocol
  45. local function ipaddr_parse(addr)
  46. -- check for format 1.11.111.111 for IPV4
  47. local chunks = {addr:match("(%d+)%.(%d+)%.(%d+)%.(%d+)")}
  48. if #chunks == 4 then
  49. return table_concat(chunks, ".")
  50. end
  51. -- check for IPV6 format, should be 8 'chunks' of numbers/letters
  52. -- without trailing chars
  53. local chunks = {addr:match(("([a-fA-F0-9]*):"):rep(8):gsub(":$","$"))}
  54. if #chunks == 8 then
  55. return table_concat(chunks, ":")
  56. end
  57. return addr
  58. end
  59. local function write(fd, data)
  60. return pcall(websocket_write, fd, data, "binary")
  61. -- return websocket_write(fd, data)
  62. end
  63. local function send(fd, pname, args)
  64. pcall(write, fd, mypack(pname, args))
  65. -- write(fd, mypack(pname, args))
  66. end
  67. local sessions = {} -- 会话字典, fd -> { agent, args, fd, ipaddr... }
  68. local total_session_number = 0 -- 当前连接数
  69. local total_player_number = 0 -- 当前在线玩家数
  70. local total_offline_number = 0
  71. local function close_session(fd)
  72. local session = sessions[fd]
  73. sessions[fd] = nil
  74. if session then
  75. session.deadline = false
  76. total_session_number = total_session_number - 1
  77. skynet_call(gate, "lua", "kick", fd)
  78. trace("关闭套接字 %s", fd)
  79. trace("session.agent %s", session.agent)
  80. if session.agent then
  81. total_player_number = total_player_number - 1
  82. total_offline_number = total_offline_number + 1
  83. -- disconnect never return
  84. skynet_send(session.agent, "lua", "disconnect")
  85. end
  86. end
  87. end
  88. local function new_session(fd, ipaddr)
  89. local self = {
  90. agent = false,
  91. args = false,
  92. next = false,
  93. -- deadline = skynet_now()+1000,
  94. deadline = skynet_now()+100000,
  95. fd = fd,
  96. ipaddr = ipaddr,
  97. }
  98. function self.close()
  99. close_session(fd)
  100. websocket.clear_pool(fd)
  101. end
  102. total_session_number = total_session_number + 1
  103. return self
  104. end
  105. local pool = {}
  106. local pool_size = 8
  107. local function spawn()
  108. local min = math.min
  109. while true do
  110. local step = min(pool_size - #pool, 16)
  111. for i=1, step do
  112. table_insert(pool, skynet_newservice("agent"))
  113. end
  114. skynet_sleep(50)
  115. end
  116. end
  117. local function new_agent()
  118. local len = #pool
  119. if len > 0 then
  120. local address = pool[len]
  121. pool[len] = nil
  122. return address
  123. else
  124. return skynet_newservice("agent")
  125. end
  126. end
  127. local offline_avg_number = 1 -- 每分钟的平均下线人数
  128. local queue = {} -- 等待队列
  129. local waiter = {}
  130. local function collect()
  131. local max = math.max
  132. local floor = math.floor
  133. local sampling = { 0,0,0,0,0,0,0,0,0,0 }
  134. while true do
  135. skynet_sleep(60*100)
  136. table_remove(sampling, 1)
  137. table_insert(sampling, total_offline_number)
  138. total_offline_number = 0
  139. local sum = 0
  140. for _, v in ipairs(sampling) do
  141. sum = sum + v
  142. end
  143. offline_avg_number = max(floor(sum / #sampling), 1)
  144. end
  145. end
  146. local function flush_invalid_session()
  147. for pos = #queue, 1, -1 do
  148. local fd = queue[pos]
  149. if sessions[fd] == nil then
  150. table_remove(queue, pos)
  151. trace("从登录队列中移除失效套接字: %s", fd)
  152. end
  153. end
  154. end
  155. local function session_wakeup(session)
  156. assert(not session.agent)
  157. local response = assert(session.response)
  158. local ctx = {
  159. args = assert(session.args),
  160. fd = assert(session.fd),
  161. ipaddr = assert(session.ipaddr),
  162. gate = gate,
  163. watchdog = watchdog,
  164. protocol = protocol
  165. }
  166. -- 启动 agent 的初始化流程
  167. local agent = new_agent()
  168. -- logger.trace("new_agent:%s", agent)
  169. local ok, ret = pcall(skynet_call, agent, "lua", "start", ctx)
  170. if ok and sessions[session.fd] then
  171. trace("Agent 进入游戏, fd=%s", session.fd)
  172. session.args = false
  173. session.response = false
  174. session.agent = agent
  175. total_player_number = total_player_number + 1
  176. response(ret)
  177. websocket.clear_pool(session.fd)
  178. else
  179. warn("Agent 启动失败, fd=%s", session.fd)
  180. assert(session.agent == false)
  181. session.close()
  182. skynet_send(agent, "lua", "disconnect")
  183. logger.trace("Agent 启动失败原因 fd:%s,%s", session.fd, ret or "nil")
  184. end
  185. end
  186. local function dispatch_session_wakeup()
  187. local ti = skynet_now()
  188. local freenum = math.min(MAX_ONLINE_NUM-total_player_number, MAX_CONCURRENT_NUM)
  189. if freenum > 0 and #queue > 0 then
  190. local ref = 0
  191. local current_thread = coroutine.running()
  192. local launch = function(session)
  193. session_wakeup(session)
  194. ref = ref - 1
  195. if ref == 0 then
  196. skynet_wakeup(current_thread)
  197. end
  198. end
  199. while freenum > 0 and #queue > 0 do
  200. local fd = table_remove(queue, 1)
  201. local session = sessions[fd]
  202. if session then
  203. freenum = freenum - 1
  204. ref = ref + 1
  205. skynet_fork(launch, session)
  206. end
  207. end
  208. if ref > 0 then
  209. skynet_wait(current_thread)
  210. assert(ref == 0)
  211. end
  212. end
  213. return skynet_now()-ti
  214. end
  215. local function timecost(pos)
  216. assert(pos > 0)
  217. local freenum = MAX_ONLINE_NUM - total_player_number
  218. local ti = pos / MAX_CONCURRENT_NUM + 1
  219. if freenum < pos then
  220. ti = ti + ((pos-freenum) / offline_avg_number) * 60
  221. return math.ceil(ti), true -- seconds
  222. else
  223. return math.ceil(ti)
  224. end
  225. end
  226. local data = {}
  227. local function sync_progress(fd, pos)
  228. assert(pos > 0)
  229. local ti, queued = timecost(pos)
  230. if ti > 5 or queued then
  231. data.number = pos
  232. data.waiting = ti
  233. -- send(fd, "queue", data)
  234. trace("套接字(%s): 排在第 %s 的位置,预计 %s 秒之后进入游戏", fd, pos, ti)
  235. end
  236. end
  237. local function broadcast()
  238. local min = math.min
  239. local max = math.max
  240. local limit_size = 128
  241. local invalid_session = { agent=true }
  242. local paging = {}
  243. while true do
  244. local ti = skynet_now()
  245. local freenum = MAX_ONLINE_NUM - total_player_number
  246. if freenum < #queue then
  247. -- 清除无效 session 对象
  248. flush_invalid_session()
  249. for pos, fd in ipairs(queue) do
  250. if pos > limit_size then
  251. -- 添加到分页缓存
  252. table_insert(paging, fd)
  253. else
  254. sync_progress(fd, pos)
  255. end
  256. end
  257. end
  258. -- 向客户端推送排队进度
  259. local pos = limit_size
  260. while #paging > 0 do
  261. --给其它协程一些工作机会
  262. skynet_sleep(10)
  263. -- 向部分客户端推送进度
  264. local range = min(#paging, limit_size)
  265. for i=1, range do
  266. local fd = table_remove(paging, 1)
  267. local session = sessions[fd] or invalid_session
  268. if not session.agent then
  269. pos = pos + 1
  270. sync_progress(fd, pos)
  271. end
  272. end
  273. end
  274. -- 执行每5秒一个同步周期(未过载时)
  275. ti = max(500-(skynet_now()-ti), 100)
  276. skynet_sleep(ti)
  277. end
  278. end
  279. function waiter.add(fd)
  280. table_insert(queue, fd)
  281. sync_progress(fd, #queue)
  282. end
  283. function waiter.start()
  284. skynet.fork(broadcast)
  285. skynet.fork(collect)
  286. -- 定期让一些排队玩家进入游戏(有空位的话)
  287. skynet.fork(function()
  288. while true do
  289. local ti = 100 - dispatch_session_wakeup()
  290. if ti > 0 then
  291. skynet_sleep(ti)
  292. else
  293. skynet_yield()
  294. end
  295. end
  296. end)
  297. end
  298. local list = { next=false }
  299. local tail = list
  300. local timeout = {}
  301. local function scan()
  302. local now = skynet_now()
  303. local nxt = list.next -- The front node
  304. local prev = list
  305. while nxt do
  306. if nxt.deadline == false then
  307. -- remove from timeout queue
  308. prev.next = nxt.next
  309. elseif nxt.deadline < now then
  310. prev.next = nxt.next
  311. nxt.close()
  312. warn("The client %s login timeout", nxt.fd)
  313. else
  314. prev = nxt
  315. end
  316. nxt = nxt.next
  317. end
  318. tail = prev
  319. end
  320. function timeout.add(session)
  321. tail.next = session
  322. tail = session
  323. end
  324. function timeout.start()
  325. skynet.fork(function()
  326. while true do
  327. scan()
  328. skynet_sleep(100)
  329. end
  330. end)
  331. end
  332. local SOCKET = {}
  333. function SOCKET.open(fd, addr)
  334. local ipaddr = ipaddr_parse(addr)
  335. trace("New client %d from %s", fd, ipaddr)
  336. -- Create session object
  337. local session = new_session(fd, ipaddr)
  338. websocket.forward(fd,protocol,ipaddr)
  339. sessions[fd] = session
  340. timeout.add(session)
  341. skynet_call(gate, "lua", "accept", fd)
  342. end
  343. function SOCKET.close(fd)
  344. close_session(fd)
  345. end
  346. function SOCKET.error(fd, msg)
  347. close_session(fd)
  348. end
  349. function SOCKET.warning(fd, size)
  350. -- size K bytes havn't send out in fd
  351. end
  352. local REQUEST = setmetatable({}, {__newindex = function (t, k, v)
  353. pbSproto.register_msg(k)
  354. rawset(t, k, v)
  355. end})
  356. local function dispatch_request(session, pname, args, response)
  357. assert(session)
  358. local f = REQUEST[pname]
  359. if f then
  360. f(session, args, response)
  361. else
  362. warn("Not found request '%s'", pname)
  363. session.close()
  364. end
  365. end
  366. local function dispatch_response(session)
  367. assert(session)
  368. warn("Does not support response")
  369. session.close()
  370. end
  371. local function dispatch_message(session, type, ...)
  372. -- local f = (type == "REQUEST") and dispatch_request or dispatch_response
  373. local f = dispatch_request
  374. local ok, msg = xpcall(f, traceback, session, ...)
  375. if not ok then
  376. warn(msg)
  377. session.close()
  378. end
  379. end
  380. function SOCKET.data(fd, msg)
  381. local session = sessions[fd]
  382. if session then
  383. -- dispatch_message(session, host:dispatch(msg))
  384. dispatch_message(session, mydispatch(msg))
  385. else
  386. assert(false, "Does not found session %s", fd)
  387. end
  388. end
  389. local CMD = {}
  390. function CMD.start(conf)
  391. info("start")
  392. watchdog = skynet.self()
  393. loginserver = skynet.localname(".loginserver")
  394. skynet.fork(spawn)
  395. timeout.start()
  396. waiter.start()
  397. protocol = conf.protocol or "ws"
  398. skynet_call(gate, "lua", "open", conf)
  399. end
  400. function CMD.close(fd)
  401. close_session(fd)
  402. end
  403. -- Reference: webpage/shutdown
  404. local maintain = LAUNCH.maintain
  405. function CMD.maintain()
  406. maintain = true
  407. --skynet_call(gate, "lua", "close")
  408. skynet_sleep(50)
  409. end
  410. --服务器维护状态查询
  411. function CMD.getstatus()
  412. return maintain
  413. end
  414. function CMD.open()
  415. maintain = false
  416. skynet_sleep(50)
  417. end
  418. -- Reference: webpage/hotfix
  419. function CMD.hotfix()
  420. local invalid = pool
  421. pool = {}
  422. for _, agent in ipairs(invalid) do
  423. skynet_send(agent, "lua", "exit")
  424. end
  425. end
  426. local account_whitelist = {}
  427. function CMD.addnew_account_whitelist(account)
  428. account_whitelist[account] = true
  429. end
  430. local ipaddr_whitelist = LAUNCH.white_list or {}
  431. function CMD.addnew_ipaddr_whitelist(ipaddr)
  432. ipaddr_whitelist[ipaddr] = true
  433. end
  434. local s_public = nil -- 角色创建限制
  435. function REQUEST.login(session, args, response)
  436. assert(args ~= nil)
  437. assert(type(response) == "function")
  438. local fd = assert(session.fd)
  439. if session.agent or session.args then
  440. return
  441. end
  442. args.channel = args.channel or ""
  443. session.args = args
  444. session.deadline = false -- remove from timeout queue
  445. session.response = function(r) -- login response
  446. write(fd, response(r))
  447. end
  448. if maintain then
  449. -- In the maintenance
  450. while true do
  451. local account = args.account or ""
  452. if account_whitelist[account] then
  453. break
  454. end
  455. local ipaddr = session.ipaddr
  456. if ipaddr_whitelist[ipaddr] then
  457. break
  458. end
  459. logger.info("MAINTAIN: channel=%s, account=%s, ipaddr=%s", args.channel, account, ipaddr)
  460. session.response({errno=STD_ERR.PLYAER_MAINTAIN or 9}) -- 正在维护
  461. return
  462. end
  463. end
  464. -- 检查人口数量限制
  465. -- s_public = s_public or skynet.localname(".public")
  466. -- if skynet.call(s_public, "lua","role_limit", "rolelimit_isclose",args.account) then
  467. -- logger.trace(" ### ipaddr_whitelist:%s,ipaddr:%s", ipaddr_whitelist[session.ipaddr] and "true" or "false", session.ipaddr)
  468. -- if not ipaddr_whitelist[session.ipaddr] then
  469. -- session.response({errno=STD_ERR.PLYAER_LIMIT_CREATE}) -- 限制角色创建
  470. -- send(fd, 'exception', { type='role_limit', brief=STD_ERR.PLYAER_LIMIT_CREATE })
  471. -- session.close()
  472. -- return
  473. -- else
  474. -- logger.trace(" 角色创建限制 白名单登陆不限制角色创建")
  475. -- end
  476. -- end
  477. -- Verify login token
  478. local succ,access_info,account = skynet_call(loginserver, "lua", "login", args)
  479. if succ then
  480. if access_info then
  481. if account then
  482. args.account = account
  483. end
  484. -- send(fd, "get_access_token", {msg = access_info})
  485. end
  486. waiter.add(fd)
  487. else
  488. -- Failed to login
  489. session.response({errno=STD_ERR.PLYAER_ERR_SIGN or 2}) -- 身份验证失败
  490. end
  491. end
  492. -- function REQUEST.client_data_errno(session, args, response)
  493. -- args = args or {}
  494. -- logger.trace(" 客户端异常日志(watchdog) %s", args.errinfo)
  495. -- if response then
  496. -- write(session.fd, response({errno = 0}))
  497. -- end
  498. -- end
  499. skynet.info_func(function()
  500. return {
  501. players = total_player_number,
  502. connections = total_session_number,
  503. offline_avg_number = offline_avg_number
  504. }
  505. end)
  506. skynet.init(function()
  507. skynet.register(".ws_watchdog")
  508. end)
  509. skynet.start(function()
  510. logger.label("<ws_watchdog>,")
  511. skynet.dispatch("lua", function(session, source, cmd, subcmd, ...)
  512. if cmd == "socket" then
  513. local f = SOCKET[subcmd]
  514. f(...)
  515. -- socket api don't need return
  516. else
  517. local f = assert(CMD[cmd])
  518. if session == 0 then
  519. f(subcmd, ...)
  520. else
  521. skynet_retpack(f(subcmd, ...))
  522. end
  523. end
  524. end)
  525. gate = skynet.newservice("ws_gate")
  526. end)