123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- local skynet = require "skynet"
- local sc = require "skynet.socketchannel"
- local socket = require "skynet.socket"
- local cluster = require "skynet.cluster.core"
-- Arguments handed to this chunk when it is loaded.
local node, nodename, init_host, init_port = ...

-- Socket channel to the remote node; created inside skynet.start.
local channel
-- Next session id to hand to cluster.packrequest / cluster.packpush.
local session = 1
-- Handlers for the "lua" protocol, looked up by command name.
local command = {}
--- Pack a request for `addr` and send it through the channel.
-- When a trace tag is active, a trace packet is sent ahead of the request.
-- @param addr  target address handed to cluster.packrequest
-- @param msg   message pointer; cluster.packrequest will free it
-- @param sz    message size
-- @return whatever channel:request returns for the packed request
local function send_request(addr, msg, sz)
	-- Keep the session used for this request: the counter is advanced below.
	local request_session = session
	local packed, next_session, padding = cluster.packrequest(addr, session, msg, sz)
	session = next_session

	local tag = skynet.tracetag()
	if tag then
		local already_qualified = tag:sub(1,1) == "("
		if not already_qualified then
			-- Prefix the tag with the node names and the (already advanced) session.
			local qualified = ("(%s-%s-%d)%s"):format(nodename, node, session, tag)
			skynet.tracelog(tag, ("session %s"):format(qualified))
			tag = qualified
		end
		skynet.tracelog(tag, ("cluster %s"):format(node))
		channel:request(cluster.packtrace(tag))
	end

	return channel:request(packed, request_session, padding)
end
--- Handle the "req" command: forward a request and reply with the result.
-- The arguments are passed to send_request unchanged. If send_request
-- raises, the error is logged and the caller is answered with `false`.
function command.req(...)
	local succeeded, reply = pcall(send_request, ...)
	if not succeeded then
		-- On failure `reply` holds the error value raised by send_request.
		skynet.error(reply)
		skynet.response()(false)
		return
	end

	if type(reply) == "table" then
		-- A table reply is joined by cluster.concat before being returned.
		skynet.ret(cluster.concat(reply))
	else
		skynet.ret(reply)
	end
end
--- Handle the "push" command: send a message without waiting for a reply.
-- @param addr  target address handed to cluster.packpush
-- @param msg   message to pack
-- @param sz    message size
function command.push(addr, msg, sz)
	local packed, next_session, padding = cluster.packpush(addr, session, msg, sz)
	-- The session counter only advances when padding is present (multi push).
	if padding then
		session = next_session
	end
	-- A nil session means no response is matched for this request.
	channel:request(packed, nil, padding)
end
--- Response reader installed on the socket channel.
-- Reads a 2-byte header, decodes the body size from it with socket.header,
-- then reads that many bytes and unpacks them.
-- @param sock  channel socket object providing `read`
-- @return session, ok, data, padding (as produced by cluster.unpackresponse)
local function read_response(sock)
	local body_size = socket.header(sock:read(2))
	local payload = sock:read(body_size)
	return cluster.unpackresponse(payload)
end
--- Handle the "changenode" command: retarget or close the channel.
-- @param host  new host; when nil/false the channel is closed instead
-- @param port  new port (converted with tonumber)
function command.changenode(host, port)
	if host then
		-- Point the channel at the new address and connect to it.
		channel:changehost(host, tonumber(port))
		channel:connect(true)
	else
		skynet.error(string.format("Close cluster sender %s:%d", channel.__host, channel.__port))
		channel:close()
	end
	-- The caller always receives a packed nil as the reply.
	skynet.ret(skynet.pack(nil))
end
-- Service entry point: create the socket channel from the initial host/port
-- and route incoming "lua" messages to the matching handler in `command`.
skynet.start(function()
	channel = sc.channel {
		host = init_host,
		port = tonumber(init_port),
		response = read_response,
		nodelay = true,
	}
	-- The first two dispatch arguments (session, source) are unused here;
	-- naming them `_` avoids shadowing the file-level `session` counter.
	skynet.dispatch("lua", function(_, _, cmd, ...)
		local handler = command[cmd]
		if not handler then
			-- Name the offending command instead of a bare "assertion failed!".
			error(string.format("Unknown cluster sender command: %s", tostring(cmd)))
		end
		handler(...)
	end)
end)
|