-- datacenterd.lua

  local skynet = require "skynet"

  -- Root of the stored data tree; UPDATE writes into it and QUERY reads from it.
  local database = {}
  -- Root of the tree of pending WAIT requests, keyed by the same paths as `database`.
  local wait_queue = {}
  -- Unique table used as a private key that tags each wait_queue node as
  -- either "queue" or "branch".
  local mode = {}
  -- Command handlers, looked up by command name in the message dispatcher.
  local command = {}
  --- Walk a nested table along a sequence of keys.
  -- The walk ends at the first nil key and returns the node reached so far.
  -- A missing intermediate node yields nil rather than raising an
  -- "attempt to index a nil value" error when more keys remain.
  -- @param db   current node (a table, a stored value, or nil)
  -- @param key  next key to follow; nil ends the walk
  -- @param ...  remaining keys of the path
  -- @return the value at the end of the path, or nil if the path does not exist
  local function query(db, key, ...)
    if db == nil or key == nil then
      return db
    end
    return query(db[key], ...)
  end
  --- QUERY command: look up a value in `database` by key path.
  -- @param key  top-level key
  -- @param ...  keys of the nested path below `key`
  -- @return the value found at the path; returns no value at all when the
  --         top-level key is absent
  function command.QUERY(key, ...)
    local root = database[key]
    if root == nil then
      return
    end
    return query(root, ...)
  end
  --- Store a value at a nested key path, creating intermediate tables on demand.
  -- The argument list is read as `db, k1, k2, ..., kn, value`: while extra
  -- arguments remain, `value` is really the next key and the call descends.
  -- @param db     table to write into
  -- @param key    key at this level
  -- @param value  value to store, or the next key when more arguments follow
  -- @param ...    rest of the path followed by the value
  -- @return the previous value at the path, and the newly stored value
  local function update(db, key, value, ...)
    if select("#", ...) > 0 then
      -- Not at the leaf yet: make sure the child exists, then step into it.
      local child = db[key]
      if child == nil then
        child = {}
        db[key] = child
      end
      return update(child, value, ...)
    end
    local previous = db[key]
    db[key] = value
    return previous, value
  end
  --- Detach the waiter queue that matches an update path.
  -- The arguments after `db` are the update's key path followed by its value.
  -- Branch nodes are descended; a queue node is removed from its parent.
  -- When exactly one argument (the value) remains after the queue's key, the
  -- queue is returned so the caller can answer the waiters. Otherwise the
  -- update went deeper than the waited-on key, and every waiter in the queue
  -- is answered with `false`.
  -- @param db    current node of the wait tree
  -- @param key1  key at this level; nil ends the search
  -- @param ...   rest of the path followed by the value
  -- @return the detached queue, or nothing when there is no queue to hand back
  local function wakeup(db, key1, ...)
    if key1 == nil then
      return
    end
    local node = db[key1]
    if node == nil then
      return
    end
    if node[mode] ~= "queue" then
      -- it's a branch: keep following the path
      return wakeup(node, ...)
    end
    db[key1] = nil
    if select("#", ...) == 1 then
      return node
    end
    -- report failure because a branch can't be woken up
    for _, respond in ipairs(node) do
      respond(false)
    end
  end
  --- UPDATE command: store a value at a key path and answer pending waiters.
  -- Waiters are only looked up when the path previously held nil and the new
  -- value is non-nil; in every other case the previous value is returned.
  -- @param ...  key path followed by the value to store
  -- @return the previous value when no wake-up is attempted; nothing otherwise
  function command.UPDATE(...)
    local previous, value = update(database, ...)
    local newly_set = previous == nil and value ~= nil
    if not newly_set then
      return previous
    end
    local waiters = wakeup(wait_queue, ...)
    if not waiters then
      return
    end
    for _, respond in ipairs(waiters) do
      respond(true, value)
    end
  end
  --- Register the current request as a waiter on a key path.
  -- Every key before the last one must map to a "branch" node and the last
  -- key to a "queue" node; missing nodes are created, and an existing node of
  -- the other kind fails the assertion.
  -- @param db    current node of the wait tree
  -- @param key1  key at this level
  -- @param key2  next key, or nil when `key1` is the last key of the path
  -- @param ...   remaining keys
  local function waitfor(db, key1, key2, ...)
    local is_leaf = key2 == nil
    local expected = is_leaf and "queue" or "branch"
    local node = db[key1]
    if node == nil then
      node = { [mode] = expected }
      db[key1] = node
    else
      assert(node[mode] == expected)
    end
    if is_leaf then
      -- push the value returned by skynet.response() onto the queue;
      -- wakeup / UPDATE later call it to answer this request
      table.insert(node, skynet.response())
      return
    end
    return waitfor(node, key2, ...)
  end
  -- Service entry point: install the handler for "lua" messages.
  skynet.start(function()
    -- "WAIT" replies at once when QUERY finds a non-nil value and otherwise
    -- parks the request via waitfor. Any other command name is looked up in
    -- `command` (the assert fails for an unknown name) and its results are
    -- packed and sent back.
    local function on_request(_, _, cmd, ...)
      if cmd ~= "WAIT" then
        local handler = assert(command[cmd])
        skynet.ret(skynet.pack(handler(...)))
        return
      end
      local found = command.QUERY(...)
      if found == nil then
        waitfor(wait_queue, ...)
      else
        skynet.ret(skynet.pack(found))
      end
    end

    skynet.dispatch("lua", on_request)
  end)