mailbox.lua 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626
  1. local skynet = require "skynet"
  2. local queue = require "skynet.queue"
  3. local redisdriver = require "skynet.db.redis"
  4. local logger = require "logger"
  5. local cjson = require "cjson"
  6. local keygen = require "model.keygen"
  7. local pipeline = require "pipeline"
  8. local util = require "util"
  9. require "skynet.manager"
  10. local stringify = require "stringify"
  11. local traceback = debug.traceback
  12. local trace = logger.trace
  13. local skynet_retpack = skynet.retpack
  14. local skynet_send = skynet.send
  15. local string_format = string.format
  16. local table_insert = table.insert
  17. local cjson_decode = cjson.decode -- 解码
  18. local cjson_encode = cjson.encode -- 编码
  19. local os_time = os.time
  20. local hashcode = util.hashcode
  21. local start_sign = false -- 服务器启动标记
  22. local synchronized = queue()
  23. local retention_tm = 30*24*3600 -- 30天
  24. local MAX_MAIL = 1000 -- 最大邮件数量
  25. local redis_thread = 4 -- redis 链接的数量
  26. local redis -- BUGPK3-20 全局变量修复
  27. cjson.encode_sparse_array(true, 1)
  28. local MAIL_CONTENT = "mailbox:content:%s" -- 邮件内容 mail_id
  29. local MAIL_PRIVATE = "mailbox:private:%s" -- 玩家邮件列表 uid
  30. local MAIL_RUBBISH = "mailbox:rubbish:%s:%s"-- 邮件垃圾篓 uid/时间
  31. local online_user = {}
  32. local global_mail = {
  33. --[[
  34. [id] = {
  35. date = , -- 邮件时间
  36. content = , -- 邮件内容
  37. portion = 1, -- 部分人拥有的邮件标记
  38. },
  39. ]]
  40. }
  41. local user_mail_num = {
  42. --[[
  43. [uid] = 0, -- 邮件数量
  44. ]]
  45. }
  46. local mail_pool_one = {
  47. sign = false,
  48. name = 'one',
  49. mail = {}
  50. }
  51. local mail_pool_two = {
  52. sign = false,
  53. name = 'two',
  54. mail = {}
  55. }
  56. -- redis的链接池
  57. local redis_list = {
  58. -- [1] = {batch = pipeline(124), redis = nil}
  59. }
  60. local CMD = {}
  61. local mail_pool ={}
  62. local redis_pool = {}
  63. ---------------------------------- local ---------------------------------
  64. local function get_mail_num(uid)
  65. local key = string_format(MAIL_CONTENT, uid)
  66. local redis = redis_pool.get_redis(redis_pool.hashcode(uid))
  67. local num = redis:zcard(key)
  68. user_mail_num[uid] = num
  69. return num
  70. end
  71. -- 从内存中取出玩家邮件的数量
  72. local function get_memory_mail_num(uid)
  73. local num = user_mail_num[uid]
  74. if num then
  75. return num
  76. else
  77. return get_mail_num(uid)
  78. end
  79. end
  80. -- 修改内存中玩家邮件的数量
  81. local function set_memory_mail_num(uid, num)
  82. if not user_mail_num[uid] then
  83. get_mail_num(uid)
  84. end
  85. user_mail_num[uid] = user_mail_num[uid] + num
  86. end
  87. -- 服务器启动时,加载邮件
  88. local function preload_global_mail()
  89. local rets = redis:lrange("mailbox:global", 0, -1)
  90. local now_tm = os_time()
  91. local num = 0
  92. for _, content in ipairs(rets) do
  93. local obj = cjson_decode(content)
  94. -- 时间检查, 检查邮件是否过期
  95. if retention_tm and (retention_tm + obj.date >= now_tm) then
  96. global_mail[obj.id] = { date=obj.date, content=content, id = obj.id }
  97. else
  98. num = num + 1
  99. end
  100. end
  101. -- 维护数据库
  102. local batch = pipeline(124)
  103. for i = 1, num do
  104. batch.add("lpop", "mailbox:global")
  105. end
  106. batch.execute(redis)
  107. end
  108. --[[
  109. 双缓存的邮件池,
  110. mail_pool_one = {
  111. sign = true,
  112. mail = {},
  113. }
  114. mail_pool_two = {
  115. sign = true,
  116. mail = {},
  117. }
  118. ]]
  119. ---------------------------------------- 邮件池 ---------------------------------
  120. -- 邮件垃圾篓
  121. local function mail_rubbish(uid, content)
  122. local ok, info = xpcall(function()
  123. local hash = redis_pool.hashcode(uid)
  124. redis_pool.push(hash, "setex", string_format(MAIL_RUBBISH, uid, os.date("%Y-%m-%d:%H-%M-%S",os.time())), retention_tm,content)
  125. end, traceback)
  126. if not ok then logger.trace(" 邮件垃圾报错 %s", info) end
  127. end
  128. -- 取出可用的缓存池
  129. local function get_usable_pool(type)
  130. return synchronized(function ( ... )
  131. if not mail_pool_one.sign then
  132. mail_pool_one.sign = true
  133. return mail_pool_one
  134. elseif not mail_pool_two.sign then
  135. mail_pool_two.sign = true
  136. return mail_pool_two
  137. else -- 缓存池 死循环
  138. -- 处理缓存死循环
  139. logger.trace("----- 在取可用邮件池,两个池都被占用了, type = ", type)
  140. end
  141. end)
  142. end
  143. -- 邮件池
  144. function mail_pool:add_mail_pool(uids, content) -- 多个id,相同的内容, content 此时为被js编码
  145. local mail_pool = get_usable_pool('w')
  146. local id
  147. if mail_pool then
  148. for _, uid in pairs(uids) do
  149. id = "M" .. keygen()
  150. content.id = id
  151. table_insert(mail_pool.mail, {uid = uid, id = id, content = cjson_encode(content)})
  152. end
  153. else
  154. skynet.sleep(100)
  155. return mail_pool:add_mail_pool(uids, content)
  156. end
  157. -- 释放池子
  158. mail_pool.sign = false
  159. end
  160. -- 轮流返回两个邮件的池
  161. local have_num = 0
  162. local function get_have_mail_pool()
  163. have_num = have_num + 1
  164. -- math.maxinteger
  165. return synchronized(function ( ... )
  166. local ret = math.fmod(have_num, 2)
  167. if ret == 0 then
  168. if not mail_pool_one.sign then
  169. mail_pool_one.sign = true
  170. return mail_pool_one, 1
  171. end
  172. else
  173. if not mail_pool_two.sign then
  174. mail_pool_two.sign = true
  175. return mail_pool_two, 2
  176. end
  177. end
  178. end)
  179. end
  180. function mail_pool:send_mail_pool()
  181. skynet.fork(function()
  182. local mail_pool = {}
  183. local batch = pipeline(1024)
  184. local count = 0
  185. local z_count = 0
  186. local hash = 0
  187. while true do
  188. skynet.sleep(100)
  189. mail_pool = get_have_mail_pool('r')
  190. local sign = false
  191. z_count = 0
  192. if mail_pool then
  193. -- 开始发送邮件
  194. local kaishi = 0
  195. local keys = {}
  196. local key
  197. -- 读取非在线玩家的邮件数量到缓存中
  198. for _, v in pairs(mail_pool.mail) do
  199. if not user_mail_num[v.uid] then
  200. key = string_format(MAIL_CONTENT, v.uid)
  201. table.insert(keys, v.uid)
  202. batch.add("zcard", key)
  203. end
  204. end
  205. if key then
  206. local resp = batch.execute(redis, {})
  207. for k, uid in pairs(keys) do
  208. user_mail_num[uid] = resp[k].out
  209. end
  210. end
  211. -- 将邮件写入到玩家数据库中
  212. for _, v in pairs(mail_pool.mail) do
  213. sign = true
  214. z_count = z_count + 1
  215. if get_memory_mail_num(v.uid) < MAX_MAIL then
  216. set_memory_mail_num(v.uid, 1)
  217. hash = redis_pool.hashcode(v.uid)
  218. -- redis_pool.push(hash, "setex", string_format(MAIL_PRIVATE, v.id), retention_tm,v.content)
  219. redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, v.id), v.content)
  220. redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, v.uid), os_time(), v.id)
  221. redis_pool.execute(hash)
  222. -- 检查玩家是否已经上线, 在线,通知玩家
  223. if online_user[v.uid] then
  224. pcall(skynet_send, online_user[v.uid].agent, "lua", "newmail", v.content, v.id)
  225. end
  226. count = count + 1
  227. if count >= 200 then
  228. skynet.sleep(100)
  229. count = 0
  230. end
  231. else
  232. mail_rubbish(v.uid, v.content)
  233. end
  234. end
  235. -- 邮件发送结束
  236. count = 0
  237. mail_pool.mail = {}
  238. if sign then
  239. redis_pool.time(z_count)
  240. sign = false
  241. end
  242. redis_pool.execute()
  243. end
  244. -- 释放池子
  245. mail_pool.sign = false
  246. end
  247. end)
  248. end
  249. -- 取得对于的 redis
  250. function redis_pool.get_redis(hash)
  251. return redis_list[hash].redis
  252. end
  253. function redis_pool.get_batch(hash)
  254. return redis_list[hash].batch
  255. end
  256. function redis_pool.push(hash, ...)
  257. redis_list[hash].batch.add(...)
  258. end
  259. function redis_pool.execute(hash)
  260. skynet.fork(function()
  261. coroutine.running()
  262. local kaishi = util.gettime()
  263. if hash then
  264. local v = redis_list[hash]
  265. v.batch.execute(v.redis)
  266. v.time = (v.time or 0) + (util.gettime() - kaishi)
  267. v.num = 0
  268. else
  269. for _, v in pairs(redis_list) do
  270. if v.num > 0 then
  271. local kaishi = util.gettime()
  272. v.batch.execute(v.redis)
  273. v.time = (v.time or 0) + (util.gettime() - kaishi)
  274. v.num = 0
  275. end
  276. end
  277. -- redis_pool.time(count)
  278. end
  279. end)
  280. end
  281. function redis_pool.time(count)
  282. local tm = 0
  283. for _, v in pairs(redis_list) do
  284. tm = tm + (v.time or 0)
  285. v.time = 0
  286. end
  287. logger.trace("本次发送邮件: %s, 花费的时间 %s", count, tm)
  288. end
  289. function redis_pool.hashcode(uid)
  290. local hash = hashcode(uid) % redis_thread
  291. if hash == 0 then
  292. hash = hash + 1
  293. end
  294. redis_list[hash].num = redis_list[hash].num + 1
  295. return hash
  296. end
  297. --------------------------------------------------------------------------------
  298. function CMD.start()
  299. logger.info("start")
  300. local conf = assert(option.redis)
  301. -- 邮件数据库
  302. redis = redisdriver.connect(conf)
  303. redis:select(13)
  304. local redis_1
  305. local batch
  306. for i = 1, redis_thread do
  307. redis_1 = redisdriver.connect(conf)
  308. redis_1:select(13)
  309. batch = pipeline(124)
  310. table_insert(redis_list, {redis = redis_1, batch = batch, num = 0})
  311. end
  312. preload_global_mail()
  313. logger.info("%s:%s", conf.host, conf.port)
  314. start_sign = true
  315. end
  316. -- 群发送离线邮件给玩家,如果在线,就通知玩家,
  317. -- 提示: 必须 send 该模块
  318. --[[
  319. table uids: 接收邮件玩家的id
  320. number subject: 邮件的主题
  321. string body: 邮件的内容
  322. table attachment: 邮件的附件,格式按照物品添加格式处理
  323. table source: 邮件的来源{
  324. module, -- 发送模块
  325. brief, -- 说明
  326. context, -- 上下文
  327. }
  328. ]]
  329. function CMD.off_line_mail(uids, subject, body, attachment, source)
  330. if type(uids) ~= 'table' then
  331. uids = {[1] = uids}
  332. end
  333. local id
  334. local now = os_time()
  335. local content = {}
  336. local js_content
  337. -- 区分玩家是否在线
  338. local off_ids = {}
  339. local count = 0
  340. content = {
  341. date = now,
  342. subject = subject,
  343. body = body,
  344. attachment = attachment,
  345. source = source,
  346. }
  347. for _, uid in pairs(uids) do
  348. if online_user[uid] then
  349. if get_memory_mail_num(uid) < MAX_MAIL then
  350. set_memory_mail_num(uid, 1)
  351. id = "M" .. keygen()
  352. content.id = id
  353. js_content = cjson_encode(content)
  354. local hash = redis_pool.hashcode(uid)
  355. -- redis_pool.push(hash, "setex", string_format(MAIL_PRIVATE, id), retention_tm, js_content)
  356. redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, id), js_content)
  357. redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, uid), os_time(), id)
  358. redis_pool.execute(hash)
  359. pcall(skynet_send, online_user[uid].agent, "lua", "newmail", js_content, id)
  360. count = count + 1
  361. if count >= 100 then
  362. count = 0
  363. skynet.sleep(10)
  364. end
  365. else
  366. mail_rubbish(uid, cjson_encode(content))
  367. end
  368. else
  369. table_insert(off_ids, uid)
  370. end
  371. end
  372. mail_pool:add_mail_pool(off_ids, content)
  373. end
  374. -- 用于集群邮件发放
  375. function CMD.master_mail(...)
  376. local ok, info = pcall(CMD.off_line_mail, ...)
  377. if not ok then
  378. logger.trace(" ### 集群邮件发放失败 %s", info)
  379. end
  380. end
  381. -- 广播邮件(全局邮件)
  382. function CMD.broadcast(subject, body, attachment, source)
  383. --assert(type(content) == "string")
  384. local id = "M" .. keygen()
  385. local now = os_time()
  386. local content = cjson_encode({
  387. id = id,
  388. date = now,
  389. subject = subject,
  390. body = body,
  391. attachment = attachment,
  392. source = source,
  393. })
  394. redis:rpush("mailbox:global",content)
  395. global_mail[id] = { date=now, content=content, id = id}
  396. local content = cjson_decode(content)
  397. for uid, user in pairs(online_user) do
  398. if get_memory_mail_num(uid) < MAX_MAIL then
  399. set_memory_mail_num(uid, 1)
  400. id = "M" .. keygen()
  401. content.id = id
  402. local ok = pcall(skynet_send, user.agent, "lua", "newmail", cjson_encode(content), id)
  403. assert(ok)
  404. local hash = redis_pool.hashcode(uid)
  405. redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, uid), os_time(), id)
  406. -- redis_pool.push(hash, "setex", string_format(MAIL_PRIVATE, id), retention_tm, content)
  407. redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, id), cjson_encode(content))
  408. else
  409. mail_rubbish(uid, cjson_encode(content))
  410. end
  411. end
  412. redis_pool.execute()
  413. end
  414. -- 保存一封邮件, 邮件的内容已经是被 js 处理过的
  415. function CMD.save_mail(id, content, uid, flag)
  416. if flag or get_memory_mail_num(uid) < MAX_MAIL then
  417. set_memory_mail_num(uid, 1)
  418. -- 添加邮件, 到私人邮件公用信箱
  419. local hash = redis_pool.hashcode(uid)
  420. -- redis_pool.push(hash, "setex", string_format(MAIL_PRIVATE, id), retention_tm, content)
  421. redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, id), content)
  422. -- 维护邮件的id
  423. redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, uid), os_time(), id)
  424. redis_pool.execute(hash)
  425. else
  426. mail_rubbish(uid, content)
  427. end
  428. end
  429. -- 取出玩家邮件
  430. local function get_mail(uid, ids)
  431. local temp = {}
  432. local temp_ids = {}
  433. for _, id in pairs(ids) do
  434. table_insert(temp_ids, string_format(MAIL_PRIVATE, id))
  435. end
  436. local redis = redis_pool.get_redis(redis_pool.hashcode(uid))
  437. if #temp_ids > 0 then
  438. temp = redis:mget(table.unpack(temp_ids)) or {}
  439. end
  440. return 0, temp
  441. end
  442. -- 取出玩家邮件
  443. function CMD.get_all_mail(tm, uid)
  444. local key = string_format(MAIL_CONTENT, uid)
  445. local ids = {}
  446. -- ids = redis:lrange(key, 0, -1)
  447. local redis = redis_pool.get_redis(redis_pool.hashcode(uid))
  448. local del_ids = redis:ZRANGEBYSCORE (key, 0, os.time() - retention_tm)
  449. if next(del_ids or {}) then
  450. CMD.del_mail(uid, del_ids)
  451. end
  452. ids = redis:zrevrange(key, 0, -1)
  453. return get_mail(uid, ids)
  454. end
  455. -- 玩家上线的时候,检测全局邮件
  456. local function check_mail(uid, tm)
  457. local mail_num = get_memory_mail_num(uid)
  458. local temp_num = 0
  459. temp_num = MAX_MAIL - mail_num
  460. local hash = redis_pool.hashcode(uid)
  461. local m_u = string_format(MAIL_CONTENT, uid)
  462. for _, v in pairs(global_mail) do
  463. if v.date > tm then
  464. if temp_num <= 0 then
  465. break
  466. end
  467. -- redis_pool.push(hash, "zadd", string_format(MAIL_CONTENT, uid), os_time(), v.id)
  468. -- redis_pool.push(hash, "setex", string_format(MAIL_PRIVATE, v.id), retention_tm, v.content)
  469. -- 解码
  470. local content = cjson_decode(v.content)
  471. local id = "M" .. keygen()
  472. -- 更换id
  473. content.id = id
  474. redis_pool.push(hash, "set", string_format(MAIL_PRIVATE, id), cjson_encode(content))
  475. -- 维护邮件的id
  476. redis_pool.push(hash, "zadd", m_u, tm, id)
  477. set_memory_mail_num(uid, 1)
  478. temp_num = temp_num - 1
  479. end
  480. end
  481. redis_pool.execute(hash)
  482. return
  483. end
  484. -- 玩家上线
  485. function CMD.online(agent, uid, tm)
  486. online_user[uid] = {
  487. agent = agent,
  488. }
  489. local key = string_format(MAIL_CONTENT, uid)
  490. local del_ids = redis:ZRANGEBYSCORE (key, 0, os.time() - retention_tm)
  491. if next(del_ids or {}) then
  492. CMD.del_mail(uid, del_ids)
  493. end
  494. -- 收取玩家的全局邮件
  495. check_mail(uid, tm)
  496. end
  497. -- 玩家下线
  498. function CMD.offline(agent, uid)
  499. -- 删除服务器邮件数据
  500. if online_user[uid].agent == agent then
  501. online_user[uid] = nil
  502. user_mail_num[uid] = nil
  503. end
  504. end
  505. function CMD.mail_num(uid)
  506. return get_memory_mail_num(uid)
  507. end
  508. function CMD.surplus_mail_num(uid)
  509. return MAX_MAIL - get_memory_mail_num(uid)
  510. end
  511. -- 删除邮件
  512. function CMD.del_mail(uid, list)
  513. local mail_s = string_format(MAIL_CONTENT, uid)
  514. local mail_s_s
  515. local del_num = 0
  516. local hash = redis_pool.hashcode(uid)
  517. for _, v in pairs(list) do
  518. mail_s_s = string_format(MAIL_PRIVATE, v)
  519. redis_pool.push(hash, "zrem", mail_s, v)
  520. redis_pool.push(hash, "del", mail_s_s)
  521. del_num = del_num - 1
  522. end
  523. set_memory_mail_num(uid, del_num)
  524. redis_pool.execute(hash)
  525. return 0
  526. end
  527. -- 提取邮件附件
  528. function CMD.get_goods(uid, ids)
  529. local err, list = get_mail(uid, ids)
  530. return err, list
  531. end
  532. local function update_global_mail()
  533. local now_tm = os_time()
  534. local temp = {}
  535. local batch = pipeline(124)
  536. for _, v in pairs(global_mail) do
  537. -- 时间检查, 检查邮件是否过期
  538. if retention_tm and (retention_tm + v.date < now_tm) then
  539. batch.add("lpop", "mailbox:global")
  540. else
  541. temp[v.id] = v
  542. end
  543. end
  544. batch.execute(redis)
  545. global_mail = temp
  546. end
  547. skynet.init(function()
  548. skynet.register(".mailbox")
  549. cjson.encode_sparse_array(true, 1)
  550. mail_pool:send_mail_pool()
  551. skynet.fork(function()
  552. while true do
  553. skynet.sleep(300)
  554. if start_sign then
  555. update_global_mail()
  556. end
  557. end
  558. end)
  559. end)
  560. skynet.start(function()
  561. logger.label("<Mailbox>")
  562. skynet.dispatch("lua", function(session, _, cmd, ...)
  563. local f = assert(CMD[cmd])
  564. if session == 0 then
  565. f(...)
  566. else
  567. skynet_retpack(f(...))
  568. end
  569. end)
  570. end)