sharedatad.lua 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. local skynet = require "skynet"
  2. local sharedata = require "skynet.sharedata.corelib"
  3. local table = table
  4. local cache = require "skynet.codecache"
  5. cache.mode "OFF" -- turn off codecache, because CMD.new may load data file
  -- Sentinel returned by command handlers that must not trigger the automatic
  -- reply; the dispatch callback compares each handler result against it.
  local NORET = {}

  -- pool:       name -> { obj = cobj, watch = { pending responses } }
  -- pool_count: name -> { n = watcher count, threshold = prune trigger }
  -- objmap:     cobj -> its pool entry while published, or `true` once the
  --             object has been retired and is waiting to be deleted
  local pool, pool_count, objmap = {}, {}, {}

  -- Countdown used by collectobj: decremented once per sleep cycle, and a
  -- sweep runs (resetting it to 10) when it is found to be <= 0.
  local collect_tick = 10
  --- Build a shared object from `tbl` and publish it under `name`.
  -- Fails an assertion if `name` is already present in `pool`.
  -- @param name key under which the object is registered
  -- @param tbl  value handed to sharedata.host.new
  local function newobj(name, tbl)
    assert(pool[name] == nil)
    local host = sharedata.host
    local cobj = host.new(tbl)
    -- one reference is taken on behalf of the pool entry itself
    host.incref(cobj)
    local entry = {
      obj = cobj,
      watch = {},
    }
    objmap[cobj] = entry
    pool[name] = entry
    -- watcher bookkeeping starts empty with a pruning threshold of 16
    pool_count[name] = { n = 0, threshold = 16 }
  end
  --- Shorten the wait before the next sweep in collectobj.
  -- Caps the countdown at 1; a countdown that is already 1 or lower is left
  -- untouched.
  local function collect1min()
    collect_tick = math.min(collect_tick, 1)
  end
  --- Background loop that deletes retired shared objects.
  -- Never returns. On each wake-up it either counts the tick down or, once
  -- the countdown has reached zero, resets it to 10, forces a Lua GC cycle
  -- and deletes every retired object (objmap value `true`) whose reference
  -- count reported by sharedata.host.getref is no longer positive.
  local function collectobj()
    while true do
      skynet.sleep(60*100) -- sleep 1min
      if collect_tick > 0 then
        collect_tick = collect_tick - 1
      else
        collect_tick = 10 -- reset tick count to 10 min
        collectgarbage()
        -- clearing existing keys while traversing with pairs is permitted
        for cobj, state in pairs(objmap) do
          if state == true and sharedata.host.getref(cobj) <= 0 then
            objmap[cobj] = nil
            sharedata.host.delete(cobj)
          end
        end
      end
    end
  end
  -- Table of command handlers; the "lua" dispatch callback looks handlers up
  -- here by command name.
  local CMD = {}

  -- Metatable given to the scratch environment of data chunks in CMD.new so
  -- that reads of undefined names fall through to this service's _ENV.
  local env_mt = {
    __index = _ENV,
  }
  --- Create and publish a new shared object called `name`.
  -- @param name key for the new object; must not already be registered
  -- @param t    the data source:
  --             * nil    -> an empty table is published
  --             * table  -> published as given
  --             * string -> Lua source text, or "@" followed by a file path;
  --               the chunk is run with a scratch environment and the extra
  --               arguments. If it returns a table, that table is published;
  --               otherwise the scratch environment itself is.
  -- @param ...  arguments forwarded to the chunk when `t` is a string
  -- Raises on load failure, on a chunk runtime error, or on any other type.
  function CMD.new(name, t, ...)
    local dt = type(t)
    local value
    if dt == "nil" then
      value = {}
    elseif dt == "table" then
      value = t
    elseif dt == "string" then
      -- globals assigned by the chunk land in `env`; reads fall back to _ENV
      local env = setmetatable({}, env_mt)
      local chunk
      if string.sub(t, 1, 1) == "@" then
        chunk = assert(loadfile(string.sub(t, 2), "bt", env))
      else
        chunk = assert(load(t, "=" .. name, "bt", env))
      end
      local _, ret = assert(skynet.pcall(chunk, ...))
      -- detach the fallback so the published table holds only its own fields
      setmetatable(env, nil)
      if type(ret) == "table" then
        value = ret
      else
        value = env
      end
    else
      error ("Unknown data type " .. dt)
    end
    newobj(name, value)
  end
  --- Unpublish `name` and retire its shared object.
  -- Fails an assertion if `name` is not registered or its object is unknown
  -- to objmap. Every pending watcher response is invoked with `true`.
  function CMD.delete(name)
    local entry = assert(pool[name])
    pool[name] = nil
    pool_count[name] = nil
    local cobj = entry.obj
    assert(objmap[cobj])
    -- `true` marks the object as retired so collectobj may delete it later
    objmap[cobj] = true
    sharedata.host.decref(cobj)
    for _, respond in pairs(entry.watch) do
      respond(true)
    end
  end
  --- Look up the shared object published under `name`.
  -- Takes an extra reference on the object before returning it; fails an
  -- assertion (with `name` as the message) when the name is not registered.
  -- @return the object stored in the pool entry
  function CMD.query(name)
    local entry = assert(pool[name], name)
    sharedata.host.incref(entry.obj)
    return entry.obj
  end
  --- Drop one reference on `cobj` if it is still tracked in objmap.
  -- Objects that are not tracked are ignored.
  -- @return NORET, so the dispatcher sends no reply
  function CMD.confirm(cobj)
    local tracked = objmap[cobj]
    if tracked then
      sharedata.host.decref(cobj)
    end
    return NORET
  end
  --- Replace the data published under `name`, creating it if absent.
  -- The new object is built first (arguments as for CMD.new). Only after that
  -- succeeds is the previous object retired, marked dirty, and every pending
  -- watcher answered with the new object. If building fails, the previous
  -- registration is put back unchanged and the error is re-raised, so a bad
  -- update cannot destroy the existing data or strand its watchers.
  -- @param name key of the object to replace
  -- @param t    data source, see CMD.new
  -- @param ...  arguments forwarded to CMD.new
  function CMD.update(name, t, ...)
    local old = pool[name]
    local old_count = pool_count[name]

    -- CMD.new asserts (via newobj) that the name is free, so step the current
    -- registration aside while the replacement is being built.
    pool[name] = nil
    pool_count[name] = nil
    local ok, err = pcall(CMD.new, name, t, ...)
    if not ok then
      pool[name] = old
      pool_count[name] = old_count
      error(err, 0) -- level 0: rethrow the original error value untouched
    end

    local newcobj = pool[name].obj
    if old then
      local oldcobj = old.obj
      -- retire the previous object; collectobj deletes it once unreferenced
      objmap[oldcobj] = true
      sharedata.host.decref(oldcobj)
      sharedata.host.markdirty(oldcobj)
      for _, response in pairs(old.watch) do
        -- each watcher is handed its own reference to the new object
        sharedata.host.incref(newcobj)
        response(true, newcobj)
      end
    end
    collect1min() -- collect in 1 min
  end
  --- Prune watcher entries that no longer answer the "TEST" probe.
  -- Each value in `queue` is called with the string "TEST"; entries whose
  -- call returns a falsy value are removed from `queue` in place.
  -- NOTE(review): presumably the probe reports whether the requester is
  -- still waiting for a reply; confirm against skynet.response.
  -- @param queue table of response functions
  -- @return number of entries removed
  local function check_watch(queue)
    local removed = 0
    for key, probe in pairs(queue) do
      local alive = probe("TEST")
      if not alive then
        -- clearing an existing key during a pairs traversal is permitted
        queue[key] = nil
        removed = removed + 1
      end
    end
    return removed
  end
  --- Wait for the object published under `name` to differ from `obj`.
  -- If the published object already differs from `obj`, a reference is taken
  -- on it and it is returned at once. Otherwise the caller's response
  -- function is queued in the entry's watch list and no reply is sent now.
  -- When the watcher count exceeds the threshold, the list is pruned with
  -- check_watch and the threshold becomes twice the remaining count.
  -- @param name registered object name (assertion failure if unknown)
  -- @param obj  the object the caller currently holds
  -- @return the current object, or NORET when the caller was queued
  function CMD.monitor(name, obj)
    local entry = assert(pool[name])
    local current = entry.obj
    if obj ~= current then
      sharedata.host.incref(current)
      return current
    end

    local counter = pool_count[name]
    local total = counter.n + 1
    if total > counter.threshold then
      total = total - check_watch(entry.watch)
      counter.threshold = total * 2
    end
    counter.n = total
    table.insert(entry.watch, skynet.response())
    return NORET
  end
  -- Service entry point: start the background collector and route incoming
  -- "lua" messages to the matching CMD handler. A handler result equal to
  -- NORET suppresses the reply; any other result is packed and sent back.
  skynet.start(function()
    skynet.fork(collectobj)

    local function on_lua_message(_session, _source, cmd, ...)
      local handler = assert(CMD[cmd])
      local result = handler(...)
      if result == NORET then
        return
      end
      skynet.ret(skynet.pack(result))
    end

    skynet.dispatch("lua", on_lua_message)
  end)