local skynet = require "skynet"
local queue = require "skynet.queue"
local redisdriver = require "skynet.db.redis"
local logger = require "logger"
local cjson = require "cjson"
local keygen = require "model.keygen"
local pipeline = require "pipeline"
local util = require "util"
require "skynet.manager"
local stringify = require "stringify"

local traceback = debug.traceback
local trace = logger.trace
local skynet_retpack = skynet.retpack
local skynet_send = skynet.send
local string_format = string.format
local table_insert = table.insert
local cjson_decode = cjson.decode
local cjson_encode = cjson.encode
local os_time = os.time
local hashcode = util.hashcode

local start_sign = false            -- becomes true once CMD.start has finished
local synchronized = queue()        -- critical section guarding pool acquisition
local retention_tm = 30*24*3600     -- mail retention: 30 days, in seconds
local MAX_MAIL = 1000               -- maximum number of mails per player
local redis_thread = 4              -- number of pooled redis connections
local redis                         -- BUGPK3-20: kept file-local instead of global

cjson.encode_sparse_array(true, 1)

-- Redis key templates (described by how the code below uses them):
--   MAIL_CONTENT is formatted with a uid; it is the per-player sorted set of mail ids.
--   MAIL_PRIVATE is formatted with a mail id; it stores the JSON body of that mail.
--   MAIL_RUBBISH is formatted with uid and a timestamp; it stores a rejected mail.
local MAIL_CONTENT = "mailbox:content:%s"
local MAIL_PRIVATE = "mailbox:private:%s"
local MAIL_RUBBISH = "mailbox:rubbish:%s:%s"

-- [uid] = { agent = <agent address> } for every player currently online.
local online_user = {}

-- [id] = { id = <mail id>, date = <send time>, content = <JSON string> }
local global_mail = {}

-- [uid] = cached number of mails the player currently has.
local user_mail_num = {}

-- Two mail pools used alternately: `sign` is true while somebody holds the
-- pool, `mail` is the list of queued { uid, id, content } entries.
local mail_pool_one = {
    sign = false,
    name = 'one',
    mail = {}
}

local mail_pool_two = {
    sign = false,
    name = 'two',
    mail = {}
}

-- Redis connection pool; entries are { redis = conn, batch = pipeline, num = n }
-- and are created in CMD.start.
local redis_list = {}

local CMD = {}
local mail_pool = {}
local redis_pool = {}

---------------------------------- local ---------------------------------

-- Read the player's mail count from redis and cache it.
local function get_mail_num(uid)
    local key = string_format(MAIL_CONTENT, uid)
    local redis = redis_pool.get_redis(redis_pool.hashcode(uid))
    local num = redis:zcard(key)
    user_mail_num[uid] = num
    return num
end

-- Return the cached mail count, loading it from redis on a cache miss.
local function get_memory_mail_num(uid)
    local num = user_mail_num[uid]
    if num then
        return num
    end
    return get_mail_num(uid)
end

-- Add `num` (may be negative) to the cached mail count of `uid`.
local function set_memory_mail_num(uid, num)
    if not user_mail_num[uid] then
        get_mail_num(uid)
    end
    user_mail_num[uid] = user_mail_num[uid] + num
end

-- Load the non-expired global mails at start-up and pop the expired ones
-- from the head of the "mailbox:global" list.
local function preload_global_mail()
    local rets = redis:lrange("mailbox:global", 0, -1)
    local now_tm = os_time()
    local expired = 0
    for _, content in ipairs(rets) do
        local obj = cjson_decode(content)
        if retention_tm and (retention_tm + obj.date >= now_tm) then
            global_mail[obj.id] = { date = obj.date, content = content, id = obj.id }
        else
            expired = expired + 1
        end
    end

    -- Mails are appended with rpush, so the expired ones sit at the head.
    local batch = pipeline(124)
    for _ = 1, expired do
        batch.add("lpop", "mailbox:global")
    end
    batch.execute(redis)
end

---------------------------------------- mail pool ---------------------------------

