skynet_server.c 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836
  1. #include "skynet.h"
  2. #include "skynet_server.h"
  3. #include "skynet_module.h"
  4. #include "skynet_handle.h"
  5. #include "skynet_mq.h"
  6. #include "skynet_timer.h"
  7. #include "skynet_harbor.h"
  8. #include "skynet_env.h"
  9. #include "skynet_monitor.h"
  10. #include "skynet_imp.h"
  11. #include "skynet_log.h"
  12. #include "spinlock.h"
  13. #include "atomic.h"
  14. #include <pthread.h>
  15. #include <string.h>
  16. #include <assert.h>
  17. #include <stdint.h>
  18. #include <stdio.h>
  19. #include <stdbool.h>
  20. #ifdef CALLING_CHECK
  21. #define CHECKCALLING_BEGIN(ctx) if (!(spinlock_trylock(&ctx->calling))) { assert(0); }
  22. #define CHECKCALLING_END(ctx) spinlock_unlock(&ctx->calling);
  23. #define CHECKCALLING_INIT(ctx) spinlock_init(&ctx->calling);
  24. #define CHECKCALLING_DESTROY(ctx) spinlock_destroy(&ctx->calling);
  25. #define CHECKCALLING_DECL struct spinlock calling;
  26. #else
  27. #define CHECKCALLING_BEGIN(ctx)
  28. #define CHECKCALLING_END(ctx)
  29. #define CHECKCALLING_INIT(ctx)
  30. #define CHECKCALLING_DESTROY(ctx)
  31. #define CHECKCALLING_DECL
  32. #endif
  33. struct skynet_context {
  34. void * instance;
  35. struct skynet_module * mod;
  36. void * cb_ud;
  37. skynet_cb cb;
  38. struct message_queue *queue;
  39. ATOM_POINTER logfile;
  40. uint64_t cpu_cost; // in microsec
  41. uint64_t cpu_start; // in microsec
  42. char result[32];
  43. uint32_t handle;
  44. int session_id;
  45. ATOM_INT ref;
  46. int message_count;
  47. bool init;
  48. bool endless;
  49. bool profile;
  50. CHECKCALLING_DECL
  51. };
  52. struct skynet_node {
  53. ATOM_INT total;
  54. int init;
  55. uint32_t monitor_exit;
  56. pthread_key_t handle_key;
  57. bool profile; // default is on
  58. };
  59. static struct skynet_node G_NODE;
  60. int
  61. skynet_context_total() {
  62. return ATOM_LOAD(&G_NODE.total);
  63. }
  64. static void
  65. context_inc() {
  66. ATOM_FINC(&G_NODE.total);
  67. }
  68. static void
  69. context_dec() {
  70. ATOM_FDEC(&G_NODE.total);
  71. }
  72. uint32_t
  73. skynet_current_handle(void) {
  74. if (G_NODE.init) {
  75. void * handle = pthread_getspecific(G_NODE.handle_key);
  76. return (uint32_t)(uintptr_t)handle;
  77. } else {
  78. uint32_t v = (uint32_t)(-THREAD_MAIN);
  79. return v;
  80. }
  81. }
  82. static void
  83. id_to_hex(char * str, uint32_t id) {
  84. int i;
  85. static char hex[16] = { '0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F' };
  86. str[0] = ':';
  87. for (i=0;i<8;i++) {
  88. str[i+1] = hex[(id >> ((7-i) * 4))&0xf];
  89. }
  90. str[9] = '\0';
  91. }
  92. struct drop_t {
  93. uint32_t handle;
  94. };
  95. static void
  96. drop_message(struct skynet_message *msg, void *ud) {
  97. struct drop_t *d = ud;
  98. skynet_free(msg->data);
  99. uint32_t source = d->handle;
  100. assert(source);
  101. // report error to the message source
  102. skynet_send(NULL, source, msg->source, PTYPE_ERROR, msg->session, NULL, 0);
  103. }
  104. struct skynet_context *
  105. skynet_context_new(const char * name, const char *param) {
  106. struct skynet_module * mod = skynet_module_query(name);
  107. if (mod == NULL)
  108. return NULL;
  109. void *inst = skynet_module_instance_create(mod);
  110. if (inst == NULL)
  111. return NULL;
  112. struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
  113. CHECKCALLING_INIT(ctx)
  114. ctx->mod = mod;
  115. ctx->instance = inst;
  116. ATOM_INIT(&ctx->ref , 2);
  117. ctx->cb = NULL;
  118. ctx->cb_ud = NULL;
  119. ctx->session_id = 0;
  120. ATOM_INIT(&ctx->logfile, (uintptr_t)NULL);
  121. ctx->init = false;
  122. ctx->endless = false;
  123. ctx->cpu_cost = 0;
  124. ctx->cpu_start = 0;
  125. ctx->message_count = 0;
  126. ctx->profile = G_NODE.profile;
  127. // Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
  128. ctx->handle = 0;
  129. ctx->handle = skynet_handle_register(ctx);
  130. struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
  131. // init function maybe use ctx->handle, so it must init at last
  132. context_inc();
  133. CHECKCALLING_BEGIN(ctx)
  134. int r = skynet_module_instance_init(mod, inst, ctx, param);
  135. CHECKCALLING_END(ctx)
  136. if (r == 0) {
  137. struct skynet_context * ret = skynet_context_release(ctx);
  138. if (ret) {
  139. ctx->init = true;
  140. }
  141. skynet_globalmq_push(queue);
  142. if (ret) {
  143. skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
  144. }
  145. return ret;
  146. } else {
  147. skynet_error(ctx, "FAILED launch %s", name);
  148. uint32_t handle = ctx->handle;
  149. skynet_context_release(ctx);
  150. skynet_handle_retire(handle);
  151. struct drop_t d = { handle };
  152. skynet_mq_release(queue, drop_message, &d);
  153. return NULL;
  154. }
  155. }
  156. int
  157. skynet_context_newsession(struct skynet_context *ctx) {
  158. // session always be a positive number
  159. int session = ++ctx->session_id;
  160. if (session <= 0) {
  161. ctx->session_id = 1;
  162. return 1;
  163. }
  164. return session;
  165. }
  166. void
  167. skynet_context_grab(struct skynet_context *ctx) {
  168. ATOM_FINC(&ctx->ref);
  169. }
  170. void
  171. skynet_context_reserve(struct skynet_context *ctx) {
  172. skynet_context_grab(ctx);
  173. // don't count the context reserved, because skynet abort (the worker threads terminate) only when the total context is 0 .
  174. // the reserved context will be release at last.
  175. context_dec();
  176. }
  177. static void
  178. delete_context(struct skynet_context *ctx) {
  179. FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
  180. if (f) {
  181. fclose(f);
  182. }
  183. skynet_module_instance_release(ctx->mod, ctx->instance);
  184. skynet_mq_mark_release(ctx->queue);
  185. CHECKCALLING_DESTROY(ctx)
  186. skynet_free(ctx);
  187. context_dec();
  188. }
  189. struct skynet_context *
  190. skynet_context_release(struct skynet_context *ctx) {
  191. if (ATOM_FDEC(&ctx->ref) == 1) {
  192. delete_context(ctx);
  193. return NULL;
  194. }
  195. return ctx;
  196. }
  197. int
  198. skynet_context_push(uint32_t handle, struct skynet_message *message) {
  199. struct skynet_context * ctx = skynet_handle_grab(handle);
  200. if (ctx == NULL) {
  201. return -1;
  202. }
  203. skynet_mq_push(ctx->queue, message);
  204. skynet_context_release(ctx);
  205. return 0;
  206. }
  207. void
  208. skynet_context_endless(uint32_t handle) {
  209. struct skynet_context * ctx = skynet_handle_grab(handle);
  210. if (ctx == NULL) {
  211. return;
  212. }
  213. ctx->endless = true;
  214. skynet_context_release(ctx);
  215. }
  216. int
  217. skynet_isremote(struct skynet_context * ctx, uint32_t handle, int * harbor) {
  218. int ret = skynet_harbor_message_isremote(handle);
  219. if (harbor) {
  220. *harbor = (int)(handle >> HANDLE_REMOTE_SHIFT);
  221. }
  222. return ret;
  223. }
  224. static void
  225. dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
  226. assert(ctx->init);
  227. CHECKCALLING_BEGIN(ctx)
  228. pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
  229. int type = msg->sz >> MESSAGE_TYPE_SHIFT;
  230. size_t sz = msg->sz & MESSAGE_TYPE_MASK;
  231. FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
  232. if (f) {
  233. skynet_log_output(f, msg->source, type, msg->session, msg->data, sz);
  234. }
  235. ++ctx->message_count;
  236. int reserve_msg;
  237. if (ctx->profile) {
  238. ctx->cpu_start = skynet_thread_time();
  239. reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
  240. uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
  241. ctx->cpu_cost += cost_time;
  242. } else {
  243. reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
  244. }
  245. if (!reserve_msg) {
  246. skynet_free(msg->data);
  247. }
  248. CHECKCALLING_END(ctx)
  249. }
  250. void
  251. skynet_context_dispatchall(struct skynet_context * ctx) {
  252. // for skynet_error
  253. struct skynet_message msg;
  254. struct message_queue *q = ctx->queue;
  255. while (!skynet_mq_pop(q,&msg)) {
  256. dispatch_message(ctx, &msg);
  257. }
  258. }
  259. struct message_queue *
  260. skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
  261. if (q == NULL) {
  262. q = skynet_globalmq_pop();
  263. if (q==NULL)
  264. return NULL;
  265. }
  266. uint32_t handle = skynet_mq_handle(q);
  267. struct skynet_context * ctx = skynet_handle_grab(handle);
  268. if (ctx == NULL) {
  269. struct drop_t d = { handle };
  270. skynet_mq_release(q, drop_message, &d);
  271. return skynet_globalmq_pop();
  272. }
  273. int i,n=1;
  274. struct skynet_message msg;
  275. for (i=0;i<n;i++) {
  276. if (skynet_mq_pop(q,&msg)) {
  277. skynet_context_release(ctx);
  278. return skynet_globalmq_pop();
  279. } else if (i==0 && weight >= 0) {
  280. n = skynet_mq_length(q);
  281. n >>= weight;
  282. }
  283. int overload = skynet_mq_overload(q);
  284. if (overload) {
  285. skynet_error(ctx, "May overload, message queue length = %d", overload);
  286. }
  287. skynet_monitor_trigger(sm, msg.source , handle);
  288. if (ctx->cb == NULL) {
  289. skynet_free(msg.data);
  290. } else {
  291. dispatch_message(ctx, &msg);
  292. }
  293. skynet_monitor_trigger(sm, 0,0);
  294. }
  295. assert(q == ctx->queue);
  296. struct message_queue *nq = skynet_globalmq_pop();
  297. if (nq) {
  298. // If global mq is not empty , push q back, and return next queue (nq)
  299. // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
  300. skynet_globalmq_push(q);
  301. q = nq;
  302. }
  303. skynet_context_release(ctx);
  304. return q;
  305. }
  306. static void
  307. copy_name(char name[GLOBALNAME_LENGTH], const char * addr) {
  308. int i;
  309. for (i=0;i<GLOBALNAME_LENGTH && addr[i];i++) {
  310. name[i] = addr[i];
  311. }
  312. for (;i<GLOBALNAME_LENGTH;i++) {
  313. name[i] = '\0';
  314. }
  315. }
  316. uint32_t
  317. skynet_queryname(struct skynet_context * context, const char * name) {
  318. switch(name[0]) {
  319. case ':':
  320. return strtoul(name+1,NULL,16);
  321. case '.':
  322. return skynet_handle_findname(name + 1);
  323. }
  324. skynet_error(context, "Don't support query global name %s",name);
  325. return 0;
  326. }
  327. static void
  328. handle_exit(struct skynet_context * context, uint32_t handle) {
  329. if (handle == 0) {
  330. handle = context->handle;
  331. skynet_error(context, "KILL self");
  332. } else {
  333. skynet_error(context, "KILL :%0x", handle);
  334. }
  335. if (G_NODE.monitor_exit) {
  336. skynet_send(context, handle, G_NODE.monitor_exit, PTYPE_CLIENT, 0, NULL, 0);
  337. }
  338. skynet_handle_retire(handle);
  339. }
  340. // skynet command
  341. struct command_func {
  342. const char *name;
  343. const char * (*func)(struct skynet_context * context, const char * param);
  344. };
  345. static const char *
  346. cmd_timeout(struct skynet_context * context, const char * param) {
  347. char * session_ptr = NULL;
  348. int ti = strtol(param, &session_ptr, 10);
  349. int session = skynet_context_newsession(context);
  350. skynet_timeout(context->handle, ti, session);
  351. sprintf(context->result, "%d", session);
  352. return context->result;
  353. }
  354. static const char *
  355. cmd_reg(struct skynet_context * context, const char * param) {
  356. if (param == NULL || param[0] == '\0') {
  357. sprintf(context->result, ":%x", context->handle);
  358. return context->result;
  359. } else if (param[0] == '.') {
  360. return skynet_handle_namehandle(context->handle, param + 1);
  361. } else {
  362. skynet_error(context, "Can't register global name %s in C", param);
  363. return NULL;
  364. }
  365. }
  366. static const char *
  367. cmd_query(struct skynet_context * context, const char * param) {
  368. if (param[0] == '.') {
  369. uint32_t handle = skynet_handle_findname(param+1);
  370. if (handle) {
  371. sprintf(context->result, ":%x", handle);
  372. return context->result;
  373. }
  374. }
  375. return NULL;
  376. }
  377. static const char *
  378. cmd_name(struct skynet_context * context, const char * param) {
  379. int size = strlen(param);
  380. char name[size+1];
  381. char handle[size+1];
  382. sscanf(param,"%s %s",name,handle);
  383. if (handle[0] != ':') {
  384. return NULL;
  385. }
  386. uint32_t handle_id = strtoul(handle+1, NULL, 16);
  387. if (handle_id == 0) {
  388. return NULL;
  389. }
  390. if (name[0] == '.') {
  391. return skynet_handle_namehandle(handle_id, name + 1);
  392. } else {
  393. skynet_error(context, "Can't set global name %s in C", name);
  394. }
  395. return NULL;
  396. }
  397. static const char *
  398. cmd_exit(struct skynet_context * context, const char * param) {
  399. handle_exit(context, 0);
  400. return NULL;
  401. }
  402. static uint32_t
  403. tohandle(struct skynet_context * context, const char * param) {
  404. uint32_t handle = 0;
  405. if (param[0] == ':') {
  406. handle = strtoul(param+1, NULL, 16);
  407. } else if (param[0] == '.') {
  408. handle = skynet_handle_findname(param+1);
  409. } else {
  410. skynet_error(context, "Can't convert %s to handle",param);
  411. }
  412. return handle;
  413. }
  414. static const char *
  415. cmd_kill(struct skynet_context * context, const char * param) {
  416. uint32_t handle = tohandle(context, param);
  417. if (handle) {
  418. handle_exit(context, handle);
  419. }
  420. return NULL;
  421. }
  422. static const char *
  423. cmd_launch(struct skynet_context * context, const char * param) {
  424. size_t sz = strlen(param);
  425. char tmp[sz+1];
  426. strcpy(tmp,param);
  427. char * args = tmp;
  428. char * mod = strsep(&args, " \t\r\n");
  429. args = strsep(&args, "\r\n");
  430. struct skynet_context * inst = skynet_context_new(mod,args);
  431. if (inst == NULL) {
  432. return NULL;
  433. } else {
  434. id_to_hex(context->result, inst->handle);
  435. return context->result;
  436. }
  437. }
  438. static const char *
  439. cmd_getenv(struct skynet_context * context, const char * param) {
  440. return skynet_getenv(param);
  441. }
  442. static const char *
  443. cmd_setenv(struct skynet_context * context, const char * param) {
  444. size_t sz = strlen(param);
  445. char key[sz+1];
  446. int i;
  447. for (i=0;param[i] != ' ' && param[i];i++) {
  448. key[i] = param[i];
  449. }
  450. if (param[i] == '\0')
  451. return NULL;
  452. key[i] = '\0';
  453. param += i+1;
  454. skynet_setenv(key,param);
  455. return NULL;
  456. }
  457. static const char *
  458. cmd_starttime(struct skynet_context * context, const char * param) {
  459. uint32_t sec = skynet_starttime();
  460. sprintf(context->result,"%u",sec);
  461. return context->result;
  462. }
  463. static const char *
  464. cmd_abort(struct skynet_context * context, const char * param) {
  465. skynet_handle_retireall();
  466. return NULL;
  467. }
  468. static const char *
  469. cmd_monitor(struct skynet_context * context, const char * param) {
  470. uint32_t handle=0;
  471. if (param == NULL || param[0] == '\0') {
  472. if (G_NODE.monitor_exit) {
  473. // return current monitor serivce
  474. sprintf(context->result, ":%x", G_NODE.monitor_exit);
  475. return context->result;
  476. }
  477. return NULL;
  478. } else {
  479. handle = tohandle(context, param);
  480. }
  481. G_NODE.monitor_exit = handle;
  482. return NULL;
  483. }
  484. static const char *
  485. cmd_stat(struct skynet_context * context, const char * param) {
  486. if (strcmp(param, "mqlen") == 0) {
  487. int len = skynet_mq_length(context->queue);
  488. sprintf(context->result, "%d", len);
  489. } else if (strcmp(param, "endless") == 0) {
  490. if (context->endless) {
  491. strcpy(context->result, "1");
  492. context->endless = false;
  493. } else {
  494. strcpy(context->result, "0");
  495. }
  496. } else if (strcmp(param, "cpu") == 0) {
  497. double t = (double)context->cpu_cost / 1000000.0; // microsec
  498. sprintf(context->result, "%lf", t);
  499. } else if (strcmp(param, "time") == 0) {
  500. if (context->profile) {
  501. uint64_t ti = skynet_thread_time() - context->cpu_start;
  502. double t = (double)ti / 1000000.0; // microsec
  503. sprintf(context->result, "%lf", t);
  504. } else {
  505. strcpy(context->result, "0");
  506. }
  507. } else if (strcmp(param, "message") == 0) {
  508. sprintf(context->result, "%d", context->message_count);
  509. } else {
  510. context->result[0] = '\0';
  511. }
  512. return context->result;
  513. }
  514. static const char *
  515. cmd_logon(struct skynet_context * context, const char * param) {
  516. uint32_t handle = tohandle(context, param);
  517. if (handle == 0)
  518. return NULL;
  519. struct skynet_context * ctx = skynet_handle_grab(handle);
  520. if (ctx == NULL)
  521. return NULL;
  522. FILE *f = NULL;
  523. FILE * lastf = (FILE *)ATOM_LOAD(&ctx->logfile);
  524. if (lastf == NULL) {
  525. f = skynet_log_open(context, handle);
  526. if (f) {
  527. if (!ATOM_CAS_POINTER(&ctx->logfile, 0, (uintptr_t)f)) {
  528. // logfile opens in other thread, close this one.
  529. fclose(f);
  530. }
  531. }
  532. }
  533. skynet_context_release(ctx);
  534. return NULL;
  535. }
  536. static const char *
  537. cmd_logoff(struct skynet_context * context, const char * param) {
  538. uint32_t handle = tohandle(context, param);
  539. if (handle == 0)
  540. return NULL;
  541. struct skynet_context * ctx = skynet_handle_grab(handle);
  542. if (ctx == NULL)
  543. return NULL;
  544. FILE * f = (FILE *)ATOM_LOAD(&ctx->logfile);
  545. if (f) {
  546. // logfile may close in other thread
  547. if (ATOM_CAS_POINTER(&ctx->logfile, (uintptr_t)f, (uintptr_t)NULL)) {
  548. skynet_log_close(context, f, handle);
  549. }
  550. }
  551. skynet_context_release(ctx);
  552. return NULL;
  553. }
  554. static const char *
  555. cmd_signal(struct skynet_context * context, const char * param) {
  556. uint32_t handle = tohandle(context, param);
  557. if (handle == 0)
  558. return NULL;
  559. struct skynet_context * ctx = skynet_handle_grab(handle);
  560. if (ctx == NULL)
  561. return NULL;
  562. param = strchr(param, ' ');
  563. int sig = 0;
  564. if (param) {
  565. sig = strtol(param, NULL, 0);
  566. }
  567. // NOTICE: the signal function should be thread safe.
  568. skynet_module_instance_signal(ctx->mod, ctx->instance, sig);
  569. skynet_context_release(ctx);
  570. return NULL;
  571. }
  572. static struct command_func cmd_funcs[] = {
  573. { "TIMEOUT", cmd_timeout },
  574. { "REG", cmd_reg },
  575. { "QUERY", cmd_query },
  576. { "NAME", cmd_name },
  577. { "EXIT", cmd_exit },
  578. { "KILL", cmd_kill },
  579. { "LAUNCH", cmd_launch },
  580. { "GETENV", cmd_getenv },
  581. { "SETENV", cmd_setenv },
  582. { "STARTTIME", cmd_starttime },
  583. { "ABORT", cmd_abort },
  584. { "MONITOR", cmd_monitor },
  585. { "STAT", cmd_stat },
  586. { "LOGON", cmd_logon },
  587. { "LOGOFF", cmd_logoff },
  588. { "SIGNAL", cmd_signal },
  589. { NULL, NULL },
  590. };
  591. const char *
  592. skynet_command(struct skynet_context * context, const char * cmd , const char * param) {
  593. struct command_func * method = &cmd_funcs[0];
  594. while(method->name) {
  595. if (strcmp(cmd, method->name) == 0) {
  596. return method->func(context, param);
  597. }
  598. ++method;
  599. }
  600. return NULL;
  601. }
  602. static void
  603. _filter_args(struct skynet_context * context, int type, int *session, void ** data, size_t * sz) {
  604. int needcopy = !(type & PTYPE_TAG_DONTCOPY);
  605. int allocsession = type & PTYPE_TAG_ALLOCSESSION;
  606. type &= 0xff;
  607. if (allocsession) {
  608. assert(*session == 0);
  609. *session = skynet_context_newsession(context);
  610. }
  611. if (needcopy && *data) {
  612. char * msg = skynet_malloc(*sz+1);
  613. memcpy(msg, *data, *sz);
  614. msg[*sz] = '\0';
  615. *data = msg;
  616. }
  617. *sz |= (size_t)type << MESSAGE_TYPE_SHIFT;
  618. }
  619. int
  620. skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
  621. if ((sz & MESSAGE_TYPE_MASK) != sz) {
  622. skynet_error(context, "The message to %x is too large", destination);
  623. if (type & PTYPE_TAG_DONTCOPY) {
  624. skynet_free(data);
  625. }
  626. return -2;
  627. }
  628. _filter_args(context, type, &session, (void **)&data, &sz);
  629. if (source == 0) {
  630. source = context->handle;
  631. }
  632. if (destination == 0) {
  633. if (data) {
  634. skynet_error(context, "Destination address can't be 0");
  635. skynet_free(data);
  636. return -1;
  637. }
  638. return session;
  639. }
  640. if (skynet_harbor_message_isremote(destination)) {
  641. struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
  642. rmsg->destination.handle = destination;
  643. rmsg->message = data;
  644. rmsg->sz = sz & MESSAGE_TYPE_MASK;
  645. rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
  646. skynet_harbor_send(rmsg, source, session);
  647. } else {
  648. struct skynet_message smsg;
  649. smsg.source = source;
  650. smsg.session = session;
  651. smsg.data = data;
  652. smsg.sz = sz;
  653. if (skynet_context_push(destination, &smsg)) {
  654. skynet_free(data);
  655. return -1;
  656. }
  657. }
  658. return session;
  659. }
  660. int
  661. skynet_sendname(struct skynet_context * context, uint32_t source, const char * addr , int type, int session, void * data, size_t sz) {
  662. if (source == 0) {
  663. source = context->handle;
  664. }
  665. uint32_t des = 0;
  666. if (addr[0] == ':') {
  667. des = strtoul(addr+1, NULL, 16);
  668. } else if (addr[0] == '.') {
  669. des = skynet_handle_findname(addr + 1);
  670. if (des == 0) {
  671. if (type & PTYPE_TAG_DONTCOPY) {
  672. skynet_free(data);
  673. }
  674. return -1;
  675. }
  676. } else {
  677. if ((sz & MESSAGE_TYPE_MASK) != sz) {
  678. skynet_error(context, "The message to %s is too large", addr);
  679. if (type & PTYPE_TAG_DONTCOPY) {
  680. skynet_free(data);
  681. }
  682. return -2;
  683. }
  684. _filter_args(context, type, &session, (void **)&data, &sz);
  685. struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
  686. copy_name(rmsg->destination.name, addr);
  687. rmsg->destination.handle = 0;
  688. rmsg->message = data;
  689. rmsg->sz = sz & MESSAGE_TYPE_MASK;
  690. rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
  691. skynet_harbor_send(rmsg, source, session);
  692. return session;
  693. }
  694. return skynet_send(context, source, des, type, session, data, sz);
  695. }
  696. uint32_t
  697. skynet_context_handle(struct skynet_context *ctx) {
  698. return ctx->handle;
  699. }
  700. void
  701. skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
  702. context->cb = cb;
  703. context->cb_ud = ud;
  704. }
  705. void
  706. skynet_context_send(struct skynet_context * ctx, void * msg, size_t sz, uint32_t source, int type, int session) {
  707. struct skynet_message smsg;
  708. smsg.source = source;
  709. smsg.session = session;
  710. smsg.data = msg;
  711. smsg.sz = sz | (size_t)type << MESSAGE_TYPE_SHIFT;
  712. skynet_mq_push(ctx->queue, &smsg);
  713. }
  714. void
  715. skynet_globalinit(void) {
  716. ATOM_INIT(&G_NODE.total , 0);
  717. G_NODE.monitor_exit = 0;
  718. G_NODE.init = 1;
  719. if (pthread_key_create(&G_NODE.handle_key, NULL)) {
  720. fprintf(stderr, "pthread_key_create failed");
  721. exit(1);
  722. }
  723. // set mainthread's key
  724. skynet_initthread(THREAD_MAIN);
  725. }
  726. void
  727. skynet_globalexit(void) {
  728. pthread_key_delete(G_NODE.handle_key);
  729. }
  730. void
  731. skynet_initthread(int m) {
  732. uintptr_t v = (uint32_t)(-m);
  733. pthread_setspecific(G_NODE.handle_key, (void *)v);
  734. }
  735. void
  736. skynet_profile_enable(int enable) {
  737. G_NODE.profile = (bool)enable;
  738. }