123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126 |
- local skynet = require "skynet"
- local socket = require "skynet.socket"
- --[[
- master manage data :
- 1. all the slaves address : id -> ipaddr:port
- 2. all the global names : name -> address
- master hold connections from slaves .
- protocol slave->master :
- package size 1 byte
- type 1 byte :
- 'H' : HANDSHAKE, report slave id, and address.
- 'R' : REGISTER name address
- 'Q' : QUERY name
- protocol master->slave:
- package size 1 byte
- type 1 byte :
- 'W' : WAIT n
- 'C' : CONNECT slave_id slave_address
- 'N' : NAME globalname address
- 'D' : DISCONNECT slave_id
- ]]
- local slave_node = {}
- local global_name = {}
- local function read_package(fd)
- local sz = socket.read(fd, 1)
- assert(sz, "closed")
- sz = string.byte(sz)
- local content = assert(socket.read(fd, sz), "closed")
- return skynet.unpack(content)
- end
- local function pack_package(...)
- local message = skynet.packstring(...)
- local size = #message
- assert(size <= 255 , "too long")
- return string.char(size) .. message
- end
- local function report_slave(fd, slave_id, slave_addr)
- local message = pack_package("C", slave_id, slave_addr)
- local n = 0
- for k,v in pairs(slave_node) do
- if v.fd ~= 0 then
- socket.write(v.fd, message)
- n = n + 1
- end
- end
- socket.write(fd, pack_package("W", n))
- end
- local function handshake(fd)
- local t, slave_id, slave_addr = read_package(fd)
- assert(t=='H', "Invalid handshake type " .. t)
- assert(slave_id ~= 0 , "Invalid slave id 0")
- if slave_node[slave_id] then
- error(string.format("Slave %d already register on %s", slave_id, slave_node[slave_id].addr))
- end
- report_slave(fd, slave_id, slave_addr)
- slave_node[slave_id] = {
- fd = fd,
- id = slave_id,
- addr = slave_addr,
- }
- return slave_id , slave_addr
- end
- local function dispatch_slave(fd)
- local t, name, address = read_package(fd)
- if t == 'R' then
- -- register name
- assert(type(address)=="number", "Invalid request")
- if not global_name[name] then
- global_name[name] = address
- end
- local message = pack_package("N", name, address)
- for k,v in pairs(slave_node) do
- socket.write(v.fd, message)
- end
- elseif t == 'Q' then
- -- query name
- local address = global_name[name]
- if address then
- socket.write(fd, pack_package("N", name, address))
- end
- else
- skynet.error("Invalid slave message type " .. t)
- end
- end
- local function monitor_slave(slave_id, slave_address)
- local fd = slave_node[slave_id].fd
- skynet.error(string.format("Harbor %d (fd=%d) report %s", slave_id, fd, slave_address))
- while pcall(dispatch_slave, fd) do end
- skynet.error("slave " ..slave_id .. " is down")
- local message = pack_package("D", slave_id)
- slave_node[slave_id].fd = 0
- for k,v in pairs(slave_node) do
- socket.write(v.fd, message)
- end
- socket.close(fd)
- end
- skynet.start(function()
- local master_addr = skynet.getenv "standalone"
- skynet.error("master listen socket " .. tostring(master_addr))
- local fd = socket.listen(master_addr)
- socket.start(fd , function(id, addr)
- skynet.error("connect from " .. addr .. " " .. id)
- socket.start(id)
- local ok, slave, slave_addr = pcall(handshake, id)
- if ok then
- skynet.fork(monitor_slave, slave, slave_addr)
- else
- skynet.error(string.format("disconnect fd = %d, error = %s", id, slave))
- socket.close(id)
- end
- end)
- end)
|