123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626 |
- local skynet = require "skynet"
- local queue = require "skynet.queue"
- local redisdriver = require "skynet.db.redis"
- local logger = require "logger"
- local cjson = require "cjson"
- local keygen = require "model.keygen"
- local pipeline = require "pipeline"
- local util = require "util"
- require "skynet.manager"
- local stringify = require "stringify"
- local traceback = debug.traceback
- local trace = logger.trace
- local skynet_retpack = skynet.retpack
- local skynet_send = skynet.send
- local string_format = string.format
- local table_insert = table.insert
- local cjson_decode = cjson.decode -- 解码
- local cjson_encode = cjson.encode -- 编码
- local os_time = os.time
- local hashcode = util.hashcode
-- Becomes true at the end of CMD.start; the periodic sweep waits for it.
local start_sign = false
-- skynet.queue instance used to wrap the pool-claim helpers.
local synchronized = queue()
-- Mail retention window in seconds (30 days).
local retention_tm = 3600 * 24 * 30
-- Maximum number of mails a single player may hold.
local MAX_MAIL = 1000
-- Number of pooled redis connections opened by CMD.start.
local redis_thread = 4
-- Primary redis connection, assigned in CMD.start (kept local: BUGPK3-20 global-variable fix).
local redis
cjson.encode_sparse_array(true, 1)

-- Key templates, described by how this file uses them:
--   MAIL_CONTENT is formatted with a uid and addressed with zadd/zcard/zrem
--   (the per-player sorted set of mail ids);
--   MAIL_PRIVATE is formatted with a mail id and addressed with set/mget/del
--   (the encoded mail body);
--   MAIL_RUBBISH is formatted with uid and a timestamp and written with setex
--   (mails rejected because the mailbox is full).
local MAIL_CONTENT = "mailbox:content:%s"
local MAIL_PRIVATE = "mailbox:private:%s"
local MAIL_RUBBISH = "mailbox:rubbish:%s:%s"

-- [uid] = { agent = <agent address> } for every player registered by CMD.online.
local online_user = {}

-- Global (broadcast) mails currently held in memory:
-- [id] = { date = <mail time>, content = <encoded mail>, id = <mail id> }
local global_mail = {}

-- Cached mail count per player: [uid] = <number of mails>
local user_mail_num = {}

-- The two mail buffers; `sign` is true while a buffer is claimed.
local mail_pool_one = { sign = false, name = "one", mail = {} }
local mail_pool_two = { sign = false, name = "two", mail = {} }

-- Redis connection pool, filled by CMD.start with entries shaped like
-- { redis = <connection>, batch = <pipeline>, num = <pending counter> }.
local redis_list = {}

local CMD = {}
local mail_pool = {}
local redis_pool = {}
- ---------------------------------- local ---------------------------------
--- Read a player's mail count from redis and store it in the in-memory cache.
-- @param uid player id
-- @return the value returned by ZCARD on the player's mail-id sorted set
local function get_mail_num(uid)
    local set_key = string_format(MAIL_CONTENT, uid)
    local hash = redis_pool.hashcode(uid)
    local conn = redis_pool.get_redis(hash)
    local count = conn:zcard(set_key)
    user_mail_num[uid] = count
    return count
end
--- Return the cached mail count for a player, loading it from redis on a miss.
-- @param uid player id
-- @return number of mails
local function get_memory_mail_num(uid)
    local cached = user_mail_num[uid]
    if not cached then
        return get_mail_num(uid)
    end
    return cached
end
--- Adjust the cached mail count for a player by a delta.
-- The cache is loaded from redis first when the player has no entry yet.
-- @param uid player id
-- @param num delta to add (negative to subtract)
local function set_memory_mail_num(uid, num)
    local current = user_mail_num[uid]
    if not current then
        get_mail_num(uid)
        -- Re-read after the redis round trip, exactly as the cache now stands.
        current = user_mail_num[uid]
    end
    user_mail_num[uid] = current + num