-- Queue a mail that could not be delivered (mailbox full) under a
-- time-stamped key that expires after `retention_tm` seconds.
local function mail_rubbish(uid, content)
    local ok, info = xpcall(function()
        local hash = redis_pool.hashcode(uid)
        redis_pool.push(hash, "setex",
            string_format(MAIL_RUBBISH, uid, os.date("%Y-%m-%d:%H-%M-%S", os.time())),
            retention_tm, content)
    end, traceback)
    if not ok then
        logger.trace(" 邮件垃圾报错 %s", info)
    end
end

-- Acquire whichever pool is free (pool one first). Returns nil, after
-- logging, when both pools are currently held.
local function get_usable_pool(mode)
    return synchronized(function()
        if not mail_pool_one.sign then
            mail_pool_one.sign = true
            return mail_pool_one
        elseif not mail_pool_two.sign then
            mail_pool_two.sign = true
            return mail_pool_two
        else
            logger.trace("----- 在取可用邮件池,两个池都被占用了, type = ", mode)
        end
    end)
end

-- Queue one mail per uid into a free pool.
-- uids:    table of receiver ids
-- content: mail table (not yet JSON encoded); its `id` field is overwritten
--          with a fresh id for every receiver.
function mail_pool:add_mail_pool(uids, content)
    local pool = get_usable_pool('w')
    -- Both pools busy: wait and try again. The pool handle is a separate
    -- local so the retry never touches a nil value.
    while not pool do
        skynet.sleep(100)
        pool = get_usable_pool('w')
    end

    for _, uid in pairs(uids) do
        local id = "M" .. keygen()
        content.id = id
        table_insert(pool.mail, { uid = uid, id = id, content = cjson_encode(content) })
    end

    -- release the pool
    pool.sign = false
end

-- Hand out the two pools alternately; returns nil when the pool whose
-- turn it is happens to be held.
local have_num = 0
local function get_have_mail_pool()
    have_num = have_num + 1
    return synchronized(function()
        local ret = math.fmod(have_num, 2)
        if ret == 0 then
            if not mail_pool_one.sign then
                mail_pool_one.sign = true
                return mail_pool_one, 1
            end
        else
            if not mail_pool_two.sign then
                mail_pool_two.sign = true
                return mail_pool_two, 2
            end
        end
    end)
end

-- Start the background coroutine that drains the mail pools once per tick
-- and writes the queued mails to redis.
function mail_pool:send_mail_pool()
    skynet.fork(function()
        local batch = pipeline(1024)
        local count = 0
        while true do
            skynet.sleep(100)
            local pool = get_have_mail_pool()
            -- Everything, including the release, happens only when a pool
            -- was actually acquired.
            if pool then
                -- Fetch the mail count of every receiver that is not cached yet.
                local keys = {}
                for _, v in pairs(pool.mail) do
                    if not user_mail_num[v.uid] then
                        table_insert(keys, v.uid)
                        batch.add("zcard", string_format(MAIL_CONTENT, v.uid))
                    end
                end
                if #keys > 0 then
                    local resp = batch.execute(redis, {})
                    for k, uid in pairs(keys) do
                        user_mail_num[uid] = resp[k].out
                    end
                end

                -- Write the mails to the players' mailboxes.
                local handled = 0
                for _, v in pairs(pool.mail) do
                    handled = handled + 1
                    if get_memory_mail_num(v.uid) < MAX_MAIL then
                        set_memory_mail_num(v.uid, 1)
                        local hash = redis_pool.hashcode(v.uid)
                        redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, v.id), v.content)
                        redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, v.uid), os_time(), v.id)
                        redis_pool.execute(hash)
                        -- Notify the player if they came online in the meantime.
                        if online_user[v.uid] then
                            pcall(skynet_send, online_user[v.uid].agent, "lua", "newmail", v.content, v.id)
                        end
                        -- Throttle: pause after every 200 delivered mails.
                        count = count + 1
                        if count >= 200 then
                            skynet.sleep(100)
                            count = 0
                        end
                    else
                        mail_rubbish(v.uid, v.content)
                    end
                end

                count = 0
                pool.mail = {}
                if handled > 0 then
                    redis_pool.time(handled)
                end
                redis_pool.execute()

                -- release the pool
                pool.sign = false
            end
        end
    end)
