123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
-- Message queue service
- local skynet = require "skynet"
- require "skynet.manager"
- -- require "skynet.queue"
- local redisdriver = require "skynet.db.redis"
- local logger = require "logger"
- local cjson = require "cjson"
- -- local util = require "util"
- local skynet_call = skynet.call
- local skynet_send = skynet.send
- local string_format = string.format
- local table_unpack = table.unpack
- local cjson_encode = cjson.encode
- local cjson_decode = cjson.decode
- local logger_trace = logger.trace
- local logger_warn = logger.warn
- -- local hashcode = util.hashcode
--- Relay a "publish" request to an agent service in protected mode.
-- Invokes `skynet_call(agent, "lua", "publish", ...)` through `pcall`, so an
-- error raised by the call is returned to the caller instead of propagating.
-- @param agent target service, handed to skynet_call unchanged
-- @param ... payload appended after the "publish" command name
-- @return pcall's results: `true` followed by whatever the call returned,
--         or `false` followed by the error value
local function forwarding(agent, ...)
    return pcall(skynet_call, agent, "lua", "publish", ...)
end
--- Build the redis key used for a user's stored messages.
-- @param k identifier to embed in the key (rendered with `%s`)
-- @treturn string the text "mq:" followed by `k`
local function keygen(k)
    local queue_key = string_format("mq:%s", k)
    return queue_key
end
-- Redis connection shared by the list helpers below. It is declared here so
-- the helpers capture it as an upvalue; it stays nil until CMD.start assigns it.
local redis

--- Issue LPOP on list `k` through the shared connection.
local function lpop(k)
    return redis:lpop(k)
end

--- Issue LPUSH of `v` onto list `k` through the shared connection.
local function lpush(k, v)
    return redis:lpush(k, v)
end

--- Issue RPUSH of `v` onto list `k` through the shared connection.
local function rpush(k, v)
    return redis:rpush(k, v)
end
--- Drain a user's stored messages and deliver them to an agent.
-- Entries are popped from the head of the user's redis list one at a time.
-- Each entry is a JSON array (as written by CMD.publish) that is decoded and
-- unpacked into the arguments of a forwarded "publish" call.
--
-- The loop ends when the list is empty or when a delivery fails. A failed
-- entry is pushed back onto the head of the list so it is retried first on
-- the next drain.
--
-- An entry that cannot be decoded into a table is logged and discarded.
-- Without this guard the decode error would escape after the entry had
-- already been popped, aborting the drain and failing the caller
-- (CMD.subscribe would raise before recording the user as online).
-- @param agent service that should receive the messages
-- @param uid   user whose list is drained
local dispatch_message = function(agent, uid)
    local key = keygen(uid)
    while true do
        local val = lpop(key)
        if val == nil then
            break
        end
        -- cjson.decode raises on malformed input, and table.unpack needs a
        -- table, so both conditions are checked before forwarding.
        local decoded, args = pcall(cjson_decode, val)
        if not decoded or type(args) ~= "table" then
            -- Log text: "offline message of player %s could not be parsed, dropped".
            logger_warn("玩家 %s 的离线消息 %s 无法解析, 已丢弃", uid, val)
        elseif forwarding(agent, table_unpack(args)) then
            logger_trace("玩家 %s 处理了离线消息 %s", uid, val)
        else
            -- Delivery failed: restore the entry at the head and stop, so
            -- later entries are not delivered ahead of it.
            lpush(key, val)
            logger_warn("玩家 %s 处理离线消息 %s 失败", uid, val)
            break
        end
    end
end
-- online_user: uid -> agent currently subscribed for that user.
-- CMD: command name -> handler, looked up by the "lua" message dispatcher.
local online_user, CMD = {}, {}
--- Register `agent` as the live receiver for `uid`.
-- Stored messages are flushed to the agent first; the agent is recorded in
-- `online_user` only after that drain returns.
-- @param agent service subscribing (the dispatcher passes the message source)
-- @param uid   user the agent represents
function CMD.subscribe(agent, uid)
    dispatch_message(agent, uid)
    online_user[uid] = agent
end
--- Remove the registration for `uid`, but only if it still belongs to `agent`.
-- A different agent registered for the same uid is left untouched.
-- @param agent service asking to unsubscribe
-- @param uid   user whose registration should be cleared
function CMD.unsubscribe(agent, uid)
    if online_user[uid] ~= agent then
        return
    end
    online_user[uid] = nil
end
--- Deliver a message to a user, storing it in redis when direct delivery is not possible.
-- If an agent is registered for `uid` and forwarding to it succeeds, nothing
-- is stored. Otherwise the arguments are JSON-encoded as an array and
-- appended to the tail of the user's list. The registration is then checked
-- again, and if an agent is present the list is drained to it.
-- @param _   message source (unused)
-- @param uid target user
-- @param ... message payload
function CMD.publish(_, uid, ...)
    local receiver = online_user[uid]
    if receiver and forwarding(receiver, ...) then
        return
    end

    local list_key = keygen(uid)
    local payload = cjson_encode({...})
    rpush(list_key, payload)
    logger_trace("玩家 %s 的离线消息 %s 被存盘", uid, payload)

    -- Look the agent up again: the registration may have changed while the
    -- calls above were in flight.
    local current = online_user[uid]
    if current then
        dispatch_message(current, uid)
    end
end
--- Open the redis connection used by the queue.
-- Reads the connection settings from `option.redis`, connects, selects
-- database 6 and logs the configured host and port.
-- NOTE(review): `option` is a global that is not defined in this file;
-- presumably the project's bootstrap provides it. Confirm.
function CMD.start()
    local redis_conf = assert(option.redis)
    local db_index = 6
    redis = redisdriver.connect(redis_conf)
    redis:select(db_index)
    logger.info("%s:%s", redis_conf.host, redis_conf.port)
end
--- Initialisation hook passed to skynet.init.
-- Sets cjson's sparse-array encoding options (convert = true, ratio = 1) and
-- registers this service under the local name ".mq".
local function on_init()
    cjson.encode_sparse_array(true, 1)
    skynet.register(".mq")
end

skynet.init(on_init)
-- Service entry point: set the logger label and install the "lua" message
-- dispatcher, which routes each command name to its handler in CMD.
skynet.start(function()
    logger.label("<MQ>,")

    --- Handle one incoming "lua" message.
    -- Raises (via assert) when `cmd` has no handler in CMD. A reply is packed
    -- and sent only when `session` is non-zero.
    local function on_lua_message(session, source, cmd, ...)
        local handler = assert(CMD[cmd])
        if session ~= 0 then
            skynet.retpack(handler(source, ...))
        else
            handler(source, ...)
        end
    end

    skynet.dispatch("lua", on_lua_message)
end)
|