lua-cluster.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638
  1. #define LUA_LIB
  2. #include <lua.h>
  3. #include <lauxlib.h>
  4. #include <string.h>
  5. #include <assert.h>
  6. #include <unistd.h>
  7. #include "skynet.h"
  8. /*
  9. uint32_t/string addr
  10. uint32_t/session session
  11. lightuserdata msg
  12. uint32_t sz
  13. return
  14. string request
  15. uint32_t next_session
  16. */
  17. #define TEMP_LENGTH 0x8200
  18. #define MULTI_PART 0x8000
  19. static void
  20. fill_uint32(uint8_t * buf, uint32_t n) {
  21. buf[0] = n & 0xff;
  22. buf[1] = (n >> 8) & 0xff;
  23. buf[2] = (n >> 16) & 0xff;
  24. buf[3] = (n >> 24) & 0xff;
  25. }
  26. static void
  27. fill_header(lua_State *L, uint8_t *buf, int sz) {
  28. assert(sz < 0x10000);
  29. buf[0] = (sz >> 8) & 0xff;
  30. buf[1] = sz & 0xff;
  31. }
  32. /*
  33. The request package :
  34. first WORD is size of the package with big-endian
  35. DWORD in content is small-endian
  36. size <= 0x8000 (32K) and address is id
  37. WORD sz+9
  38. BYTE 0
  39. DWORD addr
  40. DWORD session
  41. PADDING msg(sz)
  42. size > 0x8000 and address is id
  43. WORD 13
  44. BYTE 1 ; multireq , 0x41: multi push
  45. DWORD addr
  46. DWORD session
  47. DWORD sz
  48. size <= 0x8000 (32K) and address is string
  49. WORD sz+6+namelen
  50. BYTE 0x80
  51. BYTE namelen
  52. STRING name
  53. DWORD session
  54. PADDING msg(sz)
  55. size > 0x8000 and address is string
  56. WORD 10 + namelen
  57. BYTE 0x81 ; 0xc1 : multi push
  58. BYTE namelen
  59. STRING name
  60. DWORD session
  61. DWORD sz
  62. multi req
  63. WORD sz + 5
  64. BYTE 2/3 ; 2:multipart, 3:multipart end
  65. DWORD SESSION
  66. PADDING msgpart(sz)
  67. trace
  68. WORD stringsz + 1
  69. BYTE 4
  70. STRING tag
  71. */
  72. static int
  73. packreq_number(lua_State *L, int session, void * msg, uint32_t sz, int is_push) {
  74. uint32_t addr = (uint32_t)lua_tointeger(L,1);
  75. uint8_t buf[TEMP_LENGTH];
  76. if (sz < MULTI_PART) {
  77. fill_header(L, buf, sz+9);
  78. buf[2] = 0;
  79. fill_uint32(buf+3, addr);
  80. fill_uint32(buf+7, is_push ? 0 : (uint32_t)session);
  81. memcpy(buf+11,msg,sz);
  82. lua_pushlstring(L, (const char *)buf, sz+11);
  83. return 0;
  84. } else {
  85. int part = (sz - 1) / MULTI_PART + 1;
  86. fill_header(L, buf, 13);
  87. buf[2] = is_push ? 0x41 : 1; // multi push or request
  88. fill_uint32(buf+3, addr);
  89. fill_uint32(buf+7, (uint32_t)session);
  90. fill_uint32(buf+11, sz);
  91. lua_pushlstring(L, (const char *)buf, 15);
  92. return part;
  93. }
  94. }
  95. static int
  96. packreq_string(lua_State *L, int session, void * msg, uint32_t sz, int is_push) {
  97. size_t namelen = 0;
  98. const char *name = lua_tolstring(L, 1, &namelen);
  99. if (name == NULL || namelen < 1 || namelen > 255) {
  100. skynet_free(msg);
  101. if (name == NULL) {
  102. luaL_error(L, "name is not a string, it's a %s", lua_typename(L, lua_type(L, 1)));
  103. } else {
  104. luaL_error(L, "name is too long %s", name);
  105. }
  106. }
  107. uint8_t buf[TEMP_LENGTH];
  108. if (sz < MULTI_PART) {
  109. fill_header(L, buf, sz+6+namelen);
  110. buf[2] = 0x80;
  111. buf[3] = (uint8_t)namelen;
  112. memcpy(buf+4, name, namelen);
  113. fill_uint32(buf+4+namelen, is_push ? 0 : (uint32_t)session);
  114. memcpy(buf+8+namelen,msg,sz);
  115. lua_pushlstring(L, (const char *)buf, sz+8+namelen);
  116. return 0;
  117. } else {
  118. int part = (sz - 1) / MULTI_PART + 1;
  119. fill_header(L, buf, 10+namelen);
  120. buf[2] = is_push ? 0xc1 : 0x81; // multi push or request
  121. buf[3] = (uint8_t)namelen;
  122. memcpy(buf+4, name, namelen);
  123. fill_uint32(buf+4+namelen, (uint32_t)session);
  124. fill_uint32(buf+8+namelen, sz);
  125. lua_pushlstring(L, (const char *)buf, 12+namelen);
  126. return part;
  127. }
  128. }
  129. static void
  130. packreq_multi(lua_State *L, int session, void * msg, uint32_t sz) {
  131. uint8_t buf[TEMP_LENGTH];
  132. int part = (sz - 1) / MULTI_PART + 1;
  133. int i;
  134. char *ptr = msg;
  135. for (i=0;i<part;i++) {
  136. uint32_t s;
  137. if (sz > MULTI_PART) {
  138. s = MULTI_PART;
  139. buf[2] = 2;
  140. } else {
  141. s = sz;
  142. buf[2] = 3; // the last multi part
  143. }
  144. fill_header(L, buf, s+5);
  145. fill_uint32(buf+3, (uint32_t)session);
  146. memcpy(buf+7, ptr, s);
  147. lua_pushlstring(L, (const char *)buf, s+7);
  148. lua_rawseti(L, -2, i+1);
  149. sz -= s;
  150. ptr += s;
  151. }
  152. }
  153. static int
  154. packrequest(lua_State *L, int is_push) {
  155. void *msg = lua_touserdata(L,3);
  156. if (msg == NULL) {
  157. return luaL_error(L, "Invalid request message");
  158. }
  159. uint32_t sz = (uint32_t)luaL_checkinteger(L,4);
  160. int session = luaL_checkinteger(L,2);
  161. if (session <= 0) {
  162. skynet_free(msg);
  163. return luaL_error(L, "Invalid request session %d", session);
  164. }
  165. int addr_type = lua_type(L,1);
  166. int multipak;
  167. if (addr_type == LUA_TNUMBER) {
  168. multipak = packreq_number(L, session, msg, sz, is_push);
  169. } else {
  170. multipak = packreq_string(L, session, msg, sz, is_push);
  171. }
  172. uint32_t new_session = (uint32_t)session + 1;
  173. if (new_session > INT32_MAX) {
  174. new_session = 1;
  175. }
  176. lua_pushinteger(L, new_session);
  177. if (multipak) {
  178. lua_createtable(L, multipak, 0);
  179. packreq_multi(L, session, msg, sz);
  180. skynet_free(msg);
  181. return 3;
  182. } else {
  183. skynet_free(msg);
  184. return 2;
  185. }
  186. }
  187. static int
  188. lpackrequest(lua_State *L) {
  189. return packrequest(L, 0);
  190. }
  191. static int
  192. lpackpush(lua_State *L) {
  193. return packrequest(L, 1);
  194. }
  195. static int
  196. lpacktrace(lua_State *L) {
  197. size_t sz;
  198. const char * tag = luaL_checklstring(L, 1, &sz);
  199. if (sz > 0x8000) {
  200. return luaL_error(L, "trace tag is too long : %d", (int) sz);
  201. }
  202. uint8_t buf[TEMP_LENGTH];
  203. buf[2] = 4;
  204. fill_header(L, buf, sz+1);
  205. memcpy(buf+3, tag, sz);
  206. lua_pushlstring(L, (const char *)buf, sz+3);
  207. return 1;
  208. }
  209. /*
  210. string packed message
  211. return
  212. uint32_t or string addr
  213. int session
  214. lightuserdata msg
  215. int sz
  216. boolean padding
  217. boolean is_push
  218. */
  219. static inline uint32_t
  220. unpack_uint32(const uint8_t * buf) {
  221. return buf[0] | buf[1]<<8 | buf[2]<<16 | buf[3]<<24;
  222. }
  223. static void
  224. return_buffer(lua_State *L, const char * buffer, int sz) {
  225. void * ptr = skynet_malloc(sz);
  226. memcpy(ptr, buffer, sz);
  227. lua_pushlightuserdata(L, ptr);
  228. lua_pushinteger(L, sz);
  229. }
  230. static int
  231. unpackreq_number(lua_State *L, const uint8_t * buf, int sz) {
  232. if (sz < 9) {
  233. return luaL_error(L, "Invalid cluster message (size=%d)", sz);
  234. }
  235. uint32_t address = unpack_uint32(buf+1);
  236. uint32_t session = unpack_uint32(buf+5);
  237. lua_pushinteger(L, address);
  238. lua_pushinteger(L, session);
  239. return_buffer(L, (const char *)buf+9, sz-9);
  240. if (session == 0) {
  241. lua_pushnil(L);
  242. lua_pushboolean(L,1); // is_push, no reponse
  243. return 6;
  244. }
  245. return 4;
  246. }
  247. static int
  248. unpackmreq_number(lua_State *L, const uint8_t * buf, int sz, int is_push) {
  249. if (sz != 13) {
  250. return luaL_error(L, "Invalid cluster message size %d (multi req must be 13)", sz);
  251. }
  252. uint32_t address = unpack_uint32(buf+1);
  253. uint32_t session = unpack_uint32(buf+5);
  254. uint32_t size = unpack_uint32(buf+9);
  255. lua_pushinteger(L, address);
  256. lua_pushinteger(L, session);
  257. lua_pushnil(L);
  258. lua_pushinteger(L, size);
  259. lua_pushboolean(L, 1); // padding multi part
  260. lua_pushboolean(L, is_push);
  261. return 6;
  262. }
  263. static int
  264. unpackmreq_part(lua_State *L, const uint8_t * buf, int sz) {
  265. if (sz < 5) {
  266. return luaL_error(L, "Invalid cluster multi part message");
  267. }
  268. int padding = (buf[0] == 2);
  269. uint32_t session = unpack_uint32(buf+1);
  270. lua_pushboolean(L, 0); // no address
  271. lua_pushinteger(L, session);
  272. return_buffer(L, (const char *)buf+5, sz-5);
  273. lua_pushboolean(L, padding);
  274. return 5;
  275. }
  276. static int
  277. unpacktrace(lua_State *L, const char * buf, int sz) {
  278. lua_pushlstring(L, buf + 1, sz - 1);
  279. return 1;
  280. }
  281. static int
  282. unpackreq_string(lua_State *L, const uint8_t * buf, int sz) {
  283. if (sz < 2) {
  284. return luaL_error(L, "Invalid cluster message (size=%d)", sz);
  285. }
  286. size_t namesz = buf[1];
  287. if (sz < namesz + 6) {
  288. return luaL_error(L, "Invalid cluster message (size=%d)", sz);
  289. }
  290. lua_pushlstring(L, (const char *)buf+2, namesz);
  291. uint32_t session = unpack_uint32(buf + namesz + 2);
  292. lua_pushinteger(L, (uint32_t)session);
  293. return_buffer(L, (const char *)buf+2+namesz+4, sz - namesz - 6);
  294. if (session == 0) {
  295. lua_pushnil(L);
  296. lua_pushboolean(L,1); // is_push, no reponse
  297. return 6;
  298. }
  299. return 4;
  300. }
  301. static int
  302. unpackmreq_string(lua_State *L, const uint8_t * buf, int sz, int is_push) {
  303. if (sz < 2) {
  304. return luaL_error(L, "Invalid cluster message (size=%d)", sz);
  305. }
  306. size_t namesz = buf[1];
  307. if (sz < namesz + 10) {
  308. return luaL_error(L, "Invalid cluster message (size=%d)", sz);
  309. }
  310. lua_pushlstring(L, (const char *)buf+2, namesz);
  311. uint32_t session = unpack_uint32(buf + namesz + 2);
  312. uint32_t size = unpack_uint32(buf + namesz + 6);
  313. lua_pushinteger(L, session);
  314. lua_pushnil(L);
  315. lua_pushinteger(L, size);
  316. lua_pushboolean(L, 1); // padding multipart
  317. lua_pushboolean(L, is_push);
  318. return 6;
  319. }
  320. static int
  321. lunpackrequest(lua_State *L) {
  322. int sz;
  323. const char *msg;
  324. if (lua_type(L, 1) == LUA_TLIGHTUSERDATA) {
  325. msg = (const char *)lua_touserdata(L, 1);
  326. sz = luaL_checkinteger(L, 2);
  327. } else {
  328. size_t ssz;
  329. msg = luaL_checklstring(L,1,&ssz);
  330. sz = (int)ssz;
  331. }
  332. if (sz == 0)
  333. return luaL_error(L, "Invalid req package. size == 0");
  334. switch (msg[0]) {
  335. case 0:
  336. return unpackreq_number(L, (const uint8_t *)msg, sz);
  337. case 1:
  338. return unpackmreq_number(L, (const uint8_t *)msg, sz, 0); // request
  339. case '\x41':
  340. return unpackmreq_number(L, (const uint8_t *)msg, sz, 1); // push
  341. case 2:
  342. case 3:
  343. return unpackmreq_part(L, (const uint8_t *)msg, sz);
  344. case 4:
  345. return unpacktrace(L, msg, sz);
  346. case '\x80':
  347. return unpackreq_string(L, (const uint8_t *)msg, sz);
  348. case '\x81':
  349. return unpackmreq_string(L, (const uint8_t *)msg, sz, 0 ); // request
  350. case '\xc1':
  351. return unpackmreq_string(L, (const uint8_t *)msg, sz, 1 ); // push
  352. default:
  353. return luaL_error(L, "Invalid req package type %d", msg[0]);
  354. }
  355. }
  356. /*
  357. The response package :
  358. WORD size (big endian)
  359. DWORD session
  360. BYTE type
  361. 0: error
  362. 1: ok
  363. 2: multi begin
  364. 3: multi part
  365. 4: multi end
  366. PADDING msg
  367. type = 0, error msg
  368. type = 1, msg
  369. type = 2, DWORD size
  370. type = 3/4, msg
  371. */
  372. /*
  373. int session
  374. boolean ok
  375. lightuserdata msg
  376. int sz
  377. return string response
  378. */
  379. static int
  380. lpackresponse(lua_State *L) {
  381. uint32_t session = (uint32_t)luaL_checkinteger(L,1);
  382. // clusterd.lua:command.socket call lpackresponse,
  383. // and the msg/sz is return by skynet.rawcall , so don't free(msg)
  384. int ok = lua_toboolean(L,2);
  385. void * msg;
  386. size_t sz;
  387. if (lua_type(L,3) == LUA_TSTRING) {
  388. msg = (void *)lua_tolstring(L, 3, &sz);
  389. } else {
  390. msg = lua_touserdata(L,3);
  391. sz = (size_t)luaL_checkinteger(L, 4);
  392. }
  393. if (!ok) {
  394. if (sz > MULTI_PART) {
  395. // truncate the error msg if too long
  396. sz = MULTI_PART;
  397. }
  398. } else {
  399. if (sz > MULTI_PART) {
  400. // return
  401. int part = (sz - 1) / MULTI_PART + 1;
  402. lua_createtable(L, part+1, 0);
  403. uint8_t buf[TEMP_LENGTH];
  404. // multi part begin
  405. fill_header(L, buf, 9);
  406. fill_uint32(buf+2, session);
  407. buf[6] = 2;
  408. fill_uint32(buf+7, (uint32_t)sz);
  409. lua_pushlstring(L, (const char *)buf, 11);
  410. lua_rawseti(L, -2, 1);
  411. char * ptr = msg;
  412. int i;
  413. for (i=0;i<part;i++) {
  414. int s;
  415. if (sz > MULTI_PART) {
  416. s = MULTI_PART;
  417. buf[6] = 3;
  418. } else {
  419. s = sz;
  420. buf[6] = 4;
  421. }
  422. fill_header(L, buf, s+5);
  423. fill_uint32(buf+2, session);
  424. memcpy(buf+7,ptr,s);
  425. lua_pushlstring(L, (const char *)buf, s+7);
  426. lua_rawseti(L, -2, i+2);
  427. sz -= s;
  428. ptr += s;
  429. }
  430. return 1;
  431. }
  432. }
  433. uint8_t buf[TEMP_LENGTH];
  434. fill_header(L, buf, sz+5);
  435. fill_uint32(buf+2, session);
  436. buf[6] = ok;
  437. memcpy(buf+7,msg,sz);
  438. lua_pushlstring(L, (const char *)buf, sz+7);
  439. return 1;
  440. }
  441. /*
  442. string packed response
  443. return integer session
  444. boolean ok
  445. string msg
  446. boolean padding
  447. */
  448. static int
  449. lunpackresponse(lua_State *L) {
  450. size_t sz;
  451. const char * buf = luaL_checklstring(L, 1, &sz);
  452. if (sz < 5) {
  453. return 0;
  454. }
  455. uint32_t session = unpack_uint32((const uint8_t *)buf);
  456. lua_pushinteger(L, (lua_Integer)session);
  457. switch(buf[4]) {
  458. case 0: // error
  459. lua_pushboolean(L, 0);
  460. lua_pushlstring(L, buf+5, sz-5);
  461. return 3;
  462. case 1: // ok
  463. case 4: // multi end
  464. lua_pushboolean(L, 1);
  465. lua_pushlstring(L, buf+5, sz-5);
  466. return 3;
  467. case 2: // multi begin
  468. if (sz != 9) {
  469. return 0;
  470. }
  471. sz = unpack_uint32((const uint8_t *)buf+5);
  472. lua_pushboolean(L, 1);
  473. lua_pushinteger(L, sz);
  474. lua_pushboolean(L, 1);
  475. return 4;
  476. case 3: // multi part
  477. lua_pushboolean(L, 1);
  478. lua_pushlstring(L, buf+5, sz-5);
  479. lua_pushboolean(L, 1);
  480. return 4;
  481. default:
  482. return 0;
  483. }
  484. }
  485. /*
  486. table
  487. pointer
  488. sz
  489. push (pointer/sz) as string into table, and free pointer
  490. */
  491. static int
  492. lappend(lua_State *L) {
  493. luaL_checktype(L, 1, LUA_TTABLE);
  494. int n = lua_rawlen(L, 1);
  495. if (lua_isnil(L, 2)) {
  496. lua_settop(L, 3);
  497. lua_seti(L, 1, n + 1);
  498. return 0;
  499. }
  500. void * buffer = lua_touserdata(L, 2);
  501. if (buffer == NULL)
  502. return luaL_error(L, "Need lightuserdata");
  503. int sz = luaL_checkinteger(L, 3);
  504. lua_pushlstring(L, (const char *)buffer, sz);
  505. skynet_free((void *)buffer);
  506. lua_seti(L, 1, n+1);
  507. return 0;
  508. }
  509. static int
  510. lconcat(lua_State *L) {
  511. if (!lua_istable(L,1))
  512. return 0;
  513. if (lua_geti(L,1,1) != LUA_TNUMBER)
  514. return 0;
  515. int sz = lua_tointeger(L,-1);
  516. lua_pop(L,1);
  517. char * buff = skynet_malloc(sz);
  518. int idx = 2;
  519. int offset = 0;
  520. while(lua_geti(L,1,idx) == LUA_TSTRING) {
  521. size_t s;
  522. const char * str = lua_tolstring(L, -1, &s);
  523. if (s+offset > sz) {
  524. skynet_free(buff);
  525. return 0;
  526. }
  527. memcpy(buff+offset, str, s);
  528. lua_pop(L,1);
  529. offset += s;
  530. ++idx;
  531. }
  532. if (offset != sz) {
  533. skynet_free(buff);
  534. return 0;
  535. }
  536. // buff/sz will send to other service, See clusterd.lua
  537. lua_pushlightuserdata(L, buff);
  538. lua_pushinteger(L, sz);
  539. return 2;
  540. }
  541. static int
  542. lisname(lua_State *L) {
  543. const char * name = lua_tostring(L, 1);
  544. if (name && name[0] == '@') {
  545. lua_pushboolean(L, 1);
  546. return 1;
  547. }
  548. return 0;
  549. }
  550. static int
  551. lnodename(lua_State *L) {
  552. pid_t pid = getpid();
  553. char hostname[256];
  554. if (gethostname(hostname, sizeof(hostname))==0) {
  555. int i;
  556. for (i=0; hostname[i]; i++) {
  557. if (hostname[i] <= ' ')
  558. hostname[i] = '_';
  559. }
  560. lua_pushfstring(L, "%s%d", hostname, (int)pid);
  561. } else {
  562. lua_pushfstring(L, "noname%d", (int)pid);
  563. }
  564. return 1;
  565. }
  566. LUAMOD_API int
  567. luaopen_skynet_cluster_core(lua_State *L) {
  568. luaL_Reg l[] = {
  569. { "packrequest", lpackrequest },
  570. { "packpush", lpackpush },
  571. { "packtrace", lpacktrace },
  572. { "unpackrequest", lunpackrequest },
  573. { "packresponse", lpackresponse },
  574. { "unpackresponse", lunpackresponse },
  575. { "append", lappend },
  576. { "concat", lconcat },
  577. { "isname", lisname },
  578. { "nodename", lnodename },
  579. { NULL, NULL },
  580. };
  581. luaL_checkversion(L);
  582. luaL_newlib(L,l);
  583. return 1;
  584. }