end

-- Connection of pool slot `hash`.
function redis_pool.get_redis(hash)
    return redis_list[hash].redis
end

-- Pipeline of pool slot `hash`.
function redis_pool.get_batch(hash)
    return redis_list[hash].batch
end

-- Queue a redis command on the pipeline of pool slot `hash`.
function redis_pool.push(hash, ...)
    redis_list[hash].batch.add(...)
end

-- Flush pipelines in a forked coroutine: only slot `hash` when given,
-- otherwise every slot with a non-zero `num`. Time spent is accumulated
-- in the slot's `time` field.
function redis_pool.execute(hash)
    skynet.fork(function()
        if hash then
            local began = util.gettime()
            local v = redis_list[hash]
            v.batch.execute(v.redis)
            v.time = (v.time or 0) + (util.gettime() - began)
            v.num = 0
        else
            for _, v in pairs(redis_list) do
                if v.num > 0 then
                    local began = util.gettime()
                    v.batch.execute(v.redis)
                    v.time = (v.time or 0) + (util.gettime() - began)
                    v.num = 0
                end
            end
        end
    end)
end

-- Log and reset the accumulated flush time of all slots.
function redis_pool.time(count)
    local tm = 0
    for _, v in pairs(redis_list) do
        tm = tm + (v.time or 0)
        v.time = 0
    end
    logger.trace("本次发送邮件: %s, 花费的时间 %s", count, tm)
end

-- Map a uid to a pool slot in 1..redis_thread and bump that slot's `num`.
-- `% redis_thread` yields 0..redis_thread-1; adding 1 spreads uids over all
-- slots (remapping only 0 to 1 would leave the last slot unused).
function redis_pool.hashcode(uid)
    local hash = hashcode(uid) % redis_thread + 1
    redis_list[hash].num = redis_list[hash].num + 1
    return hash
end

--------------------------------------------------------------------------------

-- Connect to redis (db 13), build the connection pool and preload global mails.
function CMD.start()
    logger.info("start")
    local conf = assert(option.redis)
    redis = redisdriver.connect(conf)
    redis:select(13)

    for _ = 1, redis_thread do
        local conn = redisdriver.connect(conf)
        conn:select(13)
        table_insert(redis_list, { redis = conn, batch = pipeline(124), num = 0 })
    end

    preload_global_mail()
    logger.info("%s:%s", conf.host, conf.port)
    start_sign = true
end

-- Send a mail to one or more players. Online players get it written and are
-- notified immediately; everyone else is queued into the mail pool.
-- Note (from the original author): this command must be invoked with send.
--   uids:       receiver id, or table of receiver ids
--   subject:    mail subject (number)
--   body:       mail text (string)
--   attachment: attachments, in the item-grant format (table)
--   source:     origin of the mail { module, brief, context }
function CMD.off_line_mail(uids, subject, body, attachment, source)
    if type(uids) ~= 'table' then
        uids = { [1] = uids }
    end

    local content = {
        date = os_time(),
        subject = subject,
        body = body,
        attachment = attachment,
        source = source,
    }
    local off_ids = {}
    local count = 0

    for _, uid in pairs(uids) do
        -- Keep the entry in a local: get_memory_mail_num may yield on a cache
        -- miss and the player could go offline before the notification.
        local user = online_user[uid]
        if user then
            if get_memory_mail_num(uid) < MAX_MAIL then
                set_memory_mail_num(uid, 1)
                local id = "M" .. keygen()
                content.id = id
                local js_content = cjson_encode(content)
                local hash = redis_pool.hashcode(uid)
                redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, id), js_content)
                redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, uid), os_time(), id)
                redis_pool.execute(hash)
                pcall(skynet_send, user.agent, "lua", "newmail", js_content, id)
                -- Throttle: pause after every 100 online deliveries.
                count = count + 1
                if count >= 100 then
                    count = 0
                    skynet.sleep(10)
                end
            else
                mail_rubbish(uid, cjson_encode(content))
            end
        else
            table_insert(off_ids, uid)
        end
    end

    mail_pool:add_mail_pool(off_ids, content)