end
--- Load the global mail list from redis at service start.
-- Mails still inside the retention window are cached in `global_mail`;
-- one LPOP on "mailbox:global" is issued for every mail that is past it.
local function preload_global_mail()
    local entries = redis:lrange("mailbox:global", 0, -1)
    local now = os_time()
    local expired = 0
    for _, raw in ipairs(entries) do
        local mail = cjson_decode(raw)
        -- Expiry check: keep the mail only while date + retention has not passed.
        local alive = retention_tm and (retention_tm + mail.date >= now)
        if alive then
            global_mail[mail.id] = { date = mail.date, content = raw, id = mail.id }
        else
            expired = expired + 1
        end
    end
    -- Database maintenance: pop as many entries as were found expired.
    local batch = pipeline(124)
    local remaining = expired
    while remaining > 0 do
        batch.add("lpop", "mailbox:global")
        remaining = remaining - 1
    end
    batch.execute(redis)
end
--[[
Double-buffered mail pools:
mail_pool_one = {
    sign = true,
    mail = {},
}
mail_pool_two = {
    sign = true,
    mail = {},
}
]]
---------------------------------------- mail pools ---------------------------------
--- Mail trash bin: queue a rejected mail under a per-player, per-second key.
-- The SETEX command is only pushed onto the pooled pipeline here; failures
-- are logged instead of being raised.
-- NOTE(review): the key has one-second resolution, so two rejected mails for
-- the same uid within one second would share a key — confirm this is acceptable.
-- @param uid player id
-- @param content encoded mail
local function mail_rubbish(uid, content)
    local function stash()
        local hash = redis_pool.hashcode(uid)
        local stamp = os.date("%Y-%m-%d:%H-%M-%S", os.time())
        local key = string_format(MAIL_RUBBISH, uid, stamp)
        redis_pool.push(hash, "setex", key, retention_tm, content)
    end
    local ok, info = xpcall(stash, traceback)
    if not ok then
        logger.trace(" 邮件垃圾报错 %s", info)
    end
end
--- Claim whichever mail pool is currently free.
-- The check-and-claim runs inside `synchronized`. The claimed pool has its
-- `sign` set to true; the caller must reset it to false when done.
-- @param type caller tag, used only in the log line emitted when both pools are busy
-- @return the claimed pool table, or nothing when both pools are in use
local function get_usable_pool(type)
    return synchronized(function()
        if not mail_pool_one.sign then
            mail_pool_one.sign = true
            return mail_pool_one
        elseif not mail_pool_two.sign then
            mail_pool_two.sign = true
            return mail_pool_two
        end
        -- Both pools are claimed. The original format string had no
        -- placeholder, so the tag passed by the caller was never printed.
        logger.trace("----- 在取可用邮件池,两个池都被占用了, type = %s", type)
    end)
