lua-stm.c 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. #define LUA_LIB
  2. #include <lua.h>
  3. #include <lauxlib.h>
  4. #include <stdlib.h>
  5. #include <stdint.h>
  6. #include <assert.h>
  7. #include <string.h>
  8. #include "rwlock.h"
  9. #include "skynet_malloc.h"
  10. #include "atomic.h"
  11. struct stm_object {
  12. struct rwlock lock;
  13. ATOM_INT reference;
  14. struct stm_copy * copy;
  15. };
  16. struct stm_copy {
  17. ATOM_INT reference;
  18. uint32_t sz;
  19. void * msg;
  20. };
  21. // msg should alloc by skynet_malloc
  22. static struct stm_copy *
  23. stm_newcopy(void * msg, int32_t sz) {
  24. struct stm_copy * copy = skynet_malloc(sizeof(*copy));
  25. ATOM_INIT(&copy->reference, 1);
  26. copy->sz = sz;
  27. copy->msg = msg;
  28. return copy;
  29. }
  30. static struct stm_object *
  31. stm_new(void * msg, int32_t sz) {
  32. struct stm_object * obj = skynet_malloc(sizeof(*obj));
  33. rwlock_init(&obj->lock);
  34. ATOM_INIT(&obj->reference , 1);
  35. obj->copy = stm_newcopy(msg, sz);
  36. return obj;
  37. }
  38. static void
  39. stm_releasecopy(struct stm_copy *copy) {
  40. if (copy == NULL)
  41. return;
  42. if (ATOM_FDEC(&copy->reference) <= 1) {
  43. skynet_free(copy->msg);
  44. skynet_free(copy);
  45. }
  46. }
  47. static void
  48. stm_release(struct stm_object *obj) {
  49. assert(obj->copy);
  50. rwlock_wlock(&obj->lock);
  51. // writer release the stm object, so release the last copy .
  52. stm_releasecopy(obj->copy);
  53. obj->copy = NULL;
  54. if (ATOM_FDEC(&obj->reference) > 1) {
  55. // stm object grab by readers, reset the copy to NULL.
  56. rwlock_wunlock(&obj->lock);
  57. return;
  58. }
  59. // no one grab the stm object, no need to unlock wlock.
  60. skynet_free(obj);
  61. }
  62. static void
  63. stm_releasereader(struct stm_object *obj) {
  64. rwlock_rlock(&obj->lock);
  65. if (ATOM_FDEC(&obj->reference) == 1) {
  66. // last reader, no writer. so no need to unlock
  67. assert(obj->copy == NULL);
  68. skynet_free(obj);
  69. return;
  70. }
  71. rwlock_runlock(&obj->lock);
  72. }
  73. static void
  74. stm_grab(struct stm_object *obj) {
  75. rwlock_rlock(&obj->lock);
  76. int ref = ATOM_FINC(&obj->reference);
  77. rwlock_runlock(&obj->lock);
  78. assert(ref > 0);
  79. }
  80. static struct stm_copy *
  81. stm_copy(struct stm_object *obj) {
  82. rwlock_rlock(&obj->lock);
  83. struct stm_copy * ret = obj->copy;
  84. if (ret) {
  85. int ref = ATOM_FINC(&ret->reference);
  86. assert(ref > 0);
  87. }
  88. rwlock_runlock(&obj->lock);
  89. return ret;
  90. }
  91. static void
  92. stm_update(struct stm_object *obj, void *msg, int32_t sz) {
  93. struct stm_copy *copy = stm_newcopy(msg, sz);
  94. rwlock_wlock(&obj->lock);
  95. struct stm_copy *oldcopy = obj->copy;
  96. obj->copy = copy;
  97. rwlock_wunlock(&obj->lock);
  98. stm_releasecopy(oldcopy);
  99. }
  // ---- Lua binding ----

  // Payload of the writer userdata: the stm object this writer publishes to.
  // ldeletewriter() resets `obj` to NULL once the object has been released.
  struct boxstm {
      struct stm_object *obj;
  };
  104. static int
  105. lcopy(lua_State *L) {
  106. struct boxstm * box = lua_touserdata(L, 1);
  107. stm_grab(box->obj);
  108. lua_pushlightuserdata(L, box->obj);
  109. return 1;
  110. }
  111. static int
  112. lnewwriter(lua_State *L) {
  113. void * msg;
  114. size_t sz;
  115. if (lua_isuserdata(L,1)) {
  116. msg = lua_touserdata(L, 1);
  117. sz = (size_t)luaL_checkinteger(L, 2);
  118. } else {
  119. const char * tmp = luaL_checklstring(L,1,&sz);
  120. msg = skynet_malloc(sz);
  121. memcpy(msg, tmp, sz);
  122. }
  123. struct boxstm * box = lua_newuserdatauv(L, sizeof(*box), 0);
  124. box->obj = stm_new(msg,sz);
  125. lua_pushvalue(L, lua_upvalueindex(1));
  126. lua_setmetatable(L, -2);
  127. return 1;
  128. }
  129. static int
  130. ldeletewriter(lua_State *L) {
  131. struct boxstm * box = lua_touserdata(L, 1);
  132. stm_release(box->obj);
  133. box->obj = NULL;
  134. return 0;
  135. }
  136. static int
  137. lupdate(lua_State *L) {
  138. struct boxstm * box = lua_touserdata(L, 1);
  139. void * msg;
  140. size_t sz;
  141. if (lua_isuserdata(L, 2)) {
  142. msg = lua_touserdata(L, 2);
  143. sz = (size_t)luaL_checkinteger(L, 3);
  144. } else {
  145. const char * tmp = luaL_checklstring(L,2,&sz);
  146. msg = skynet_malloc(sz);
  147. memcpy(msg, tmp, sz);
  148. }
  149. stm_update(box->obj, msg, sz);
  150. return 0;
  151. }
  // Payload of the reader userdata.
  struct boxreader {
      // Shared stm object this reader polls; cleared by ldeletereader().
      struct stm_object *obj;
      // Snapshot delivered by the most recent successful lread(), kept
      // referenced so the next read can tell whether anything changed.
      struct stm_copy *lastcopy;
  };
  156. static int
  157. lnewreader(lua_State *L) {
  158. struct boxreader * box = lua_newuserdatauv(L, sizeof(*box), 0);
  159. box->obj = lua_touserdata(L, 1);
  160. box->lastcopy = NULL;
  161. lua_pushvalue(L, lua_upvalueindex(1));
  162. lua_setmetatable(L, -2);
  163. return 1;
  164. }
  165. static int
  166. ldeletereader(lua_State *L) {
  167. struct boxreader * box = lua_touserdata(L, 1);
  168. stm_releasereader(box->obj);
  169. box->obj = NULL;
  170. stm_releasecopy(box->lastcopy);
  171. box->lastcopy = NULL;
  172. return 0;
  173. }
  174. static int
  175. lread(lua_State *L) {
  176. struct boxreader * box = lua_touserdata(L, 1);
  177. luaL_checktype(L, 2, LUA_TFUNCTION);
  178. struct stm_copy * copy = stm_copy(box->obj);
  179. if (copy == box->lastcopy) {
  180. // not update
  181. stm_releasecopy(copy);
  182. lua_pushboolean(L, 0);
  183. return 1;
  184. }
  185. stm_releasecopy(box->lastcopy);
  186. box->lastcopy = copy;
  187. if (copy) {
  188. lua_settop(L, 3);
  189. lua_replace(L, 1);
  190. lua_settop(L, 2);
  191. lua_pushlightuserdata(L, copy->msg);
  192. lua_pushinteger(L, copy->sz);
  193. lua_pushvalue(L, 1);
  194. lua_call(L, 3, LUA_MULTRET);
  195. lua_pushboolean(L, 1);
  196. lua_replace(L, 1);
  197. return lua_gettop(L);
  198. } else {
  199. lua_pushboolean(L, 0);
  200. return 1;
  201. }
  202. }
  203. LUAMOD_API int
  204. luaopen_skynet_stm(lua_State *L) {
  205. luaL_checkversion(L);
  206. lua_createtable(L, 0, 3);
  207. lua_pushcfunction(L, lcopy);
  208. lua_setfield(L, -2, "copy");
  209. luaL_Reg writer[] = {
  210. { "new", lnewwriter },
  211. { NULL, NULL },
  212. };
  213. lua_createtable(L, 0, 2);
  214. lua_pushcfunction(L, ldeletewriter),
  215. lua_setfield(L, -2, "__gc");
  216. lua_pushcfunction(L, lupdate),
  217. lua_setfield(L, -2, "__call");
  218. luaL_setfuncs(L, writer, 1);
  219. luaL_Reg reader[] = {
  220. { "newcopy", lnewreader },
  221. { NULL, NULL },
  222. };
  223. lua_createtable(L, 0, 2);
  224. lua_pushcfunction(L, ldeletereader),
  225. lua_setfield(L, -2, "__gc");
  226. lua_pushcfunction(L, lread),
  227. lua_setfield(L, -2, "__call");
  228. luaL_setfuncs(L, reader, 1);
  229. return 1;
  230. }