mq.lua 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. -- 消息队列服务
  2. local skynet = require "skynet"
  3. require "skynet.manager"
  4. -- require "skynet.queue"
  5. local redisdriver = require "skynet.db.redis"
  6. local logger = require "logger"
  7. local cjson = require "cjson"
  8. -- local util = require "util"
  9. local skynet_call = skynet.call
  10. local skynet_send = skynet.send
  11. local string_format = string.format
  12. local table_unpack = table.unpack
  13. local cjson_encode = cjson.encode
  14. local cjson_decode = cjson.decode
  15. local logger_trace = logger.trace
  16. local logger_warn = logger.warn
  17. -- local hashcode = util.hashcode
  --- Relay a message to an agent through its "publish" lua command.
  -- The call is made with skynet.call under pcall, so an error raised by the
  -- call is reported as a return value instead of propagating.
  -- @param agent address of the target agent service
  -- @param ...   arguments handed to the agent's "publish" command
  -- @return the pcall results: true plus whatever the call returned, or
  --         false plus the error value
  local function forwarding(agent, ...)
    return pcall(skynet_call, agent, "lua", "publish", ...)
  end
  --- Build the Redis key under which the queue for `k` is stored.
  -- @param k identifier to embed in the key (callers pass a player uid)
  -- @return the string "mq:" followed by `k` formatted with %s
  local function keygen(k)
    local redis_key = string_format("mq:%s", k)
    return redis_key
  end
  -- Redis connection handle; nil until CMD.start assigns it.
  local redis

  --- Pop and return the head element of list `k`.
  local function lpop(k)
    return redis:lpop(k)
  end

  --- Push `v` onto the head of list `k`.
  local function lpush(k, v)
    return redis:lpush(k, v)
  end

  --- Append `v` to the tail of list `k`.
  local function rpush(k, v)
    return redis:rpush(k, v)
  end
  --- Drain the stored queue of `uid` into `agent`.
  -- Entries are popped from the head of the list one at a time, JSON-decoded
  -- into an argument array and forwarded to the agent. The loop ends when the
  -- list is empty or when a forward fails; a failed entry is pushed back onto
  -- the head so it is retried first on the next drain.
  --
  -- An entry that cannot be decoded into a table is dropped with a warning
  -- and the drain continues. Without this, a decode error would escape after
  -- the entry had already been popped, aborting the drain (and whatever the
  -- caller intended to do afterwards) while still losing the entry.
  -- @param agent address of the agent that receives the messages
  -- @param uid   player id whose queue is drained
  local dispatch_message = function(agent, uid)
    local key = keygen(uid)
    while true do
      local val = lpop(key)
      if val == nil then
        break
      end
      -- cjson raises on invalid input, so decode under pcall.
      local ok, args = pcall(cjson_decode, val)
      if not ok or type(args) ~= "table" then
        -- The entry is already off the list; re-queueing it would only
        -- block every later message behind it.
        logger_warn("玩家 %s 的离线消息 %s 无法解析，已丢弃", uid, val)
      elseif forwarding(agent, table_unpack(args)) then
        logger_trace("玩家 %s 处理了离线消息 %s", uid, val)
      else
        lpush(key, val)
        logger_warn("玩家 %s 处理离线消息 %s 失败", uid, val)
        break
      end
    end
  end
  -- online_user: uid -> agent for every currently subscribed player.
  -- CMD: command handlers, looked up by command name by the lua dispatcher.
  local online_user, CMD = {}, {}
  --- Mark a player as online and deliver whatever was queued for them.
  -- The stored queue is drained first; only afterwards is the agent recorded
  -- in online_user, so direct forwarding by CMD.publish starts after the
  -- drain has finished.
  -- @param agent first handler argument (the dispatcher passes the message source)
  -- @param uid   player id being subscribed
  CMD.subscribe = function(agent, uid)
    dispatch_message(agent, uid)
    online_user[uid] = agent
  end
  --- Mark a player as offline.
  -- The entry is cleared only when it still refers to `agent`, so a request
  -- from a different agent does not remove the current registration.
  -- @param agent first handler argument (the dispatcher passes the message source)
  -- @param uid   player id being unsubscribed
  CMD.unsubscribe = function(agent, uid)
    if online_user[uid] ~= agent then
      return
    end
    online_user[uid] = nil
  end
  --- Deliver a message to a player, queueing it when delivery is not possible.
  -- If the player has a registered agent and forwarding succeeds, nothing is
  -- stored. Otherwise the arguments are JSON-encoded as an array and appended
  -- to the player's list. online_user is then consulted again: if an agent is
  -- registered at that point, the queue is drained to it straight away.
  -- @param _   first handler argument, unused
  -- @param uid player id the message is addressed to
  -- @param ... message arguments
  function CMD.publish(_, uid, ...)
    local target = online_user[uid]
    if target and forwarding(target, ...) then
      return
    end

    -- Not delivered: persist the message at the tail of the player's list.
    local queue_key = keygen(uid)
    local payload = cjson_encode({...})
    rpush(queue_key, payload)
    logger_trace("玩家 %s 的离线消息 %s 被存盘", uid, payload)

    -- Look the agent up again rather than reusing the earlier value.
    target = online_user[uid]
    if not target then
      return
    end
    dispatch_message(target, uid)
  end
  --- Connect to Redis and bind this service to database 6.
  -- Connection settings are read from the global `option.redis`; the call
  -- fails with an assertion error when that entry is missing.
  --
  -- The database index is passed to the driver as the `db` field of the
  -- connect configuration instead of relying only on a one-off SELECT.
  -- skynet.db.redis applies `auth`/`db` in its login step, which the
  -- underlying socket channel runs on every (re)connect; a SELECT issued by
  -- hand is lost when the connection is re-established.
  function CMD.start()
    local MQ_DB = 6
    local conf = assert(option.redis)

    -- Work on a shallow copy so the shared option table is left untouched.
    local login = {}
    for k, v in pairs(conf) do
      login[k] = v
    end
    login.db = MQ_DB

    redis = redisdriver.connect(login)
    -- Kept for driver versions that ignore `db` in the connect configuration.
    redis:select(MQ_DB)
    logger.info("%s:%s", conf.host, conf.port)
  end
  --- Setup callback handed to skynet.init.
  -- Sets lua-cjson's sparse-array encoding options (convert = true,
  -- ratio = 1) and registers this service under the local name ".mq".
  local function on_init()
    cjson.encode_sparse_array(true, 1)
    skynet.register(".mq")
  end

  skynet.init(on_init)
  -- Service entry point: label the logger and install the "lua" dispatcher.
  skynet.start(function()
    logger.label("<MQ>,")

    -- Every "lua" message is routed to CMD[cmd]; an unknown command fails the
    -- assert. The handler receives the message source as its first argument.
    skynet.dispatch("lua", function(session, source, cmd, ...)
      local handler = assert(CMD[cmd])
      if session ~= 0 then
        -- Non-zero session: pack the handler's results as the reply.
        skynet.retpack(handler(source, ...))
      else
        -- Session 0: run the handler without replying.
        handler(source, ...)
      end
    end)
  end)
  91. end)