logslave.lua 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. local skynet = require "skynet"
  2. require "skynet.manager"
  3. local cluster = require "skynet.cluster"
  4. local redisdriver = require "skynet.db.redis"
  5. local logger = require "logger"
  6. local stringify = require "stringify"
  7. local dns = require "skynet.dns"
  8. local httpc = require "http.httpc"
  9. local cjson = require "cjson"
  -- Configure cjson's sparse-array encoding with the arguments (true, 1).
  cjson.encode_sparse_array(true, 1)

  -- Local aliases for the library functions used by the queueing code below.
  local table_insert, table_remove = table.insert, table.remove
  local skynet_retpack, skynet_wakeup, skynet_wait =
    skynet.retpack, skynet.wakeup, skynet.wait

  -- First argument handed to this chunk; formatted into the logger label
  -- when the service starts.
  local index = ...

  -- A page is moved into `cache` as soon as it holds this many commands.
  local MAX_PAGE_SIZE = 256

  -- When true, CMD.record also traces every content string it receives.
  local preview = false

  -- threading: number of workers requested through CMD.start
  -- processed: number of commands the workers have taken through pipelining
  -- received:  number of CMD.record calls
  local threading, processed, received = 0, 0, 0

  -- page:         commands currently being collected (not yet full)
  -- cache:        list of full pages waiting for a worker
  -- wakeup_queue: set keyed by the worker coroutines parked in skynet_wait
  local page, cache, wakeup_queue = {}, {}, {}
  --- Wake at most `num` of the worker coroutines registered in wakeup_queue.
  -- Nothing happens when `num` is zero or negative. Entries are not removed
  -- from wakeup_queue here; each worker clears its own entry after it resumes.
  -- @param num maximum number of coroutines to wake
  local function wakeup(num)
    if num <= 0 then
      return
    end
    local remaining = num
    local worker = next(wakeup_queue)
    while worker ~= nil do
      skynet_wakeup(worker)
      remaining = remaining - 1
      if remaining == 0 then
        return
      end
      worker = next(wakeup_queue, worker)
    end
  end
  --- Queue one command for the workers.
  -- The arguments are packed into a table and appended to the current page.
  -- Once the page holds MAX_PAGE_SIZE commands it is moved to `cache`, a
  -- fresh page is started, and up to #cache workers are woken.
  -- @param ... the command name followed by its arguments
  local function push_back(...)
    local command = { ... }
    page[#page + 1] = command
    if #page ~= MAX_PAGE_SIZE then
      return
    end
    cache[#cache + 1] = page
    page = {}
    wakeup(#cache)
  end
  --- Open one redis connection and fork a worker coroutine that drains the
  -- queued commands through it.
  -- The worker takes full pages from `cache` first, then the partially
  -- filled `page`; when both are empty it registers itself in wakeup_queue
  -- and waits until it is woken.
  -- @param conf table passed unchanged to redisdriver.connect
  local function co_create(conf)
    local self = redisdriver.connect(conf)

    -- Replace `self` with a fresh connection, sleeping 500 skynet ticks
    -- between attempts until connect succeeds.
    local function reconnect()
      -- Release the old connection first so it is not left open. It may
      -- already be broken, so a failure here is ignored on purpose.
      pcall(function()
        self:disconnect()
      end)
      while true do
        logger.warn("Reconnect to redis")
        local ok, red = pcall(redisdriver.connect, conf)
        if ok then
          logger.info("Reconnect success")
          self = red
          return
        end
        skynet.sleep(500)
      end
    end

    -- Send one batch of commands as a single pipeline.
    -- On failure the error is logged, the connection is re-established and
    -- the batch is resent once. A second failure drops the batch (with a
    -- log line) rather than retrying forever, since the error may not be
    -- connection related.
    local function pipelining(ops)
      local ok, err = pcall(self.pipeline, self, ops)
      if not ok then
        logger.warn("Redis pipeline failed: %s", tostring(err))
        reconnect()
        ok, err = pcall(self.pipeline, self, ops)
        if not ok then
          logger.warn("Dropping %d redis commands after retry: %s",
            #ops, tostring(err))
        end
      end
      -- `processed` counts every command taken off the queue, including
      -- the ones dropped above.
      processed = processed + #ops
    end

    skynet.fork(function()
      local co = coroutine.running()
      while true do
        if #cache > 0 then
          -- Oldest full page first.
          local ops = table_remove(cache, 1)
          pipelining(ops)
        elseif #page > 0 then
          -- No full page pending: flush what has been collected so far.
          local ops = page
          page = {}
          pipelining(ops)
        else
          -- Nothing to do: park until wakeup() resumes this coroutine.
          wakeup_queue[co] = true
          skynet_wait()
          wakeup_queue[co] = nil
        end
      end
    end)
  end
  -- Command table: maps the command name received by the dispatcher to
  -- its handler.
  local CMD = {}

  --- Start the workers and the periodic wake-up ticker.
  -- conf.thread (required, must be > 0) is the number of workers to create;
  -- conf.preview optionally enables tracing of recorded content; conf itself
  -- is handed to co_create for every worker. The call returns once the
  -- ticker coroutine has started running.
  -- @param conf configuration table
  function CMD.start(conf)
    assert(conf)
    threading = assert(conf.thread)
    preview = conf.preview or false
    assert(threading > 0)
    for _ = 1, threading do
      co_create(conf)
    end

    local starter = coroutine.running()

    -- Every 100 ticks, wake as many workers as there are batches waiting:
    -- one per cached page, plus one if the current page is non-empty.
    local function ticker()
      -- Let CMD.start continue now that the ticker is running.
      skynet_wakeup(starter)
      while true do
        local wanted = #cache
        if #page > 0 then
          wanted = wanted + 1
        end
        wakeup(wanted)
        skynet.sleep(100)
      end
    end

    skynet.fork(ticker)
    skynet_wait()
    logger.info("%s:%s", conf.host, conf.port)
  end
  --- Queue an `rpush` of `content` onto `key` and count it as received.
  -- When preview is enabled the content is also written to the trace log.
  -- @param key first argument of the queued rpush command
  -- @param content second argument of the queued rpush command
  function CMD.record(key, content)
    push_back('rpush', key, content)
    received = received + 1
    if not preview then
      return
    end
    logger.trace("\n%s", content)
  end
  --- Enable tracing of every recorded content string.
  CMD.logon = function()
    preview = true
  end

  --- Disable tracing of recorded content strings.
  CMD.logoff = function()
    preview = false
  end
  -- Register the info callback: it returns a stringified snapshot of the
  -- service counters.
  skynet.info_func(function()
    -- Workers currently parked are exactly the keys of wakeup_queue.
    local sleeping = 0
    local co = next(wakeup_queue)
    while co ~= nil do
      sleeping = sleeping + 1
      co = next(wakeup_queue, co)
    end

    local report = {
      preview = preview and "true" or "false",
      processed = processed,
      received = received,
      sleeping = sleeping,
      working = threading - sleeping,
      -- Every page in `cache` holds MAX_PAGE_SIZE commands.
      pending = #cache * MAX_PAGE_SIZE + #page,
    }
    return stringify(report)
  end)
  -- Pass a limit of 256 * 1024 * 1024 to skynet.memlimit for this service.
  skynet.memlimit(256 * 1024 * 1024)

  -- Service entry point: label the logger with this slave's index and
  -- route incoming "lua" messages to the matching CMD handler.
  skynet.start(function()
    logger.label(string.format("<Log slave %d>,", index))
    skynet.dispatch("lua", function(session, _, cmd, ...)
      local f = CMD[cmd]
      if not f then
        -- Name the offending command instead of a bare "assertion failed!".
        error(string.format("Unknown command: %s", tostring(cmd)))
      end
      if session == 0 then
        -- Session 0: the handler's result is not sent back.
        f(...)
      else
        skynet_retpack(f(...))
      end
    end)
  end)