--- Websocket watchdog service.
-- Owns the gate service, tracks one session per client socket, enforces a
-- login timeout, verifies logins against the login server and admits
-- authenticated clients into the game through a bounded waiting queue.
-- Registered under the local name ".ws_watchdog".

local skynet = require "skynet"
require "skynet.manager"
local socket = require "skynet.socket"
-- local sprotoloader = require "sprotoloader"
local logger = require "logger"
local stringify = require "stringify"
local util = require "util"
local cjson = require "cjson"
-- local JsSProto = require "JsSProto"
local pbSproto = require "pbSproto"
local websocket = require "http.websocket"

local skynet_call = skynet.call
local skynet_send = skynet.send
local skynet_now = skynet.now
local skynet_newservice = skynet.newservice
local skynet_sleep = skynet.sleep
local skynet_fork = skynet.fork
local skynet_wakeup = skynet.wakeup
local skynet_wait = skynet.wait
local skynet_yield = skynet.yield
local skynet_retpack = skynet.retpack
local websocket_write = websocket.write
local string_pack = string.pack
local traceback = debug.traceback
local table_concat = table.concat
local table_insert = table.insert
local table_remove = table.remove
local warn = logger.warn
local trace = logger.trace
local info = logger.info
local cjson_decode = cjson.decode -- decode
local cjson_encode = cjson.encode -- encode
-- local mydispatch = JsSProto.decode
-- local mypack = JsSProto.encode
local mydispatch = pbSproto.decode
local mypack = pbSproto.encode

local MAX_ONLINE_NUM = 3000     -- cap on players that may be in game at once
local MAX_CONCURRENT_NUM = 32   -- maximum number of agents started concurrently
-- local host = sprotoloader.load(1):host( "package")
-- local pack = host:attach(sprotoloader.load(2))

local gate          -- address of the "ws_gate" service (set in skynet.start)
local watchdog      -- our own address (set in CMD.start)
local loginserver   -- address of ".loginserver" (set in CMD.start)
local protocol      -- conf.protocol, defaulting to "ws" (set in CMD.start)

-- Eight hex groups separated by ':' and anchored at the end of the string.
-- The outer parentheses drop gsub's second result (the substitution count)
-- so it is not passed on to string.match as its `init` argument.
local IPV6_PATTERN = (("([a-fA-F0-9]*):"):rep(8):gsub(":$", "$"))

--- Extract the bare IP address from a peer address string.
-- @param addr peer address string as delivered with the socket "open" event
-- @return the dotted IPv4 address or colon-joined IPv6 groups when one is
--         recognised, otherwise `addr` unchanged
local function ipaddr_parse(addr)
    -- check for format 1.11.111.111 for IPV4
    local v4 = { addr:match("(%d+)%.(%d+)%.(%d+)%.(%d+)") }
    if #v4 == 4 then
        return table_concat(v4, ".")
    end
    -- check for IPV6 format, should be 8 'chunks' of numbers/letters
    -- without trailing chars
    local v6 = { addr:match(IPV6_PATTERN) }
    if #v6 == 8 then
        return table_concat(v6, ":")
    end
    return addr
end

--- Write a binary websocket frame; never raises.
-- @return the results of pcall: `true, ...` on success or `false, err`
local function write(fd, data)
    return pcall(websocket_write, fd, data, "binary")
    -- return websocket_write(fd, data)
end

--- Encode a named message and write it to the client (best effort).
local function send(fd, pname, args)
    pcall(write, fd, mypack(pname, args))
    -- write(fd, mypack(pname, args))
end

local sessions = {}             -- session map: fd -> { agent, args, fd, ipaddr... }
local total_session_number = 0  -- current number of connections
local total_player_number = 0   -- current number of players in game
local total_offline_number = 0  -- disconnects since the last sample (see collect)

--- Forget the session bound to `fd`, kick the socket at the gate and, if an
-- agent is attached, tell it to disconnect. Does nothing for an unknown fd.
local function close_session(fd)
    local session = sessions[fd]
    sessions[fd] = nil
    if not session then
        return
    end
    session.deadline = false -- marks the node for removal from the timeout list
    total_session_number = total_session_number - 1
    skynet_call(gate, "lua", "kick", fd)
    trace("关闭套接字 %s", fd)
    trace("session.agent %s", session.agent)
    if session.agent then
        total_player_number = total_player_number - 1
        total_offline_number = total_offline_number + 1
        -- disconnect never return
        skynet_send(session.agent, "lua", "disconnect")
    end