end
--- Queue one mail per recipient into a free mail pool.
-- Every recipient gets its own mail id; `content` is a plain table that is
-- JSON-encoded here, once per recipient, after its `id` field is overwritten.
-- @param uids table of recipient ids
-- @param content mail table (its `id` field is mutated)
function mail_pool:add_mail_pool(uids, content)
    -- Nothing to queue: do not claim a pool at all.
    if next(uids) == nil then
        return
    end

    -- Build every entry before claiming a pool, so that a failure in keygen
    -- or the encoder cannot leave a pool marked busy.
    local entries = {}
    for _, uid in pairs(uids) do
        local id = "M" .. keygen()
        content.id = id
        entries[#entries + 1] = { uid = uid, id = id, content = cjson_encode(content) }
    end

    -- Wait until a pool can be claimed. The original retried through a local
    -- that shadowed `self` and was nil on this path, so the retry crashed.
    local pool = get_usable_pool('w')
    while not pool do
        skynet.sleep(100)
        pool = get_usable_pool('w')
    end

    for i = 1, #entries do
        table_insert(pool.mail, entries[i])
    end

    -- Release the pool.
    pool.sign = false
end
-- Call counter used to alternate between the two mail pools.
local have_num = 0

--- Try to claim the pool selected by the parity of the call counter.
-- @return the claimed pool and its index (1 or 2), or nothing when that pool is busy
local function get_have_mail_pool()
    have_num = have_num + 1
    return synchronized(function()
        local pool, index
        if math.fmod(have_num, 2) == 0 then
            pool, index = mail_pool_one, 1
        else
            pool, index = mail_pool_two, 2
        end
        if pool.sign then
            return
        end
        pool.sign = true
        return pool, index
    end)
end
--- Start the background worker that drains the mail pools.
-- Each round sleeps with skynet.sleep(100) and then tries to claim one pool.
-- When a pool is claimed, every queued mail is either written to the
-- recipient's mailbox (online recipients are also notified) or moved to the
-- trash bin when the mailbox is full. The pool is then emptied and released.
function mail_pool:send_mail_pool()
    skynet.fork(function()
        local batch = pipeline(1024)
        while true do
            skynet.sleep(100)
            local pool = get_have_mail_pool('r')
            -- The pool is released inside this branch only. The original
            -- released it unconditionally and indexed nil whenever the claim
            -- failed, which killed the worker.
            if pool then
                -- Load the mail count of every recipient that is not cached yet.
                local uncached = {}
                for _, mail in ipairs(pool.mail) do
                    if not user_mail_num[mail.uid] then
                        table_insert(uncached, mail.uid)
                        batch.add("zcard", string_format(MAIL_CONTENT, mail.uid))
                    end
                end
                if #uncached > 0 then
                    local resp = batch.execute(redis, {})
                    for k, uid in ipairs(uncached) do
                        user_mail_num[uid] = resp[k].out
                    end
                end

                -- Write the mails into the recipients' mailboxes.
                local sent = 0      -- deliveries since the last throttle pause
                local handled = 0   -- mails taken from the pool this round
                for _, mail in ipairs(pool.mail) do
                    handled = handled + 1
                    if get_memory_mail_num(mail.uid) < MAX_MAIL then
                        set_memory_mail_num(mail.uid, 1)
                        local hash = redis_pool.hashcode(mail.uid)
                        redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, mail.id), mail.content)
                        redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, mail.uid), os_time(), mail.id)
                        redis_pool.execute(hash)
                        -- Notify the recipient when it is currently online.
                        local user = online_user[mail.uid]
                        if user then
                            pcall(skynet_send, user.agent, "lua", "newmail", mail.content, mail.id)
                        end
                        sent = sent + 1
                        if sent >= 200 then
                            -- Pause after every 200 deliveries.
                            skynet.sleep(100)
                            sent = 0
                        end
                    else
                        mail_rubbish(mail.uid, mail.content)
                    end
                end

                -- Delivery finished for this pool.
                pool.mail = {}
                if handled > 0 then
                    redis_pool.time(handled)
                end
                redis_pool.execute()
                -- Release the pool.
                pool.sign = false
            end
        end
    end)
end
--- Return the redis connection stored in the given pool slot.
-- @param hash index into `redis_list`
function redis_pool.get_redis(hash)
    local entry = redis_list[hash]
    return entry.redis
end
--- Return the pipeline stored in the given pool slot.
-- @param hash index into `redis_list`
function redis_pool.get_batch(hash)
    local entry = redis_list[hash]
    return entry.batch
end
--- Append one command to the pipeline of the given pool slot.
-- @param hash index into `redis_list`
-- @param ... command name followed by its arguments, forwarded to `batch.add`
function redis_pool.push(hash, ...)
    local batch = redis_list[hash].batch
    batch.add(...)
end
--- Flush pooled pipelines in a forked coroutine.
-- With `hash`, that slot is flushed unconditionally. Without it, every slot
-- whose pending counter `num` is above zero is flushed. The time spent in
-- each flush is accumulated in the slot's `time` field and `num` is reset.
-- The work is forked, so the commands have not necessarily run by the time
-- this function returns.
-- @param hash optional index into `redis_list`
function redis_pool.execute(hash)
    skynet.fork(function()
        -- Flush a single slot and record how long it took.
        local function flush(entry)
            local started = util.gettime()
            entry.batch.execute(entry.redis)
            entry.time = (entry.time or 0) + (util.gettime() - started)
            entry.num = 0
        end

        if hash then
            flush(redis_list[hash])
            return
        end
        for _, entry in pairs(redis_list) do
            if entry.num > 0 then
                flush(entry)
            end
        end
    end)
end
--- Log the accumulated flush time of all pool slots and reset it.
-- @param count number of mails reported in the log line
function redis_pool.time(count)
    local total = 0
    for _, entry in pairs(redis_list) do
        local spent = entry.time or 0
        total = total + spent
        entry.time = 0
    end
    logger.trace("本次发送邮件: %s, 花费的时间 %s", count, total)
