|
- local skynet = require "skynet"
- require "skynet.manager"
- local socket = require "skynet.socket"
- -- local sprotoloader = require "sprotoloader"
- local logger = require "logger"
- local stringify = require "stringify"
- local util = require "util"
- local cjson = require "cjson"
- -- local JsSProto = require "JsSProto"
- local pbSproto = require "pbSproto"
- local websocket = require "http.websocket"
-- Cached skynet primitives (local aliases avoid repeated table lookups).
local skynet_call, skynet_send = skynet.call, skynet.send
local skynet_now, skynet_sleep = skynet.now, skynet.sleep
local skynet_newservice = skynet.newservice
local skynet_fork, skynet_yield = skynet.fork, skynet.yield
local skynet_wakeup, skynet_wait = skynet.wakeup, skynet.wait
local skynet_retpack = skynet.retpack

-- Cached library functions.
local websocket_write = websocket.write
local string_pack = string.pack
local traceback = debug.traceback
local table_concat, table_insert, table_remove = table.concat, table.insert, table.remove

-- Logging shortcuts.
local warn, trace, info = logger.warn, logger.trace, logger.info

-- JSON codec shortcuts.
local cjson_decode = cjson.decode -- decode
local cjson_encode = cjson.encode -- encode

-- Wire protocol codec. The JsSProto variant is kept for reference:
-- local mydispatch = JsSProto.decode
-- local mypack = JsSProto.encode
local mydispatch, mypack = pbSproto.decode, pbSproto.encode

local MAX_ONLINE_NUM = 3000
local MAX_CONCURRENT_NUM = 32 -- maximum number of concurrent agent start-ups

-- Legacy sproto host/packer, disabled:
-- local host = sprotoloader.load(1):host( "package")
-- local pack = host:attach(sprotoloader.load(2))

-- Service handles and configuration, assigned in CMD.start / skynet.start.
local gate, watchdog, loginserver, protocol
-- Pattern for a fully written IPv6 address: eight groups of hex digits
-- separated by ':' and anchored at the end of the string. It is built once at
-- load time; the outer parentheses discard gsub's second result (the
-- substitution count) so it cannot leak into string.match as `init`.
local IPV6_PATTERN = (("([a-fA-F0-9]*):"):rep(8):gsub(":$", "$"))

--- Extract the bare IP address from a peer address string.
-- @param addr string peer address, e.g. "1.2.3.4:5678"
-- @return string the dotted IPv4 address if four numeric groups are found,
--   the eight-group IPv6 address if the string ends with one, otherwise
--   `addr` unchanged.
local function ipaddr_parse(addr)
    -- IPv4: the first run of four dot-separated numbers (any port is dropped).
    local a, b, c, d = addr:match("(%d+)%.(%d+)%.(%d+)%.(%d+)")
    if a then
        return table_concat({ a, b, c, d }, ".")
    end
    -- IPv6: eight groups of hex digits with no trailing characters.
    local groups = { addr:match(IPV6_PATTERN) }
    if #groups == 8 then
        return table_concat(groups, ":")
    end
    return addr
end
--- Write one binary websocket frame to a client without raising.
-- @param fd socket descriptor
-- @param data encoded payload
-- @return the results of pcall: true plus whatever websocket.write returned,
--   or false plus the error value.
local function write(fd, data)
    local frame_type = "binary"
    return pcall(websocket_write, fd, data, frame_type)
    -- unprotected alternative: return websocket_write(fd, data)
end
--- Encode a named message and push it to a client, ignoring write failures.
-- Encoding happens outside any protected call, so an error raised by the
-- encoder propagates to the caller; only the socket write is guarded.
-- @param fd socket descriptor
-- @param pname message name understood by the encoder
-- @param args message body
local function send(fd, pname, args)
    local packet = mypack(pname, args)
    -- write() already wraps the socket call in pcall; its result is discarded.
    write(fd, packet)
end
local sessions = {}              -- session map: fd -> { agent, args, fd, ipaddr, ... }
local total_session_number = 0   -- current number of connections
local total_player_number = 0    -- current number of players bound to an agent
local total_offline_number = 0   -- disconnects counted since the last sampling