end

-- Cluster entry point: same as off_line_mail, but errors are logged
-- instead of being propagated.
function CMD.master_mail(...)
    local ok, info = pcall(CMD.off_line_mail, ...)
    if not ok then
        logger.trace(" ### 集群邮件发放失败 %s", info)
    end
end

-- Broadcast a global mail: store it in "mailbox:global" and deliver a
-- personal copy (with its own id) to every player currently online.
function CMD.broadcast(subject, body, attachment, source)
    local id = "M" .. keygen()
    local now = os_time()
    local encoded = cjson_encode({
        id = id,
        date = now,
        subject = subject,
        body = body,
        attachment = attachment,
        source = source,
    })
    redis:rpush("mailbox:global", encoded)
    global_mail[id] = { date = now, content = encoded, id = id }

    -- Snapshot the receivers first: the loop body can yield (redis lookup on
    -- a cache miss) and CMD.online may add keys to online_user meanwhile,
    -- which must not happen while pairs() is traversing that table.
    local receivers = {}
    for uid, user in pairs(online_user) do
        receivers[#receivers + 1] = { uid = uid, agent = user.agent }
    end

    local content = cjson_decode(encoded)
    for _, r in ipairs(receivers) do
        local uid = r.uid
        if get_memory_mail_num(uid) < MAX_MAIL then
            set_memory_mail_num(uid, 1)
            id = "M" .. keygen()
            content.id = id
            local js_content = cjson_encode(content)
            local hash = redis_pool.hashcode(uid)
            redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, uid), os_time(), id)
            redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, id), js_content)
            -- A failed notification must not abort the broadcast for the
            -- remaining players; the mail itself is already queued.
            local ok, err = pcall(skynet_send, r.agent, "lua", "newmail", js_content, id)
            if not ok then
                logger.trace(" broadcast newmail notify failed, uid = %s, err = %s", uid, err)
            end
        else
            mail_rubbish(uid, cjson_encode(content))
        end
    end
    redis_pool.execute()
end

-- Store one mail whose content is already JSON encoded.
-- flag: when truthy the MAX_MAIL limit is ignored.
function CMD.save_mail(id, content, uid, flag)
    if flag or get_memory_mail_num(uid) < MAX_MAIL then
        set_memory_mail_num(uid, 1)
        local hash = redis_pool.hashcode(uid)
        -- mail body
        redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, id), content)
        -- index entry in the player's mail id set
        redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, uid), os_time(), id)
        redis_pool.execute(hash)
    else
        mail_rubbish(uid, content)
    end
end

-- Fetch the bodies of the given mail ids. Returns 0 and the list of bodies
-- as returned by MGET (empty table when `ids` is empty).
local function get_mail(uid, ids)
    local keys = {}
    for _, id in pairs(ids) do
        table_insert(keys, string_format(MAIL_PRIVATE, id))
    end

    local redis = redis_pool.get_redis(redis_pool.hashcode(uid))
    local bodies = {}
    if #keys > 0 then
        bodies = redis:mget(table.unpack(keys)) or {}
    end
    return 0, bodies
end

-- Drop expired mails of the player, then return all remaining mails,
-- newest first. `tm` is accepted but not used.
function CMD.get_all_mail(tm, uid)
    local key = string_format(MAIL_CONTENT, uid)
    local redis = redis_pool.get_redis(redis_pool.hashcode(uid))
    local del_ids = redis:ZRANGEBYSCORE (key, 0, os.time() - retention_tm)
    if next(del_ids or {}) then
        CMD.del_mail(uid, del_ids)
    end

    local ids = redis:zrevrange(key, 0, -1)
    return get_mail(uid, ids)
end