end
--- Map a player id onto a slot of the redis pool and bump that slot's
-- pending counter.
-- The original computed `hashcode(uid) % redis_thread` and replaced 0 with 1,
-- so slot `redis_thread` was never selected and slot 1 received two
-- residues. Adding 1 to the residue spreads ids over 1..redis_thread.
-- @param uid player id
-- @return slot index in 1..redis_thread
function redis_pool.hashcode(uid)
    local hash = hashcode(uid) % redis_thread + 1
    local entry = redis_list[hash]
    entry.num = entry.num + 1
    return hash
end
- --------------------------------------------------------------------------------
--- Connect to redis, build the connection pool and preload global mails.
-- Reads the connection settings from `option.redis`; every connection
-- selects database 13.
function CMD.start()
    logger.info("start")
    local conf = assert(option.redis)

    -- Mail database: primary connection.
    redis = redisdriver.connect(conf)
    redis:select(13)

    -- Pooled connections, each paired with its own pipeline.
    for _ = 1, redis_thread do
        local conn = redisdriver.connect(conf)
        conn:select(13)
        table_insert(redis_list, { redis = conn, batch = pipeline(124), num = 0 })
    end

    preload_global_mail()
    logger.info("%s:%s", conf.host, conf.port)
    start_sign = true
end
--- Send a mail to a group of players.
-- Online players get the mail written immediately and are notified; all
-- other recipients are handed to the mail pool for background delivery.
-- Note from the original author: this command must be invoked with `send`.
-- @param uids        table of recipient ids (a single id is also accepted)
-- @param subject     mail subject
-- @param body        mail body
-- @param attachment  mail attachment, in the item-grant format
-- @param source      mail origin: { module, brief, context }
function CMD.off_line_mail(uids, subject, body, attachment, source)
    if type(uids) ~= 'table' then
        uids = { uids }
    end

    local content = {
        date = os_time(),
        subject = subject,
        body = body,
        attachment = attachment,
        source = source,
    }

    -- Split recipients into online (handled here) and offline (pooled).
    local off_ids = {}
    local count = 0
    for _, uid in pairs(uids) do
        -- Capture the entry once: get_memory_mail_num may hit redis, and the
        -- player could be unregistered by the time the notification is sent.
        local user = online_user[uid]
        if not user then
            table_insert(off_ids, uid)
        elseif get_memory_mail_num(uid) < MAX_MAIL then
            set_memory_mail_num(uid, 1)
            local id = "M" .. keygen()
            content.id = id
            local js_content = cjson_encode(content)
            local hash = redis_pool.hashcode(uid)
            redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, id), js_content)
            redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, uid), os_time(), id)
            redis_pool.execute(hash)
            pcall(skynet_send, user.agent, "lua", "newmail", js_content, id)
            count = count + 1
            if count >= 100 then
                -- Pause after every 100 direct deliveries.
                count = 0
                skynet.sleep(10)
            end
        else
            -- Mailbox full: move the mail to the trash bin.
            mail_rubbish(uid, cjson_encode(content))
        end
    end

    -- Only claim a mail pool when there is something to queue.
    if #off_ids > 0 then
        mail_pool:add_mail_pool(off_ids, content)
    end
end
--- Cluster entry point for mail delivery.
-- Forwards all arguments to CMD.off_line_mail under pcall and logs a failure
-- instead of raising it.
function CMD.master_mail(...)
    local ok, info = pcall(CMD.off_line_mail, ...)
    if ok then
        return
    end
    logger.trace(" ### 集群邮件发放失败 %s", info)
end
--- Broadcast a global mail.
-- The mail is appended to the "mailbox:global" list and cached in
-- `global_mail`. Every player online at that moment also receives a private
-- copy with its own id, or has the copy moved to the trash bin when the
-- mailbox is full.
-- @param subject     mail subject
-- @param body        mail body
-- @param attachment  mail attachment
-- @param source      mail origin
function CMD.broadcast(subject, body, attachment, source)
    local global_id = "M" .. keygen()
    local now = os_time()
    local encoded = cjson_encode({
        id = global_id,
        date = now,
        subject = subject,
        body = body,
        attachment = attachment,
        source = source,
    })
    redis:rpush("mailbox:global", encoded)
    global_mail[global_id] = { date = now, content = encoded, id = global_id }

    -- Snapshot the online players first: the loop below can yield on redis,
    -- and CMD.online may insert into `online_user` meanwhile, which is not
    -- allowed during a pairs() traversal.
    local targets = {}
    for uid, user in pairs(online_user) do
        targets[#targets + 1] = { uid = uid, agent = user.agent }
    end

    local copy = cjson_decode(encoded)
    for _, target in ipairs(targets) do
        local uid = target.uid
        if get_memory_mail_num(uid) < MAX_MAIL then
            set_memory_mail_num(uid, 1)
            local id = "M" .. keygen()
            copy.id = id
            local payload = cjson_encode(copy)
            local hash = redis_pool.hashcode(uid)
            redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, uid), os_time(), id)
            redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, id), payload)
            -- Notify after the mail is queued. A failed notification is
            -- logged rather than asserted, so one unreachable agent no longer
            -- aborts the broadcast for everyone else.
            local ok, err = pcall(skynet_send, target.agent, "lua", "newmail", payload, id)
            if not ok then
                logger.trace(" broadcast: newmail notify failed, uid = %s, err = %s", uid, err)
            end
        else
            mail_rubbish(uid, cjson_encode(copy))
        end
    end
    redis_pool.execute()