--- Tear down the session registered for `fd`, if there is one.
-- The entry is removed from `sessions` first, so repeated calls for the same
-- fd are no-ops. The gate is asked to kick the socket and, when an agent is
-- attached, the agent is notified and the player counters are adjusted.
-- @param fd socket descriptor
local function close_session(fd)
    local session = sessions[fd]
    sessions[fd] = nil
    if not session then
        return
    end

    session.deadline = false -- marks the node for removal from the timeout list
    total_session_number = total_session_number - 1
    skynet_call(gate, "lua", "kick", fd)
    trace("关闭套接字 %s", fd)

    local agent = session.agent
    trace("session.agent %s", agent)
    if not agent then
        return
    end
    total_player_number = total_player_number - 1
    total_offline_number = total_offline_number + 1
    -- "disconnect" never returns, so it is sent rather than called.
    skynet_send(agent, "lua", "disconnect")
end
--- Create the bookkeeping record for a freshly accepted connection.
-- Also increments the connection counter. The record is not registered in
-- `sessions` here; the caller does that.
-- @param fd socket descriptor
-- @param ipaddr peer address
-- @return table the new session object
local function new_session(fd, ipaddr)
    local session = {
        agent = false,    -- agent address once the player has entered the game
        args = false,     -- login arguments once a login request was received
        next = false,     -- link used by the timeout list
        -- previous value: skynet_now()+1000
        deadline = skynet_now() + 100000, -- login deadline in skynet.now() units
        fd = fd,
        ipaddr = ipaddr,
    }

    -- Closes the session and clears the websocket pool entry for this fd.
    session.close = function()
        close_session(fd)
        websocket.clear_pool(fd)
    end

    total_session_number = total_session_number + 1
    return session
end
local pool = {}      -- pre-started agent services waiting to be handed out
local pool_size = 8  -- target number of idle agents

--- Background loop that keeps the agent pool topped up.
-- Each round starts at most 16 agents (the shortfall measured at the start of
-- the round), then sleeps for 50 ticks.
local function spawn()
    while true do
        local shortfall = pool_size - #pool
        if shortfall > 16 then
            shortfall = 16
        end
        for _ = 1, shortfall do
            table_insert(pool, skynet_newservice("agent"))
        end
        skynet_sleep(50)
    end
end
--- Obtain an agent service: take the newest pooled one, or start a fresh one
-- when the pool is empty.
-- @return the agent service address
local function new_agent()
    -- table.remove on an empty pool yields nil, which falls through to `or`.
    return table_remove(pool) or skynet_newservice("agent")
end
local offline_avg_number = 1 -- average number of players going offline per minute
local queue = {}             -- waiting queue of fds
local waiter = {}

