123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- local skynet = require "skynet"
- require "skynet.manager"
- local cluster = require "skynet.cluster"
- local redisdriver = require "skynet.db.redis"
- local logger = require "logger"
- local stringify = require "stringify"
- local dns = require "skynet.dns"
- local httpc = require "http.httpc"
- local cjson = require "cjson"
- cjson.encode_sparse_array(true, 1)
-- Library functions cached in locals for the hot paths below.
local table_insert, table_remove = table.insert, table.remove
local skynet_retpack, skynet_wakeup, skynet_wait =
    skynet.retpack, skynet.wakeup, skynet.wait

-- First argument passed to this chunk; used in the logger label at startup.
local index = ...

-- Number of queued operations that make up one full page.
local MAX_PAGE_SIZE = 256

-- When true, CMD.record also traces every recorded content.
local preview = false
-- Number of workers requested by CMD.start (conf.thread).
local threading = 0
-- Counters reported by the info function.
local processed, received = 0, 0

-- page:         operations collected so far for the page being filled.
-- cache:        full pages waiting for a worker, oldest first.
-- wakeup_queue: set of worker coroutines currently parked in skynet_wait.
local page, cache, wakeup_queue = {}, {}, {}
--- Wake parked workers.
-- Calls skynet_wakeup on coroutines found in wakeup_queue until `num` of
-- them have been signalled or the set is exhausted. Entries are not removed
-- here; each worker clears its own entry once it resumes.
-- @param num number of workers to wake; values <= 0 do nothing
local function wakeup(num)
    local remaining = num
    if remaining > 0 then
        for worker in pairs(wakeup_queue) do
            skynet_wakeup(worker)
            remaining = remaining - 1
            if remaining == 0 then
                break
            end
        end
    end
end
--- Queue one operation for the workers.
-- The arguments are packed into a table and appended to the current page.
-- Once the page holds MAX_PAGE_SIZE operations it is moved to `cache`, a
-- fresh page is started, and one worker per cached page is woken.
-- @param ... the operation, e.g. a command name followed by its arguments
local function push_back(...)
    page[#page + 1] = { ... }
    if #page ~= MAX_PAGE_SIZE then
        return
    end
    cache[#cache + 1] = page
    page = {}
    wakeup(#cache)
end
--- Open one redis connection and fork a worker coroutine that drains the
-- queued operations through it.
-- The initial connect is not protected: if it raises, the error propagates
-- to the caller.
-- @param conf table handed unchanged to redisdriver.connect
local function co_create(conf)
    local conn = redisdriver.connect(conf)

    -- Replace `conn` with a fresh connection, retrying until one succeeds.
    local function reconnect()
        -- Best-effort release of the failed connection; any error raised
        -- while closing it is irrelevant because it is being replaced.
        pcall(conn.disconnect, conn)
        while true do
            logger.warn("Reconnect to redis")
            local ok, red = pcall(redisdriver.connect, conf)
            if ok then
                logger.info("Reconnect success")
                conn = red
                return
            end
            skynet.sleep(500)
        end
    end

    -- Send one batch of operations as a single pipeline.
    -- On failure the error is logged, the connection is re-established and
    -- the batch is resent once. A second failure drops the batch (logged)
    -- so that a batch redis keeps rejecting cannot stall the worker forever.
    local function pipelining(ops)
        local ok, err = pcall(conn.pipeline, conn, ops)
        if not ok then
            logger.warn("Redis pipeline failed: %s", tostring(err))
            reconnect()
            -- NOTE(review): if the server applied part of the batch before
            -- the failure, this retry can write those entries twice.
            ok, err = pcall(conn.pipeline, conn, ops)
            if not ok then
                logger.warn("Drop %d redis ops after retry: %s",
                    #ops, tostring(err))
            end
        end
        -- Counts every batch taken off the queue, delivered or dropped.
        processed = processed + #ops
    end

    skynet.fork(function()
        local co = coroutine.running()
        while true do
            if #cache > 0 then
                -- Full pages first, oldest one at the head of the list.
                pipelining(table_remove(cache, 1))
            elseif #page > 0 then
                -- No full page queued: take the partially filled page.
                local ops = page
                page = {}
                pipelining(ops)
            else
                -- Nothing to do: park until wakeup() signals this worker.
                wakeup_queue[co] = true
                skynet_wait()
                wakeup_queue[co] = nil
            end
        end
    end)
end
-- Command handlers, looked up by name in the "lua" message dispatcher.
local CMD = {}

--- Start the workers and the periodic wake-up ticker.
-- Reads conf.thread (required, must be > 0) and conf.preview (optional),
-- creates conf.thread workers via co_create, then forks a ticker that wakes
-- one worker per pending batch after every skynet.sleep(100).
-- The call returns only after the ticker coroutine has started running.
-- @param conf table; also passed to co_create, and conf.host / conf.port
--             are logged once startup completes
function CMD.start(conf)
    assert(conf)
    threading = assert(conf.thread)
    preview = conf.preview or false
    assert(threading > 0)

    for _ = 1, threading do
        co_create(conf)
    end

    local starter = coroutine.running()
    local function ticker()
        -- Release CMD.start, which is parked in skynet_wait() below.
        skynet_wakeup(starter)
        local skynet_sleep = skynet.sleep
        while true do
            -- One worker per full page, plus one for a non-empty partial page.
            local batches = #cache + (#page > 0 and 1 or 0)
            wakeup(batches)
            skynet_sleep(100)
        end
    end

    skynet.fork(ticker)
    skynet_wait()
    logger.info("%s:%s", conf.host, conf.port)
end
--- Queue a log entry as an 'rpush' of `content` onto `key`.
-- Increments the `received` counter and, while preview is enabled, also
-- traces the content through the logger.
-- @param key     first argument of the queued rpush
-- @param content second argument of the queued rpush
function CMD.record(key, content)
    push_back('rpush', key, content)
    received = received + 1
    if not preview then
        return
    end
    logger.trace("\n%s", content)
end
--- Enable preview: CMD.record will trace every recorded content.
CMD.logon = function()
    preview = true
end
--- Disable preview: CMD.record stops tracing recorded content.
CMD.logoff = function()
    preview = false
end
-- Status report registered with skynet.info_func.
-- Returns the stringified table of counters:
--   preview   "true"/"false" for the current preview flag
--   processed operations handed to the pipeline so far
--   received  operations accepted by CMD.record so far
--   sleeping  workers currently registered in wakeup_queue
--   working   configured workers minus the sleeping ones
--   pending   operations still queued (full pages plus the current page)
skynet.info_func(function()
    local sleeping = 0
    local co = next(wakeup_queue)
    while co ~= nil do
        sleeping = sleeping + 1
        co = next(wakeup_queue, co)
    end

    local report = {
        preview = preview and "true" or "false",
        processed = processed,
        received = received,
        sleeping = sleeping,
        working = threading - sleeping,
        pending = #cache * MAX_PAGE_SIZE + #page,
    }
    return stringify(report)
end)
-- Passes 256 * 1024 * 1024 to skynet.memlimit before the service starts.
skynet.memlimit(256 * 1024 * 1024)

-- Service entry point: label the logger with this slave's index and route
-- "lua" messages to the matching CMD handler.
skynet.start(function()
    logger.label(string.format("<Log slave %d>,", index))
    skynet.dispatch("lua", function(session, _, cmd, ...)
        -- Unknown commands fail the assertion.
        local handler = assert(CMD[cmd])
        if session ~= 0 then
            -- A reply is packed and returned only for non-zero sessions.
            skynet_retpack(handler(...))
            return
        end
        handler(...)
    end)
end)
|