end
--- Store a single mail whose content is already JSON-encoded.
-- @param id      mail id
-- @param content encoded mail
-- @param uid     recipient id
-- @param flag    when truthy, the mailbox size limit is not checked
function CMD.save_mail(id, content, uid, flag)
    local allowed = flag or get_memory_mail_num(uid) < MAX_MAIL
    if not allowed then
        mail_rubbish(uid, content)
        return
    end
    set_memory_mail_num(uid, 1)
    local hash = redis_pool.hashcode(uid)
    -- Mail body, keyed by mail id.
    redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, id), content)
    -- Mail id index of the player.
    redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, uid), os_time(), id)
    redis_pool.execute(hash)
end
--- Fetch the encoded bodies of the given mails for a player.
-- @param uid player id (selects the pooled connection)
-- @param ids table of mail ids
-- @return 0 and the table returned by MGET (an empty table when no id was given)
local function get_mail(uid, ids)
    local keys = {}
    for _, id in pairs(ids) do
        keys[#keys + 1] = string_format(MAIL_PRIVATE, id)
    end
    local conn = redis_pool.get_redis(redis_pool.hashcode(uid))
    if #keys == 0 then
        return 0, {}
    end
    local bodies = conn:mget(table.unpack(keys)) or {}
    return 0, bodies
end
--- Return all mails of a player, newest first, after purging expired ones.
-- @param tm  unused by this function
-- @param uid player id
-- @return 0 and the table of encoded mails
function CMD.get_all_mail(tm, uid)
    local key = string_format(MAIL_CONTENT, uid)
    local conn = redis_pool.get_redis(redis_pool.hashcode(uid))

    -- Ids whose score is older than the retention window.
    local expired = conn:ZRANGEBYSCORE(key, 0, os.time() - retention_tm) or {}
    local is_expired = {}
    if next(expired) then
        for _, id in pairs(expired) do
            is_expired[id] = true
        end
        CMD.del_mail(uid, expired)
    end

    -- CMD.del_mail only queues the removal (redis_pool.execute forks), so the
    -- expired ids can still show up in the range below; drop them here
    -- instead of fetching mails that are being deleted.
    local ids = {}
    for _, id in ipairs(conn:zrevrange(key, 0, -1) or {}) do
        if not is_expired[id] then
            ids[#ids + 1] = id
        end
    end
    return get_mail(uid, ids)
end
--- On login, copy every global mail dated after `tm` into the player's mailbox.
-- Copying stops as soon as the mailbox has no room left. Each copy gets a
-- fresh mail id.
-- NOTE(review): the copies are indexed with score `tm` rather than the
-- current time or the mail date; expiry elsewhere is driven by that score —
-- confirm this is intended.
-- @param uid player id
-- @param tm  timestamp; only global mails with date > tm are copied
local function check_mail(uid, tm)
    local room = MAX_MAIL - get_memory_mail_num(uid)
    local hash = redis_pool.hashcode(uid)
    local index_key = string_format(MAIL_CONTENT, uid)
    for _, mail in pairs(global_mail) do
        if mail.date > tm then
            if room <= 0 then
                break
            end
            -- Decode, swap in a new id, and re-encode.
            local copy = cjson_decode(mail.content)
            local id = "M" .. keygen()
            copy.id = id
            redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, id), cjson_encode(copy))
            -- Maintain the player's mail id index.
            redis_pool.push(hash, "zadd", index_key, tm, id)
            set_memory_mail_num(uid, 1)
            room = room - 1
        end
    end
    redis_pool.execute(hash)