end

--- Create a session record for a freshly opened socket.
-- The record doubles as a node of the timeout linked list (field `next`).
local function new_session(fd, ipaddr)
    local self = {
        agent = false,
        args = false,
        next = false,
        -- deadline = skynet_now()+1000,
        deadline = skynet_now() + 100000, -- login deadline in skynet.now() ticks
        fd = fd,
        ipaddr = ipaddr,
    }
    function self.close()
        close_session(fd)
        websocket.clear_pool(fd)
    end
    total_session_number = total_session_number + 1
    return self
end

local pool = {}         -- pre-started, not yet assigned agent services
local pool_size = 8

--- Background loop that keeps the agent pool topped up to `pool_size`.
local function spawn()
    local min = math.min
    while true do
        local step = min(pool_size - #pool, 16)
        for _ = 1, step do
            table_insert(pool, skynet_newservice("agent"))
        end
        skynet_sleep(50)
    end
end

--- Take an agent from the pool, or start a new one when the pool is empty.
local function new_agent()
    local len = #pool
    if len > 0 then
        local address = pool[len]
        pool[len] = nil
        return address
    end
    return skynet_newservice("agent")
end

local offline_avg_number = 1 -- average number of players going offline per minute
local queue = {}             -- waiting queue of fds that passed login verification
local waiter = {}

--- Background loop: once per sampling period, fold the disconnect counter
-- into a 10-slot sliding window and refresh `offline_avg_number` (minimum 1).
local function collect()
    local max = math.max
    local floor = math.floor
    local sampling = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }
    while true do
        skynet_sleep(60 * 100)
        table_remove(sampling, 1)
        table_insert(sampling, total_offline_number)
        total_offline_number = 0
        local sum = 0
        for _, v in ipairs(sampling) do
            sum = sum + v
        end
        offline_avg_number = max(floor(sum / #sampling), 1)
    end
end

--- Drop queue entries whose session no longer exists.
local function flush_invalid_session()
    -- iterate backwards so table.remove does not skip entries
    for pos = #queue, 1, -1 do
        local fd = queue[pos]
        if sessions[fd] == nil then
            table_remove(queue, pos)
            trace("从登录队列中移除失效套接字: %s", fd)
        end
    end
end

--- Attach an agent to a queued session and answer its pending login request.
-- On failure (agent start error, or the session was closed while the agent
-- was starting) the session is closed and the agent is told to disconnect.
local function session_wakeup(session)
    assert(not session.agent)
    local response = assert(session.response)
    local ctx = {
        args = assert(session.args),
        fd = assert(session.fd),
        ipaddr = assert(session.ipaddr),
        gate = gate,
        watchdog = watchdog,
        protocol = protocol
    }
    -- kick off the agent's initialisation
    local agent = new_agent()
    -- logger.trace("new_agent:%s", agent)
    local ok, ret = pcall(skynet_call, agent, "lua", "start", ctx)
    if ok and sessions[session.fd] then
        trace("Agent 进入游戏, fd=%s", session.fd)
        session.args = false
        session.response = false
        session.agent = agent
        total_player_number = total_player_number + 1
        response(ret)
        websocket.clear_pool(session.fd)
    else
        warn("Agent 启动失败, fd=%s", session.fd)
        assert(session.agent == false)
        session.close()
        skynet_send(agent, "lua", "disconnect")
        logger.trace("Agent 启动失败原因 fd:%s,%s", session.fd, ret or "nil")
    end
end

--- Admit as many queued sessions as capacity allows and wait until all of
-- them have finished starting.
-- @return elapsed time in skynet.now() ticks
local function dispatch_session_wakeup()
    local started_at = skynet_now()
    local freenum = math.min(MAX_ONLINE_NUM - total_player_number, MAX_CONCURRENT_NUM)
    if freenum > 0 and #queue > 0 then
        local ref = 0 -- number of launches still in flight
        local current_thread = coroutine.running()
        local function launch(session)
            -- An error escaping session_wakeup must not skip the bookkeeping
            -- below, otherwise this dispatcher would wait forever and no
            -- queued player could ever enter the game again.
            local ok, err = xpcall(session_wakeup, traceback, session)
            if not ok then
                warn("session_wakeup failed, fd=%s: %s", session.fd, err)
                pcall(session.close)
            end
            ref = ref - 1
            if ref == 0 then
                skynet_wakeup(current_thread)
            end
        end
        while freenum > 0 and #queue > 0 do
            local fd = table_remove(queue, 1)
            local session = sessions[fd]
            if session then
                freenum = freenum - 1
                ref = ref + 1
                skynet_fork(launch, session)
            end
        end
        if ref > 0 then
            skynet_wait(current_thread)
            assert(ref == 0)
        end
    end
    return skynet_now() - started_at
end

--- Estimate how long the client at queue position `pos` has to wait.
-- @return estimated seconds (rounded up), and `true` as a second value when
--         the position exceeds the currently free capacity
local function timecost(pos)
    assert(pos > 0)
    local freenum = MAX_ONLINE_NUM - total_player_number
    local ti = pos / MAX_CONCURRENT_NUM + 1
    if freenum < pos then
        ti = ti + ((pos - freenum) / offline_avg_number) * 60
        return math.ceil(ti), true -- seconds
    end
    return math.ceil(ti)
end

local data = {} -- reusable payload table for the (currently disabled) "queue" push

--- Report queue progress for `fd`. Pushing to the client is disabled at the
-- moment; the progress is only traced.
local function sync_progress(fd, pos)
    assert(pos > 0)
    local ti, queued = timecost(pos)
    if ti > 5 or queued then
        data.number = pos
        data.waiting = ti
        -- send(fd, "queue", data)
        trace("套接字(%s): 排在第 %s 的位置,预计 %s 秒之后进入游戏", fd, pos, ti)
    end
end

--- Background loop that periodically reports queue positions, in pages of
-- `limit_size` entries with a short sleep between pages.
local function broadcast()
    local min = math.min
    local max = math.max
    local limit_size = 128
    local invalid_session = { agent = true } -- stand-in for sessions that vanished
    local paging = {}
    while true do
        local ti = skynet_now()
        local freenum = MAX_ONLINE_NUM - total_player_number
        if freenum < #queue then
            -- purge dead sessions first
            flush_invalid_session()
            for pos, fd in ipairs(queue) do
                if pos > limit_size then
                    -- defer to the paging buffer
                    table_insert(paging, fd)
                else
                    sync_progress(fd, pos)
                end
            end
        end
        -- report progress for the deferred entries
        local pos = limit_size
        while #paging > 0 do
            -- give other coroutines a chance to run
            skynet_sleep(10)
            -- handle one page of clients
            local range = min(#paging, limit_size)
            for _ = 1, range do
                local fd = table_remove(paging, 1)
                local session = sessions[fd] or invalid_session
                if not session.agent then
                    pos = pos + 1
                    sync_progress(fd, pos)
                end
            end
        end
        -- one cycle every 5 seconds when not overloaded
        ti = max(500 - (skynet_now() - ti), 100)
        skynet_sleep(ti)
    end
end

--- Append a verified session to the waiting queue.
function waiter.add(fd)
    table_insert(queue, fd)
    sync_progress(fd, #queue)
end

--- Start the background loops of the waiting queue.
function waiter.start()
    skynet.fork(broadcast)
    skynet.fork(collect)
    -- periodically let queued players into the game when there is room
    skynet.fork(function()
        while true do
            local ti = 100 - dispatch_session_wakeup()
            if ti > 0 then
                skynet_sleep(ti)
            else
                skynet_yield()
            end
        end
    end)
end

-- Singly linked list of sessions awaiting login; `list` is a sentinel head.
local list = { next = false }
local tail = list
local timeout = {}

--- Unlink sessions that logged in / were closed (deadline == false) and close
-- the ones whose login deadline has passed.
local function scan()
    local now = skynet_now()
    local expired = {}
    local prev = list
    local node = list.next -- The front node
    -- Phase 1: pure list surgery, nothing in here yields. Closing a session
    -- yields (skynet.call to the gate); doing that in the middle of the walk
    -- let timeout.add append to a node that had just been unlinked, so the
    -- new session dropped out of the list and was never checked again.
    while node do
        local following = node.next
        local deadline = node.deadline
        if deadline == false then
            -- remove from timeout queue
            prev.next = following
            node.next = false
        elseif deadline < now then
            prev.next = following
            node.next = false
            expired[#expired + 1] = node
        else
            prev = node
        end
        node = following
    end
    tail = prev
    -- Phase 2: close what expired. Re-check the deadline because an earlier
    -- close may have yielded long enough for this session to log in or close.
    for _, session in ipairs(expired) do
        local deadline = session.deadline
        if deadline and deadline < now then
            -- guarded so one failing close cannot strand the remaining ones
            local ok, err = pcall(session.close)
            if not ok then
                warn("Failed to close timed out client %s: %s", session.fd, err)
            end
            warn("The client %s login timeout", session.fd)
        end
    end
end

--- Append a session to the timeout list.
function timeout.add(session)
    tail.next = session
    tail = session
end

--- Start the periodic timeout scan.
function timeout.start()
    skynet.fork(function()
        while true do
            scan()
            skynet_sleep(100)
        end
    end)
end

-- Handlers for "socket" messages (see the dispatcher in skynet.start).
local SOCKET = {}

--- A new client connected: create its session and accept it at the gate.
function SOCKET.open(fd, addr)
    local ipaddr = ipaddr_parse(addr)
    trace("New client %d from %s", fd, ipaddr)
    -- Create session object
    local session = new_session(fd, ipaddr)
    websocket.forward(fd, protocol, ipaddr)
    sessions[fd] = session
    timeout.add(session)
    skynet_call(gate, "lua", "accept", fd)
end

function SOCKET.close(fd)
    close_session(fd)
end

function SOCKET.error(fd, msg)
    close_session(fd)
end

function SOCKET.warning(fd, size)
    -- size K bytes havn't send out in fd
end

-- Request handlers by message name; assigning a handler also registers the
-- message name with pbSproto.
local REQUEST = setmetatable({}, {
    __newindex = function(t, k, v)
        pbSproto.register_msg(k)
        rawset(t, k, v)
    end
})

--- Route a decoded request to its handler; unknown requests close the session.
local function dispatch_request(session, pname, args, response)
    assert(session)
    local f = REQUEST[pname]
    if f then
        f(session, args, response)
    else
        warn("Not found request '%s'", pname)
        session.close()
    end
end

--- Responses from clients are not supported; the session is closed.
local function dispatch_response(session)
    assert(session)
    warn("Does not support response")
    session.close()
end

--- Run the request dispatcher under xpcall; any error closes the session.
-- The decoded message type is accepted but currently ignored.
local function dispatch_message(session, msgtype, ...)
    -- local f = (type == "REQUEST") and dispatch_request or dispatch_response
    local f = dispatch_request
    local ok, msg = xpcall(f, traceback, session, ...)
    if not ok then
        warn(msg)
        session.close()
    end
end

--- Incoming client data: decode and dispatch.
function SOCKET.data(fd, msg)
    local session = sessions[fd]
    if not session then
        -- Data can still arrive for a socket whose session was just closed;
        -- log it (with the fd actually formatted in) instead of raising.
        warn("Does not found session %s", fd)
        return
    end
    -- dispatch_message(session, host:dispatch(msg))
    dispatch_message(session, mydispatch(msg))
end

-- Handlers for "lua" commands sent to this service.
local CMD = {}

--- Start background loops and open the gate with `conf`.
function CMD.start(conf)
    info("start")
    watchdog = skynet.self()
    loginserver = skynet.localname(".loginserver")
    skynet.fork(spawn)
    timeout.start()
    waiter.start()
    protocol = conf.protocol or "ws"
    skynet_call(gate, "lua", "open", conf)
end

function CMD.close(fd)
    close_session(fd)
end

-- Reference: webpage/shutdown
local maintain = LAUNCH.maintain

--- Enter maintenance mode (logins are refused unless whitelisted).
function CMD.maintain()
    maintain = true
    --skynet_call(gate, "lua", "close")
    skynet_sleep(50)
end

--- Query the maintenance flag.
function CMD.getstatus()
    return maintain
end

--- Leave maintenance mode.
function CMD.open()
    maintain = false
    skynet_sleep(50)
end

-- Reference: webpage/hotfix
--- Discard all pooled agents by sending them "exit".
function CMD.hotfix()
    local invalid = pool
    pool = {}
    for _, agent in ipairs(invalid) do
        skynet_send(agent, "lua", "exit")
    end
end

local account_whitelist = {}
function CMD.addnew_account_whitelist(account)
    account_whitelist[account] = true
end

local ipaddr_whitelist = LAUNCH.white_list or {}
function CMD.addnew_ipaddr_whitelist(ipaddr)
    ipaddr_whitelist[ipaddr] = true
end

local s_public = nil -- used by the (disabled) role creation limit check

--- Handle the "login" request.
-- Ignored when the session already has an agent or a pending login. During
-- maintenance only whitelisted accounts / addresses may proceed. The token is
-- verified by the login server; on success the session joins the waiting
-- queue, otherwise an error response is written.
function REQUEST.login(session, args, response)
    assert(args ~= nil)
    assert(type(response) == "function")
    local fd = assert(session.fd)
    if session.agent or session.args then
        return
    end

    args.channel = args.channel or ""
    session.args = args
    session.deadline = false -- remove from timeout queue
    session.response = function(r) -- login response
        write(fd, response(r))
    end

    if maintain then -- In the maintenance
        local account = args.account or ""
        local ipaddr = session.ipaddr
        if not (account_whitelist[account] or ipaddr_whitelist[ipaddr]) then
            logger.info("MAINTAIN: channel=%s, account=%s, ipaddr=%s", args.channel, account, ipaddr)
            session.response({errno=STD_ERR.PLYAER_MAINTAIN or 9}) -- under maintenance
            return
        end
    end

    -- Disabled: population / role creation limit check.
    -- s_public = s_public or skynet.localname(".public")
    -- if skynet.call(s_public, "lua","role_limit", "rolelimit_isclose",args.account) then
    --     if not ipaddr_whitelist[session.ipaddr] then
    --         session.response({errno=STD_ERR.PLYAER_LIMIT_CREATE}) -- role creation is limited
    --         send(fd, 'exception', { type='role_limit', brief=STD_ERR.PLYAER_LIMIT_CREATE })
    --         session.close()
    --         return
    --     end
    --     -- whitelisted addresses are exempt from the role creation limit
    -- end

    -- Verify login token
    local succ, access_info, account = skynet_call(loginserver, "lua", "login", args)
    if succ then
        if access_info then
            if account then
                args.account = account
            end
            -- send(fd, "get_access_token", {msg = access_info})
        end
        waiter.add(fd)
    else -- Failed to login
        -- NOTE(review): session.args stays set and the deadline stays cleared
        -- here, so this connection can neither retry nor time out - confirm
        -- that this is intended.
        session.response({errno=STD_ERR.PLYAER_ERR_SIGN or 2}) -- authentication failed
    end
end

-- Disabled: client side error report.
-- function REQUEST.client_data_errno(session, args, response)
--     args = args or {}
--     logger.trace(" client error log (watchdog) %s", args.errinfo)
--     if response then
--         write(session.fd, response({errno = 0}))
--     end
-- end

skynet.info_func(function()
    return {
        players = total_player_number,
        connections = total_session_number,
        offline_avg_number = offline_avg_number
    }
end)

skynet.init(function()
    skynet.register(".ws_watchdog")
end)

skynet.start(function()
    logger.label(",")
    skynet.dispatch("lua", function(session, source, cmd, subcmd, ...)
        if cmd == "socket" then
            local f = SOCKET[subcmd]
            f(...)
            -- socket api don't need return
        else
            local f = assert(CMD[cmd])
            if session == 0 then
                -- sent without a session: the caller expects no reply
                f(subcmd, ...)
            else
                skynet_retpack(f(subcmd, ...))
            end
        end
    end)
    gate = skynet.newservice("ws_gate")
end)