local skynet = require "skynet" require "skynet.manager" local cluster = require "skynet.cluster" local redisdriver = require "skynet.db.redis" local logger = require "logger" local stringify = require "stringify" local dns = require "skynet.dns" local httpc = require "http.httpc" local cjson = require "cjson" cjson.encode_sparse_array(true, 1) local table_insert = table.insert local table_remove = table.remove local skynet_retpack = skynet.retpack local skynet_wakeup = skynet.wakeup local skynet_wait = skynet.wait local index = ... local MAX_PAGE_SIZE = 256 local preview = false local threading = 0 local processed = 0 local received = 0 local page = {} local cache = {} local wakeup_queue = {} local function wakeup(num) if num <= 0 then return end for co, _ in pairs(wakeup_queue) do skynet_wakeup(co) num = num - 1 if num == 0 then break end end end local function push_back(...) table_insert(page, {... }) if #page == MAX_PAGE_SIZE then table_insert(cache, page) page = {} wakeup(#cache) end end local function co_create(conf) local self = redisdriver.connect(conf) local function pipelining(ops) local succ = pcall(self.pipeline, self, ops) if not succ then while true do logger.warn("Reconnect to redis") local ok, red = pcall(redisdriver.connect, conf) if ok then logger.info("Reconnect success") self = red break end skynet.sleep(500) end end processed = processed + #ops end skynet.fork(function() local co = coroutine.running() while true do if #cache > 0 then local ops = table_remove(cache, 1) pipelining(ops) elseif #page > 0 then local ops = page page = {} pipelining(ops) else wakeup_queue[co] = true skynet_wait() wakeup_queue[co] = nil end end end) end local CMD = {} function CMD.start(conf) assert(conf) threading = assert(conf.thread) preview = conf.preview or false assert(threading > 0) for i=1, threading do co_create(conf) end local co = coroutine.running() skynet.fork(function() skynet_wakeup(co) local skynet_sleep = skynet.sleep while true do local len = #cache if #page > 0 then len = len + 1 end wakeup(len) skynet_sleep(100) end end) skynet_wait() logger.info("%s:%s", conf.host, conf.port) end function CMD.record(key, content) push_back('rpush', key, content) received = received + 1 if preview then logger.trace("\n%s", content) end end function CMD.logon() preview = true end function CMD.logoff() preview = false end skynet.info_func(function() local sleeping = 0 for _, _ in pairs(wakeup_queue) do sleeping = sleeping + 1 end local working = threading - sleeping local pending = #cache * MAX_PAGE_SIZE + #page return stringify({ preview = preview and "true" or "false", processed = processed, received = received, sleeping = sleeping, working = working, pending = pending }) end) skynet.memlimit(256 * 1024 * 1024) skynet.start(function() logger.label(string.format(",", index)) skynet.dispatch("lua", function(session, _, cmd, ...) local f = assert(CMD[cmd]) if session == 0 then f(...) else skynet_retpack(f(...)) end end) end)