end
--- Player login: register the agent, purge expired mails, collect global mails.
-- @param agent agent address of the player
-- @param uid   player id
-- @param tm    timestamp forwarded to check_mail
function CMD.online(agent, uid, tm)
    online_user[uid] = { agent = agent }

    local index_key = string_format(MAIL_CONTENT, uid)
    local cutoff = os.time() - retention_tm
    local expired = redis:ZRANGEBYSCORE(index_key, 0, cutoff)
    if next(expired or {}) then
        CMD.del_mail(uid, expired)
    end

    -- Collect the global mails for this player.
    check_mail(uid, tm)
end
--- Player logout: drop the in-memory data kept for the player.
-- Nothing happens when the player is not registered or is registered with a
-- different agent. The original indexed `online_user[uid]` without checking
-- it and raised an error for an unknown uid.
-- @param agent agent address that is logging out
-- @param uid   player id
function CMD.offline(agent, uid)
    local user = online_user[uid]
    if user and user.agent == agent then
        online_user[uid] = nil
        user_mail_num[uid] = nil
    end
end
--- Return the number of mails the player currently holds.
-- @param uid player id
function CMD.mail_num(uid)
    local count = get_memory_mail_num(uid)
    return count
end
--- Return how many more mails the player can hold before reaching MAX_MAIL.
-- @param uid player id
function CMD.surplus_mail_num(uid)
    local held = get_memory_mail_num(uid)
    return MAX_MAIL - held
end
--- Delete mails of a player.
-- For every listed id the id is removed from the player's index and the mail
-- body is deleted; the cached count is lowered by the number of listed ids.
-- @param uid  player id
-- @param list table of mail ids
-- @return 0
function CMD.del_mail(uid, list)
    local index_key = string_format(MAIL_CONTENT, uid)
    local hash = redis_pool.hashcode(uid)
    local removed = 0
    for _, id in pairs(list) do
        redis_pool.push(hash, "zrem", index_key, id)
        redis_pool.push(hash, "del", string_format(MAIL_PRIVATE, id))
        removed = removed + 1
    end
    set_memory_mail_num(uid, -removed)
    redis_pool.execute(hash)
    return 0
end
--- Fetch the given mails so that their attachments can be extracted.
-- @param uid player id
-- @param ids table of mail ids
-- @return the error code and mail table produced by get_mail
function CMD.get_goods(uid, ids)
    return get_mail(uid, ids)
end
--- Drop expired global mails from memory and pop them from redis.
-- One LPOP on "mailbox:global" is queued per expired mail.
-- Expired entries are removed from `global_mail` in place. The original
-- built a new table and assigned it after `batch.execute`, so any mail that
-- CMD.broadcast added while that call was waiting on redis was lost from
-- memory.
local function update_global_mail()
    local now_tm = os_time()
    local batch = pipeline(124)
    for id, mail in pairs(global_mail) do
        -- Expiry check: the mail is past date + retention.
        if retention_tm and (retention_tm + mail.date < now_tm) then
            -- Clearing an existing key during traversal is permitted.
            global_mail[id] = nil
            batch.add("lpop", "mailbox:global")
        end
    end
    batch.execute(redis)
end
-- Service initialisation: register the service name, start the mail pool
-- worker and start the periodic sweep of expired global mails.
skynet.init(function()
    skynet.register(".mailbox")
    cjson.encode_sparse_array(true, 1)
    mail_pool:send_mail_pool()

    -- Sweep loop: runs update_global_mail every skynet.sleep(300) once
    -- CMD.start has completed.
    local function sweep_loop()
        while true do
            skynet.sleep(300)
            if start_sign then
                update_global_mail()
            end
        end
    end
    skynet.fork(sweep_loop)
end)
-- Service entry point: route "lua" messages to the matching CMD handler.
skynet.start(function()
    logger.label("<Mailbox>")

    -- A session of 0 means no reply is sent; otherwise the handler's results
    -- are packed and returned to the caller.
    local function dispatch(session, _, cmd, ...)
        local handler = assert(CMD[cmd])
        if session ~= 0 then
            skynet_retpack(handler(...))
        else
            handler(...)
        end
    end

    skynet.dispatch("lua", dispatch)
end)
|