-- On login: copy every global mail dated after `tm` into the player's
-- mailbox (each copy gets its own id) until the mailbox is full.
local function check_mail(uid, tm)
    local room = MAX_MAIL - get_memory_mail_num(uid)
    local hash = redis_pool.hashcode(uid)
    local m_u = string_format(MAIL_CONTENT, uid)

    for _, v in pairs(global_mail) do
        if v.date > tm then
            if room <= 0 then
                break
            end
            local content = cjson_decode(v.content)
            local id = "M" .. keygen()
            content.id = id     -- replace the global id with the personal one
            redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, id), cjson_encode(content))
            redis_pool.push(hash, "zadd", m_u, tm, id)
            set_memory_mail_num(uid, 1)
            room = room - 1
        end
    end
    redis_pool.execute(hash)
    return
end

-- Player came online: remember the agent, purge expired mails and collect
-- pending global mails.
function CMD.online(agent, uid, tm)
    online_user[uid] = {
        agent = agent,
    }
    local key = string_format(MAIL_CONTENT, uid)
    local del_ids = redis:ZRANGEBYSCORE (key, 0, os.time() - retention_tm)
    if next(del_ids or {}) then
        CMD.del_mail(uid, del_ids)
    end
    check_mail(uid, tm)
end

-- Player went offline: forget the agent and the cached mail count, but only
-- when the uid is tracked and `agent` is the one that registered it.
function CMD.offline(agent, uid)
    local user = online_user[uid]
    if user and user.agent == agent then
        online_user[uid] = nil
        user_mail_num[uid] = nil
    end
end

-- Number of mails the player currently has.
function CMD.mail_num(uid)
    return get_memory_mail_num(uid)
end

-- Number of mails the player can still receive.
function CMD.surplus_mail_num(uid)
    return MAX_MAIL - get_memory_mail_num(uid)
end

-- Delete the mails whose ids are in `list`; the cached count is reduced by
-- one per listed id. Returns 0.
function CMD.del_mail(uid, list)
    local mail_s = string_format(MAIL_CONTENT, uid)
    local del_num = 0
    local hash = redis_pool.hashcode(uid)
    for _, v in pairs(list) do
        redis_pool.push(hash, "zrem", mail_s, v)
        redis_pool.push(hash, "del", string_format(MAIL_PRIVATE, v))
        del_num = del_num - 1
    end
    set_memory_mail_num(uid, del_num)
    redis_pool.execute(hash)
    return 0
end

-- Fetch the mails whose attachments are being claimed.
function CMD.get_goods(uid, ids)
    local err, list = get_mail(uid, ids)
    return err, list
end

-- Drop expired global mails from memory and pop one entry from the head of
-- "mailbox:global" for each of them.
local function update_global_mail()
    local now_tm = os_time()
    local temp = {}
    local batch = pipeline(124)
    for _, v in pairs(global_mail) do
        if retention_tm and (retention_tm + v.date < now_tm) then
            batch.add("lpop", "mailbox:global")
        else
            temp[v.id] = v
        end
    end
    batch.execute(redis)
    global_mail = temp
end
-- Service bootstrap: register the service name, start the mail-pool sender
-- and a periodic task that purges expired global mails.
skynet.init(function()
    skynet.register(".mailbox")
    cjson.encode_sparse_array(true, 1)
    mail_pool:send_mail_pool()
    skynet.fork(function()
        while true do
            skynet.sleep(300)
            -- update_global_mail needs the redis connection made by CMD.start.
            if start_sign then
                -- Protected call: a single failure (e.g. a redis error) must
                -- not terminate the cleanup loop for good.
                local ok, err = xpcall(update_global_mail, traceback)
                if not ok then
                    logger.trace("update_global_mail failed: %s", err)
                end
            end
        end
    end)
end)

-- Dispatch incoming "lua" messages to the matching CMD handler.
skynet.start(function()
    logger.label("")
    skynet.dispatch("lua", function(session, _, cmd, ...)
        local f = CMD[cmd]
        if not f then
            error(string_format("mailbox: unknown command %s", tostring(cmd)))
        end
        -- session 0: nobody is waiting for an answer, so nothing is packed.
        if session == 0 then
            f(...)
        else
            skynet_retpack(f(...))
        end
    end)
end)