-- 消息队列服务 local skynet = require "skynet" require "skynet.manager" -- require "skynet.queue" local redisdriver = require "skynet.db.redis" local logger = require "logger" local cjson = require "cjson" -- local util = require "util" local skynet_call = skynet.call local skynet_send = skynet.send local string_format = string.format local table_unpack = table.unpack local cjson_encode = cjson.encode local cjson_decode = cjson.decode local logger_trace = logger.trace local logger_warn = logger.warn -- local hashcode = util.hashcode local forwarding = function(agent, ...) return pcall(skynet_call, agent, "lua", "publish", ...) end local keygen = function(k) return string_format("mq:%s", k) end local redis local lpop = function(k) return redis:lpop(k) end local lpush = function(k, v) return redis:lpush(k, v) end local rpush = function(k, v) return redis:rpush(k, v) end local dispatch_message = function(agent, uid) local key = keygen(uid) while true do local val = lpop(key) if val == nil then break end if forwarding(agent, table_unpack(cjson_decode(val))) then logger_trace("玩家 %s 处理了离线消息 %s", uid, val) else lpush(key, val) logger_warn("玩家 %s 处理离线消息 %s 失败", uid, val) break end end end local online_user = {} local CMD = {} function CMD.subscribe(agent, uid) dispatch_message(agent, uid) online_user[uid] = agent end function CMD.unsubscribe(agent, uid) if online_user[uid] == agent then online_user[uid] = nil end end function CMD.publish(_, uid, ...) local agent = online_user[uid] if agent then if forwarding(agent, ...) then return end end local key = keygen(uid) local value = cjson_encode({...}) rpush(key, value) logger_trace("玩家 %s 的离线消息 %s 被存盘", uid, value) agent = online_user[uid] if agent then dispatch_message(agent, uid) end end function CMD.start() local conf = assert(option.redis) redis = redisdriver.connect(conf) redis:select(6) logger.info("%s:%s", conf.host, conf.port) end skynet.init(function() cjson.encode_sparse_array(true, 1) skynet.register(".mq") end) skynet.start(function() logger.label(",") skynet.dispatch("lua", function(session, source, cmd, ...) local f = assert(CMD[cmd]) if 0==session then f(source, ...) else skynet.retpack(f(source, ...)) end end) end)