123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638 |
- #define LUA_LIB
- #include <lua.h>
- #include <lauxlib.h>
- #include <string.h>
- #include <assert.h>
- #include <unistd.h>
- #include "skynet.h"
- /*
- uint32_t/string addr
- uint32_t/session session
- lightuserdata msg
- uint32_t sz
- return
- string request
- uint32_t next_session
- */
- #define TEMP_LENGTH 0x8200
- #define MULTI_PART 0x8000
- static void
- fill_uint32(uint8_t * buf, uint32_t n) {
- buf[0] = n & 0xff;
- buf[1] = (n >> 8) & 0xff;
- buf[2] = (n >> 16) & 0xff;
- buf[3] = (n >> 24) & 0xff;
- }
- static void
- fill_header(lua_State *L, uint8_t *buf, int sz) {
- assert(sz < 0x10000);
- buf[0] = (sz >> 8) & 0xff;
- buf[1] = sz & 0xff;
- }
- /*
- The request package :
- first WORD is size of the package with big-endian
- DWORD in content is small-endian
- size <= 0x8000 (32K) and address is id
- WORD sz+9
- BYTE 0
- DWORD addr
- DWORD session
- PADDING msg(sz)
- size > 0x8000 and address is id
- WORD 13
- BYTE 1 ; multireq , 0x41: multi push
- DWORD addr
- DWORD session
- DWORD sz
- size <= 0x8000 (32K) and address is string
- WORD sz+6+namelen
- BYTE 0x80
- BYTE namelen
- STRING name
- DWORD session
- PADDING msg(sz)
- size > 0x8000 and address is string
- WORD 10 + namelen
- BYTE 0x81 ; 0xc1 : multi push
- BYTE namelen
- STRING name
- DWORD session
- DWORD sz
- multi req
- WORD sz + 5
- BYTE 2/3 ; 2:multipart, 3:multipart end
- DWORD SESSION
- PADDING msgpart(sz)
- trace
- WORD stringsz + 1
- BYTE 4
- STRING tag
- */
- static int
- packreq_number(lua_State *L, int session, void * msg, uint32_t sz, int is_push) {
- uint32_t addr = (uint32_t)lua_tointeger(L,1);
- uint8_t buf[TEMP_LENGTH];
- if (sz < MULTI_PART) {
- fill_header(L, buf, sz+9);
- buf[2] = 0;
- fill_uint32(buf+3, addr);
- fill_uint32(buf+7, is_push ? 0 : (uint32_t)session);
- memcpy(buf+11,msg,sz);
- lua_pushlstring(L, (const char *)buf, sz+11);
- return 0;
- } else {
- int part = (sz - 1) / MULTI_PART + 1;
- fill_header(L, buf, 13);
- buf[2] = is_push ? 0x41 : 1; // multi push or request
- fill_uint32(buf+3, addr);
- fill_uint32(buf+7, (uint32_t)session);
- fill_uint32(buf+11, sz);
- lua_pushlstring(L, (const char *)buf, 15);
- return part;
- }
- }
- static int
- packreq_string(lua_State *L, int session, void * msg, uint32_t sz, int is_push) {
- size_t namelen = 0;
- const char *name = lua_tolstring(L, 1, &namelen);
- if (name == NULL || namelen < 1 || namelen > 255) {
- skynet_free(msg);
- if (name == NULL) {
- luaL_error(L, "name is not a string, it's a %s", lua_typename(L, lua_type(L, 1)));
- } else {
- luaL_error(L, "name is too long %s", name);
- }
- }
- uint8_t buf[TEMP_LENGTH];
- if (sz < MULTI_PART) {
- fill_header(L, buf, sz+6+namelen);
- buf[2] = 0x80;
- buf[3] = (uint8_t)namelen;
- memcpy(buf+4, name, namelen);
- fill_uint32(buf+4+namelen, is_push ? 0 : (uint32_t)session);
- memcpy(buf+8+namelen,msg,sz);
- lua_pushlstring(L, (const char *)buf, sz+8+namelen);
- return 0;
- } else {
- int part = (sz - 1) / MULTI_PART + 1;
- fill_header(L, buf, 10+namelen);
- buf[2] = is_push ? 0xc1 : 0x81; // multi push or request
- buf[3] = (uint8_t)namelen;
- memcpy(buf+4, name, namelen);
- fill_uint32(buf+4+namelen, (uint32_t)session);
- fill_uint32(buf+8+namelen, sz);
- lua_pushlstring(L, (const char *)buf, 12+namelen);
- return part;
- }
- }
- static void
- packreq_multi(lua_State *L, int session, void * msg, uint32_t sz) {
- uint8_t buf[TEMP_LENGTH];
- int part = (sz - 1) / MULTI_PART + 1;
- int i;
- char *ptr = msg;
- for (i=0;i<part;i++) {
- uint32_t s;
- if (sz > MULTI_PART) {
- s = MULTI_PART;
- buf[2] = 2;
- } else {
- s = sz;
- buf[2] = 3; // the last multi part
- }
- fill_header(L, buf, s+5);
- fill_uint32(buf+3, (uint32_t)session);
- memcpy(buf+7, ptr, s);
- lua_pushlstring(L, (const char *)buf, s+7);
- lua_rawseti(L, -2, i+1);
- sz -= s;
- ptr += s;
- }
- }
- static int
- packrequest(lua_State *L, int is_push) {
- void *msg = lua_touserdata(L,3);
- if (msg == NULL) {
- return luaL_error(L, "Invalid request message");
- }
- uint32_t sz = (uint32_t)luaL_checkinteger(L,4);
- int session = luaL_checkinteger(L,2);
- if (session <= 0) {
- skynet_free(msg);
- return luaL_error(L, "Invalid request session %d", session);
- }
- int addr_type = lua_type(L,1);
- int multipak;
- if (addr_type == LUA_TNUMBER) {
- multipak = packreq_number(L, session, msg, sz, is_push);
- } else {
- multipak = packreq_string(L, session, msg, sz, is_push);
- }
- uint32_t new_session = (uint32_t)session + 1;
- if (new_session > INT32_MAX) {
- new_session = 1;
- }
- lua_pushinteger(L, new_session);
- if (multipak) {
- lua_createtable(L, multipak, 0);
- packreq_multi(L, session, msg, sz);
- skynet_free(msg);
- return 3;
- } else {
- skynet_free(msg);
- return 2;
- }
- }
- static int
- lpackrequest(lua_State *L) {
- return packrequest(L, 0);
- }
- static int
- lpackpush(lua_State *L) {
- return packrequest(L, 1);
- }
- static int
- lpacktrace(lua_State *L) {
- size_t sz;
- const char * tag = luaL_checklstring(L, 1, &sz);
- if (sz > 0x8000) {
- return luaL_error(L, "trace tag is too long : %d", (int) sz);
- }
- uint8_t buf[TEMP_LENGTH];
- buf[2] = 4;
- fill_header(L, buf, sz+1);
- memcpy(buf+3, tag, sz);
- lua_pushlstring(L, (const char *)buf, sz+3);
- return 1;
- }
- /*
- string packed message
- return
- uint32_t or string addr
- int session
- lightuserdata msg
- int sz
- boolean padding
- boolean is_push
- */
- static inline uint32_t
- unpack_uint32(const uint8_t * buf) {
- return buf[0] | buf[1]<<8 | buf[2]<<16 | buf[3]<<24;
- }
- static void
- return_buffer(lua_State *L, const char * buffer, int sz) {
- void * ptr = skynet_malloc(sz);
- memcpy(ptr, buffer, sz);
- lua_pushlightuserdata(L, ptr);
- lua_pushinteger(L, sz);
- }
- static int
- unpackreq_number(lua_State *L, const uint8_t * buf, int sz) {
- if (sz < 9) {
- return luaL_error(L, "Invalid cluster message (size=%d)", sz);
- }
- uint32_t address = unpack_uint32(buf+1);
- uint32_t session = unpack_uint32(buf+5);
- lua_pushinteger(L, address);
- lua_pushinteger(L, session);
- return_buffer(L, (const char *)buf+9, sz-9);
- if (session == 0) {
- lua_pushnil(L);
- lua_pushboolean(L,1); // is_push, no reponse
- return 6;
- }
- return 4;
- }
- static int
- unpackmreq_number(lua_State *L, const uint8_t * buf, int sz, int is_push) {
- if (sz != 13) {
- return luaL_error(L, "Invalid cluster message size %d (multi req must be 13)", sz);
- }
- uint32_t address = unpack_uint32(buf+1);
- uint32_t session = unpack_uint32(buf+5);
- uint32_t size = unpack_uint32(buf+9);
- lua_pushinteger(L, address);
- lua_pushinteger(L, session);
- lua_pushnil(L);
- lua_pushinteger(L, size);
- lua_pushboolean(L, 1); // padding multi part
- lua_pushboolean(L, is_push);
- return 6;
- }
- static int
- unpackmreq_part(lua_State *L, const uint8_t * buf, int sz) {
- if (sz < 5) {
- return luaL_error(L, "Invalid cluster multi part message");
- }
- int padding = (buf[0] == 2);
- uint32_t session = unpack_uint32(buf+1);
- lua_pushboolean(L, 0); // no address
- lua_pushinteger(L, session);
- return_buffer(L, (const char *)buf+5, sz-5);
- lua_pushboolean(L, padding);
- return 5;
- }
- static int
- unpacktrace(lua_State *L, const char * buf, int sz) {
- lua_pushlstring(L, buf + 1, sz - 1);
- return 1;
- }
- static int
- unpackreq_string(lua_State *L, const uint8_t * buf, int sz) {
- if (sz < 2) {
- return luaL_error(L, "Invalid cluster message (size=%d)", sz);
- }
- size_t namesz = buf[1];
- if (sz < namesz + 6) {
- return luaL_error(L, "Invalid cluster message (size=%d)", sz);
- }
- lua_pushlstring(L, (const char *)buf+2, namesz);
- uint32_t session = unpack_uint32(buf + namesz + 2);
- lua_pushinteger(L, (uint32_t)session);
- return_buffer(L, (const char *)buf+2+namesz+4, sz - namesz - 6);
- if (session == 0) {
- lua_pushnil(L);
- lua_pushboolean(L,1); // is_push, no reponse
- return 6;
- }
- return 4;
- }
- static int
- unpackmreq_string(lua_State *L, const uint8_t * buf, int sz, int is_push) {
- if (sz < 2) {
- return luaL_error(L, "Invalid cluster message (size=%d)", sz);
- }
- size_t namesz = buf[1];
- if (sz < namesz + 10) {
- return luaL_error(L, "Invalid cluster message (size=%d)", sz);
- }
- lua_pushlstring(L, (const char *)buf+2, namesz);
- uint32_t session = unpack_uint32(buf + namesz + 2);
- uint32_t size = unpack_uint32(buf + namesz + 6);
- lua_pushinteger(L, session);
- lua_pushnil(L);
- lua_pushinteger(L, size);
- lua_pushboolean(L, 1); // padding multipart
- lua_pushboolean(L, is_push);
- return 6;
- }
- static int
- lunpackrequest(lua_State *L) {
- int sz;
- const char *msg;
- if (lua_type(L, 1) == LUA_TLIGHTUSERDATA) {
- msg = (const char *)lua_touserdata(L, 1);
- sz = luaL_checkinteger(L, 2);
- } else {
- size_t ssz;
- msg = luaL_checklstring(L,1,&ssz);
- sz = (int)ssz;
- }
- if (sz == 0)
- return luaL_error(L, "Invalid req package. size == 0");
- switch (msg[0]) {
- case 0:
- return unpackreq_number(L, (const uint8_t *)msg, sz);
- case 1:
- return unpackmreq_number(L, (const uint8_t *)msg, sz, 0); // request
- case '\x41':
- return unpackmreq_number(L, (const uint8_t *)msg, sz, 1); // push
- case 2:
- case 3:
- return unpackmreq_part(L, (const uint8_t *)msg, sz);
- case 4:
- return unpacktrace(L, msg, sz);
- case '\x80':
- return unpackreq_string(L, (const uint8_t *)msg, sz);
- case '\x81':
- return unpackmreq_string(L, (const uint8_t *)msg, sz, 0 ); // request
- case '\xc1':
- return unpackmreq_string(L, (const uint8_t *)msg, sz, 1 ); // push
- default:
- return luaL_error(L, "Invalid req package type %d", msg[0]);
- }
- }
- /*
- The response package :
- WORD size (big endian)
- DWORD session
- BYTE type
- 0: error
- 1: ok
- 2: multi begin
- 3: multi part
- 4: multi end
- PADDING msg
- type = 0, error msg
- type = 1, msg
- type = 2, DWORD size
- type = 3/4, msg
- */
- /*
- int session
- boolean ok
- lightuserdata msg
- int sz
- return string response
- */
- static int
- lpackresponse(lua_State *L) {
- uint32_t session = (uint32_t)luaL_checkinteger(L,1);
- // clusterd.lua:command.socket call lpackresponse,
- // and the msg/sz is return by skynet.rawcall , so don't free(msg)
- int ok = lua_toboolean(L,2);
- void * msg;
- size_t sz;
-
- if (lua_type(L,3) == LUA_TSTRING) {
- msg = (void *)lua_tolstring(L, 3, &sz);
- } else {
- msg = lua_touserdata(L,3);
- sz = (size_t)luaL_checkinteger(L, 4);
- }
- if (!ok) {
- if (sz > MULTI_PART) {
- // truncate the error msg if too long
- sz = MULTI_PART;
- }
- } else {
- if (sz > MULTI_PART) {
- // return
- int part = (sz - 1) / MULTI_PART + 1;
- lua_createtable(L, part+1, 0);
- uint8_t buf[TEMP_LENGTH];
- // multi part begin
- fill_header(L, buf, 9);
- fill_uint32(buf+2, session);
- buf[6] = 2;
- fill_uint32(buf+7, (uint32_t)sz);
- lua_pushlstring(L, (const char *)buf, 11);
- lua_rawseti(L, -2, 1);
- char * ptr = msg;
- int i;
- for (i=0;i<part;i++) {
- int s;
- if (sz > MULTI_PART) {
- s = MULTI_PART;
- buf[6] = 3;
- } else {
- s = sz;
- buf[6] = 4;
- }
- fill_header(L, buf, s+5);
- fill_uint32(buf+2, session);
- memcpy(buf+7,ptr,s);
- lua_pushlstring(L, (const char *)buf, s+7);
- lua_rawseti(L, -2, i+2);
- sz -= s;
- ptr += s;
- }
- return 1;
- }
- }
- uint8_t buf[TEMP_LENGTH];
- fill_header(L, buf, sz+5);
- fill_uint32(buf+2, session);
- buf[6] = ok;
- memcpy(buf+7,msg,sz);
- lua_pushlstring(L, (const char *)buf, sz+7);
- return 1;
- }
- /*
- string packed response
- return integer session
- boolean ok
- string msg
- boolean padding
- */
- static int
- lunpackresponse(lua_State *L) {
- size_t sz;
- const char * buf = luaL_checklstring(L, 1, &sz);
- if (sz < 5) {
- return 0;
- }
- uint32_t session = unpack_uint32((const uint8_t *)buf);
- lua_pushinteger(L, (lua_Integer)session);
- switch(buf[4]) {
- case 0: // error
- lua_pushboolean(L, 0);
- lua_pushlstring(L, buf+5, sz-5);
- return 3;
- case 1: // ok
- case 4: // multi end
- lua_pushboolean(L, 1);
- lua_pushlstring(L, buf+5, sz-5);
- return 3;
- case 2: // multi begin
- if (sz != 9) {
- return 0;
- }
- sz = unpack_uint32((const uint8_t *)buf+5);
- lua_pushboolean(L, 1);
- lua_pushinteger(L, sz);
- lua_pushboolean(L, 1);
- return 4;
- case 3: // multi part
- lua_pushboolean(L, 1);
- lua_pushlstring(L, buf+5, sz-5);
- lua_pushboolean(L, 1);
- return 4;
- default:
- return 0;
- }
- }
- /*
- table
- pointer
- sz
- push (pointer/sz) as string into table, and free pointer
- */
- static int
- lappend(lua_State *L) {
- luaL_checktype(L, 1, LUA_TTABLE);
- int n = lua_rawlen(L, 1);
- if (lua_isnil(L, 2)) {
- lua_settop(L, 3);
- lua_seti(L, 1, n + 1);
- return 0;
- }
- void * buffer = lua_touserdata(L, 2);
- if (buffer == NULL)
- return luaL_error(L, "Need lightuserdata");
- int sz = luaL_checkinteger(L, 3);
- lua_pushlstring(L, (const char *)buffer, sz);
- skynet_free((void *)buffer);
- lua_seti(L, 1, n+1);
- return 0;
- }
- static int
- lconcat(lua_State *L) {
- if (!lua_istable(L,1))
- return 0;
- if (lua_geti(L,1,1) != LUA_TNUMBER)
- return 0;
- int sz = lua_tointeger(L,-1);
- lua_pop(L,1);
- char * buff = skynet_malloc(sz);
- int idx = 2;
- int offset = 0;
- while(lua_geti(L,1,idx) == LUA_TSTRING) {
- size_t s;
- const char * str = lua_tolstring(L, -1, &s);
- if (s+offset > sz) {
- skynet_free(buff);
- return 0;
- }
- memcpy(buff+offset, str, s);
- lua_pop(L,1);
- offset += s;
- ++idx;
- }
- if (offset != sz) {
- skynet_free(buff);
- return 0;
- }
- // buff/sz will send to other service, See clusterd.lua
- lua_pushlightuserdata(L, buff);
- lua_pushinteger(L, sz);
- return 2;
- }
- static int
- lisname(lua_State *L) {
- const char * name = lua_tostring(L, 1);
- if (name && name[0] == '@') {
- lua_pushboolean(L, 1);
- return 1;
- }
- return 0;
- }
- static int
- lnodename(lua_State *L) {
- pid_t pid = getpid();
- char hostname[256];
- if (gethostname(hostname, sizeof(hostname))==0) {
- int i;
- for (i=0; hostname[i]; i++) {
- if (hostname[i] <= ' ')
- hostname[i] = '_';
- }
- lua_pushfstring(L, "%s%d", hostname, (int)pid);
- } else {
- lua_pushfstring(L, "noname%d", (int)pid);
- }
- return 1;
- }
- LUAMOD_API int
- luaopen_skynet_cluster_core(lua_State *L) {
- luaL_Reg l[] = {
- { "packrequest", lpackrequest },
- { "packpush", lpackpush },
- { "packtrace", lpacktrace },
- { "unpackrequest", lunpackrequest },
- { "packresponse", lpackresponse },
- { "unpackresponse", lunpackresponse },
- { "append", lappend },
- { "concat", lconcat },
- { "isname", lisname },
- { "nodename", lnodename },
- { NULL, NULL },
- };
- luaL_checkversion(L);
- luaL_newlib(L,l);
- return 1;
- }
|