--- Background loop that samples the offline counter once per sleep period
-- (60*100 ticks) and maintains `offline_avg_number` as the floored mean of
-- the last ten samples, never below 1.
local function collect()
    local window = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }
    while true do
        skynet_sleep(60 * 100)

        -- Slide the window: drop the oldest sample, append the newest.
        table_remove(window, 1)
        table_insert(window, total_offline_number)
        total_offline_number = 0

        local total = 0
        for i = 1, #window do
            total = total + window[i]
        end
        offline_avg_number = math.max(math.floor(total / #window), 1)
    end
end
--- Remove from the waiting queue every fd that no longer has a session.
-- Walks backwards so removals do not disturb the positions still to visit.
local function flush_invalid_session()
    local pos = #queue
    while pos >= 1 do
        local fd = queue[pos]
        if sessions[fd] == nil then
            table_remove(queue, pos)
            trace("从登录队列中移除失效套接字: %s", fd)
        end
        pos = pos - 1
    end
end
--- Bind a queued session to an agent and answer its pending login request.
-- On success the session records the agent, the player counter is
-- incremented and the login response is sent. If the agent fails to start,
-- or the session disappeared while it was starting, the session is closed
-- and the agent is told to disconnect.
-- @param session a session that has login args and a response callback
local function session_wakeup(session)
    assert(not session.agent)
    local response = assert(session.response)
    local args = assert(session.args)
    local fd = assert(session.fd)
    local ipaddr = assert(session.ipaddr)
    local ctx = {
        args = args,
        fd = fd,
        ipaddr = ipaddr,
        gate = gate,
        watchdog = watchdog,
        protocol = protocol,
    }

    -- Kick off the agent's initialisation.
    local agent = new_agent()
    -- logger.trace("new_agent:%s", agent)
    local ok, ret = pcall(skynet_call, agent, "lua", "start", ctx)

    if not (ok and sessions[session.fd]) then
        warn("Agent 启动失败, fd=%s", session.fd)
        assert(session.agent == false)
        session.close()
        skynet_send(agent, "lua", "disconnect")
        logger.trace("Agent 启动失败原因 fd:%s,%s", session.fd, ret or "nil")
        return
    end

    trace("Agent 进入游戏, fd=%s", session.fd)
    session.args = false
    session.response = false
    session.agent = agent
    total_player_number = total_player_number + 1
    response(ret)
    websocket.clear_pool(session.fd)
end
--- Admit as many queued sessions as capacity allows and wait for them all.
-- Up to min(free player slots, MAX_CONCURRENT_NUM) sessions are popped from
-- the front of the queue, each started in its own forked coroutine; the
-- caller blocks until every launcher has finished.
-- @return number elapsed time in skynet.now() units
local function dispatch_session_wakeup()
    local started = skynet_now()
    local freenum = math.min(MAX_ONLINE_NUM - total_player_number, MAX_CONCURRENT_NUM)
    if freenum > 0 and #queue > 0 then
        local pending = 0
        local current_thread = coroutine.running()

        local function launch(session)
            -- session_wakeup can raise (failed assertions, response encoding).
            -- It must not skip the bookkeeping below, otherwise this
            -- dispatcher would wait forever and no queued player would ever
            -- be admitted again.
            local ok, err = xpcall(session_wakeup, traceback, session)
            if not ok then
                warn("%s", err)
            end
            pending = pending - 1
            if pending == 0 then
                skynet_wakeup(current_thread)
            end
        end

        while freenum > 0 and #queue > 0 do
            local fd = table_remove(queue, 1)
            local session = sessions[fd]
            -- fds whose session has already gone are simply dropped.
            if session then
                freenum = freenum - 1
                pending = pending + 1
                skynet_fork(launch, session)
            end
        end

        -- Forked launchers only start running once this coroutine yields.
        if pending > 0 then
            skynet_wait(current_thread)
            assert(pending == 0)
        end
    end
    return skynet_now() - started
end
--- Estimate how long the client at queue position `pos` will wait.
-- @param pos queue position, must be > 0
-- @return integer estimated seconds (rounded up)
-- @return true as a second value only when there are fewer free slots than
--   `pos`, i.e. the client has to wait for other players to leave
local function timecost(pos)
    assert(pos > 0)
    local freenum = MAX_ONLINE_NUM - total_player_number
    local seconds = pos / MAX_CONCURRENT_NUM + 1
    if freenum >= pos then
        return math.ceil(seconds)
    end
    -- Not enough room: add the time for (pos - freenum) players to go
    -- offline at the sampled per-minute rate.
    seconds = seconds + ((pos - freenum) / offline_avg_number) * 60
    return math.ceil(seconds), true -- seconds
end
local data = {} -- reused payload table for queue progress notifications

--- Report queue progress for one client when the wait is noticeable
-- (estimated at more than 5 seconds, or the server is full).
-- The push to the client is currently disabled; only a trace line is emitted.
-- @param fd socket descriptor
-- @param pos queue position, must be > 0
local function sync_progress(fd, pos)
    assert(pos > 0)
    local ti, queued = timecost(pos)
    if not (ti > 5 or queued) then
        return
    end
    data.number = pos
    data.waiting = ti
    -- send(fd, "queue", data)
    trace("套接字(%s): 排在第 %s 的位置,预计 %s 秒之后进入游戏", fd, pos, ti)
end
--- Background loop that periodically reports queue progress to waiting clients.
-- The first `limit_size` clients are handled immediately; the remainder are
-- processed in pages of `limit_size`, sleeping between pages so other
-- coroutines can run.
local function broadcast()
    local limit_size = 128
    local invalid_session = { agent = true } -- stand-in for fds without a session
    local paging = {}
    while true do
        local started = skynet_now()
        local freenum = MAX_ONLINE_NUM - total_player_number
        if #queue > freenum then
            -- Drop queue entries whose session no longer exists.
            flush_invalid_session()
            for pos, fd in ipairs(queue) do
                if pos <= limit_size then
                    sync_progress(fd, pos)
                else
                    -- Defer to the paging buffer.
                    table_insert(paging, fd)
                end
            end
        end

        -- Push progress to the deferred clients, one page at a time.
        local pos = limit_size
        while #paging > 0 do
            -- Give other coroutines a chance to work.
            skynet_sleep(10)
            local batch = math.min(#paging, limit_size)
            for _ = 1, batch do
                local fd = table_remove(paging, 1)
                local session = sessions[fd] or invalid_session
                -- Skip clients that are gone or already have an agent.
                if not session.agent then
                    pos = pos + 1
                    sync_progress(fd, pos)
                end
            end
        end

        -- One cycle every 500 ticks when not overloaded, never less than 100.
        local elapsed = skynet_now() - started
        skynet_sleep(math.max(500 - elapsed, 100))
    end
end
--- Append a client to the end of the waiting queue and report its position.
-- @param fd socket descriptor
waiter.add = function(fd)
    table_insert(queue, fd)
    local position = #queue
    sync_progress(fd, position)
end
--- Start the queue's background coroutines: progress broadcast, offline-rate
-- sampling, and the admission loop.
function waiter.start()
    skynet.fork(broadcast)
    skynet.fork(collect)

    -- Periodically let queued players in when there is room. Each round is
    -- paced to 100 ticks; a round that took longer only yields.
    local function admission_loop()
        while true do
            local remaining = 100 - dispatch_session_wakeup()
            if remaining <= 0 then
                skynet_yield()
            else
                skynet_sleep(remaining)
            end
        end
    end
    skynet.fork(admission_loop)
end
local list = { next = false } -- sentinel head of the singly linked timeout list
local tail = list             -- last node of the list; timeout.add appends here
local timeout = {}

--- Walk the timeout list once: unlink sessions that no longer need a login
-- deadline (deadline == false) and close those whose deadline has passed.
--
-- Closing a session yields (it calls the gate), and timeout.add may run while
-- it is suspended. The list is therefore fully relinked and `tail` updated
-- BEFORE any session is closed. Closing in the middle of the walk would let
-- timeout.add append to a node that was just unlinked, detaching every later
-- session from the list.
local function scan()
    local now = skynet_now()
    local expired = {}
    local prev = list
    local node = list.next -- the front node
    while node do
        local deadline = node.deadline
        if deadline == false then
            -- No longer subject to the login timeout: just unlink.
            prev.next = node.next
        elseif deadline < now then
            prev.next = node.next
            expired[#expired + 1] = node
        else
            prev = node
        end
        node = node.next
    end
    tail = prev

    for i = 1, #expired do
        local session = expired[i]
        -- The session may have logged in or been closed while an earlier
        -- close was suspended; in both cases its deadline is now false.
        if session.deadline ~= false then
            -- Protected so one failing close cannot kill the scanning loop.
            local ok, err = pcall(session.close)
            warn("The client %s login timeout", session.fd)
            if not ok then
                warn("Failed to close timed-out client %s: %s", session.fd, tostring(err))
            end
        end
    end
end
--- Append a session to the end of the timeout list.
-- @param session session object; its `next` field is used as the list link
timeout.add = function(session)
    local last = tail
    last.next = session
    tail = session
end
--- Start the background coroutine that scans the timeout list, sleeping
-- 100 ticks between scans.
function timeout.start()
    local function scan_forever()
        while true do
            scan()
            skynet_sleep(100)
        end
    end
    skynet.fork(scan_forever)
end
local SOCKET = {} -- handlers for "socket" messages (see the lua dispatcher)

--- A new client connected: create and register its session, put it under the
-- login timeout, and tell the gate to accept the socket.
-- @param fd socket descriptor
-- @param addr peer address string
SOCKET.open = function(fd, addr)
    local ipaddr = ipaddr_parse(addr)
    trace("New client %d from %s", fd, ipaddr)

    -- Create the session object.
    local session = new_session(fd, ipaddr)
    websocket.forward(fd, protocol, ipaddr)
    sessions[fd] = session
    timeout.add(session)

    skynet_call(gate, "lua", "accept", fd)
end
--- The socket was closed: drop the session.
-- @param fd socket descriptor
SOCKET.close = function(fd)
    close_session(fd)
end

--- The socket reported an error: drop the session. The message is ignored.
-- @param fd socket descriptor
-- @param msg error description (unused)
SOCKET.error = function(fd, msg)
    close_session(fd)
end

--- Send-buffer warning: `size` K bytes have not been sent out on `fd` yet.
-- Intentionally a no-op.
SOCKET.warning = function(fd, size)
end
-- Table of client request handlers. Defining a new handler also registers
-- the message name with the protocol codec.
local REQUEST = setmetatable({}, {
    __newindex = function(handlers, pname, handler)
        pbSproto.register_msg(pname)
        rawset(handlers, pname, handler)
    end,
})
--- Route a decoded client request to its handler.
-- A request without a handler is logged and the session is closed.
-- @param session session that sent the request
-- @param pname request name
-- @param args decoded request body
-- @param response value passed through to the handler as its third argument
local function dispatch_request(session, pname, args, response)
    assert(session)
    local handler = REQUEST[pname]
    if not handler then
        warn("Not found request '%s'", pname)
        session.close()
        return
    end
    handler(session, args, response)
end
--- Handle a client "response" message. Responses are not supported, so the
-- event is logged and the session is closed.
-- @param session session that sent the message
local function dispatch_response(session)
    assert(session)
    warn("Does not support response")
    local close = session.close
    close()
end
--- Dispatch one decoded client message under protection.
-- Every message is treated as a request; `type` is currently ignored. If the
-- handler raises, the traceback is logged and the session is closed.
-- @param session session that sent the message
-- @param type message kind reported by the decoder (unused)
-- @param ... remaining decoder results, forwarded to dispatch_request
local function dispatch_message(session, type, ...)
    -- Former selection: (type == "REQUEST") and dispatch_request or dispatch_response
    local ok, msg = xpcall(dispatch_request, traceback, session, ...)
    if ok then
        return
    end
    warn(msg)
    session.close()
end
--- Data arrived on a client socket: decode it and dispatch the message.
-- Raises an error when no session is registered for `fd`.
-- @param fd socket descriptor
-- @param msg raw payload as delivered by the gate
function SOCKET.data(fd, msg)
    local session = sessions[fd]
    if not session then
        -- assert() does not format its message, so the text is built
        -- explicitly to make the fd actually appear in the error.
        error(string.format("Does not found session %s", tostring(fd)))
    end
    -- Legacy decoder: dispatch_message(session, host:dispatch(msg))
    dispatch_message(session, mydispatch(msg))
end
local CMD = {} -- handlers for service commands (see the lua dispatcher)

--- Start the watchdog: resolve peer services, launch the background loops
-- and open the gate with the given configuration.
-- @param conf gate configuration table; `conf.protocol` defaults to "ws"
CMD.start = function(conf)
    info("start")
    watchdog = skynet.self()
    loginserver = skynet.localname(".loginserver")

    skynet.fork(spawn)
    timeout.start()
    waiter.start()

    protocol = conf.protocol or "ws"
    skynet_call(gate, "lua", "open", conf)
end
--- Close the session bound to `fd` on request of another service.
-- @param fd socket descriptor
CMD.close = function(fd)
    close_session(fd)
end
-- Reference: webpage/shutdown
-- Maintenance flag, initialised from the launch configuration.
local maintain = LAUNCH.maintain

--- Switch the server into maintenance mode, then pause for 50 ticks.
CMD.maintain = function()
    maintain = true
    -- Closing the gate is disabled: skynet_call(gate, "lua", "close")
    skynet_sleep(50)
end
--- Query the server maintenance state.
-- @return the current value of the maintenance flag
CMD.getstatus = function()
    local status = maintain
    return status
end
--- Leave maintenance mode, then pause for 50 ticks.
CMD.open = function()
    maintain = false
    skynet_sleep(50)
end
-- Reference: webpage/hotfix
--- Discard all pooled agents: the pool is replaced by an empty one and every
-- agent that was in it is told to exit.
CMD.hotfix = function()
    local stale = pool
    pool = {}
    for i = 1, #stale do
        skynet_send(stale[i], "lua", "exit")
    end
end
-- Accounts allowed to log in during maintenance.
local account_whitelist = {}

--- Add an account to the maintenance whitelist.
-- @param account account identifier
CMD.addnew_account_whitelist = function(account)
    account_whitelist[account] = true
end
-- IP addresses allowed to log in during maintenance, seeded from the launch
-- configuration when it provides a list.
local ipaddr_whitelist = LAUNCH.white_list or {}

--- Add an IP address to the maintenance whitelist.
-- @param ipaddr address as stored in session.ipaddr
CMD.addnew_ipaddr_whitelist = function(ipaddr)
    ipaddr_whitelist[ipaddr] = true
end
local s_public = nil -- role creation limit service (only used by the disabled check below)

--- Handle a client login request.
-- Records the login arguments on the session, removes it from the login
-- timeout, enforces maintenance mode (whitelisted accounts and IPs pass),
-- verifies the credentials with the login server and, on success, places the
-- client in the waiting queue. A session that already has an agent or a
-- pending login is ignored.
-- @param session session that sent the request
-- @param args decoded login arguments (mutated: channel, possibly account)
-- @param response function that encodes a reply table for this request
function REQUEST.login(session, args, response)
    assert(args ~= nil)
    assert(type(response) == "function")
    local fd = assert(session.fd)
    if session.agent or session.args then
        return
    end

    args.channel = args.channel or ""
    session.args = args
    session.deadline = false -- remove from timeout queue
    session.response = function(r) -- login response
        write(fd, response(r))
    end

    if maintain then
        -- In maintenance: only whitelisted accounts or addresses may continue.
        local account = args.account or ""
        local ipaddr = session.ipaddr
        if not (account_whitelist[account] or ipaddr_whitelist[ipaddr]) then
            logger.info("MAINTAIN: channel=%s, account=%s, ipaddr=%s", args.channel, account, ipaddr)
            session.response({errno=STD_ERR.PLYAER_MAINTAIN or 9}) -- under maintenance
            return
        end
    end

    -- Population limit check (disabled):
    -- s_public = s_public or skynet.localname(".public")
    -- if skynet.call(s_public, "lua","role_limit", "rolelimit_isclose",args.account) then
    --     logger.trace(" ### ipaddr_whitelist:%s,ipaddr:%s", ipaddr_whitelist[session.ipaddr] and "true" or "false", session.ipaddr)
    --     if not ipaddr_whitelist[session.ipaddr] then
    --         session.response({errno=STD_ERR.PLYAER_LIMIT_CREATE}) -- role creation restricted
    --         send(fd, 'exception', { type='role_limit', brief=STD_ERR.PLYAER_LIMIT_CREATE })
    --         session.close()
    --         return
    --     else
    --         -- whitelisted logins are not subject to the role creation limit
    --     end
    -- end

    -- Verify login token
    local succ, access_info, account = skynet_call(loginserver, "lua", "login", args)

    -- The call above yields. If the connection was closed meanwhile (and the
    -- fd possibly reused by a new connection), neither queue nor answer it.
    if sessions[fd] ~= session then
        return
    end

    if succ then
        if access_info and account then
            args.account = account
            -- send(fd, "get_access_token", {msg = access_info})
        end
        waiter.add(fd)
    else
        -- Failed to login
        session.response({errno=STD_ERR.PLYAER_ERR_SIGN or 2}) -- authentication failed
    end
end
- -- function REQUEST.client_data_errno(session, args, response)
- -- args = args or {}
- -- logger.trace(" 客户端异常日志(watchdog) %s", args.errinfo)
- -- if response then
- -- write(session.fd, response({errno = 0}))
- -- end
- -- end
-- Expose the live counters through the service's info function.
skynet.info_func(function()
    local stats = {}
    stats.players = total_player_number
    stats.connections = total_session_number
    stats.offline_avg_number = offline_avg_number
    return stats
end)
-- Register the service under its local name during initialisation.
local function register_name()
    skynet.register(".ws_watchdog")
end
skynet.init(register_name)
-- Service entry point: install the "lua" message dispatcher and launch the gate.
skynet.start(function()
    logger.label("<ws_watchdog>,")

    -- "socket" messages go to the SOCKET handlers and never get a reply.
    -- Everything else is a CMD; a reply is packed only when the sender
    -- expects one (session ~= 0).
    local function on_lua_message(session, source, cmd, subcmd, ...)
        if cmd == "socket" then
            local handler = SOCKET[subcmd]
            handler(...)
            return
        end
        local handler = assert(CMD[cmd])
        if session ~= 0 then
            skynet_retpack(handler(subcmd, ...))
        else
            handler(subcmd, ...)
        end
    end

    skynet.dispatch("lua", on_lua_message)
    gate = skynet.newservice("ws_gate")
end)
|