|
- #include "skynet.h"
- #include "skynet_server.h"
- #include "skynet_module.h"
- #include "skynet_handle.h"
- #include "skynet_mq.h"
- #include "skynet_timer.h"
- #include "skynet_harbor.h"
- #include "skynet_env.h"
- #include "skynet_monitor.h"
- #include "skynet_imp.h"
- #include "skynet_log.h"
- #include "spinlock.h"
- #include "atomic.h"
- #include <pthread.h>
- #include <string.h>
- #include <assert.h>
- #include <stdint.h>
- #include <stdio.h>
- #include <stdbool.h>
// Optional re-entrancy check. With CALLING_CHECK defined every context
// carries a spinlock that is try-locked around module init and message
// dispatch; failing to take it trips assert(0). Without CALLING_CHECK all
// of these macros expand to nothing.
// NOTE: call sites use the macros as bare statements with no trailing ';',
// so each expansion has to be a complete statement (or empty) by itself.
#ifdef CALLING_CHECK

#define CHECKCALLING_BEGIN(ctx) if (!spinlock_trylock(&(ctx)->calling)) { assert(0); }
#define CHECKCALLING_END(ctx) spinlock_unlock(&(ctx)->calling);
#define CHECKCALLING_INIT(ctx) spinlock_init(&(ctx)->calling);
#define CHECKCALLING_DESTROY(ctx) spinlock_destroy(&(ctx)->calling);
#define CHECKCALLING_DECL struct spinlock calling;

#else

#define CHECKCALLING_BEGIN(ctx)
#define CHECKCALLING_END(ctx)
#define CHECKCALLING_INIT(ctx)
#define CHECKCALLING_DESTROY(ctx)
#define CHECKCALLING_DECL

#endif
- struct skynet_context {
- void * instance;
- struct skynet_module * mod;
- void * cb_ud;
- skynet_cb cb;
- struct message_queue *queue;
- ATOM_POINTER logfile;
- uint64_t cpu_cost; // in microsec
- uint64_t cpu_start; // in microsec
- char result[32];
- uint32_t handle;
- int session_id;
- ATOM_INT ref;
- int message_count;
- bool init;
- bool endless;
- bool profile;
- CHECKCALLING_DECL
- };
- struct skynet_node {
- ATOM_INT total;
- int init;
- uint32_t monitor_exit;
- pthread_key_t handle_key;
- bool profile; // default is on
- };
- static struct skynet_node G_NODE;
- int
- skynet_context_total() {
- return ATOM_LOAD(&G_NODE.total);
- }
- static void
- context_inc() {
- ATOM_FINC(&G_NODE.total);
- }
- static void
- context_dec() {
- ATOM_FDEC(&G_NODE.total);
- }
- uint32_t
- skynet_current_handle(void) {
- if (G_NODE.init) {
- void * handle = pthread_getspecific(G_NODE.handle_key);
- return (uint32_t)(uintptr_t)handle;
- } else {
- uint32_t v = (uint32_t)(-THREAD_MAIN);
- return v;
- }
- }
// Formats id as ':' followed by exactly eight upper-case hex digits and a
// terminating NUL. str must have room for 10 bytes.
static void
id_to_hex(char * str, uint32_t id) {
	static const char digits[] = "0123456789ABCDEF";
	char * out = str;
	int shift;
	*out++ = ':';
	// most significant nibble first
	for (shift = 28; shift >= 0; shift -= 4) {
		*out++ = digits[(id >> shift) & 0xf];
	}
	*out = '\0';
}
- struct drop_t {
- uint32_t handle;
- };
- static void
- drop_message(struct skynet_message *msg, void *ud) {
- struct drop_t *d = ud;
- skynet_free(msg->data);
- uint32_t source = d->handle;
- assert(source);
- // report error to the message source
- skynet_send(NULL, source, msg->source, PTYPE_ERROR, msg->session, NULL, 0);
- }
- struct skynet_context *
- skynet_context_new(const char * name, const char *param) {
- struct skynet_module * mod = skynet_module_query(name);
- if (mod == NULL)
- return NULL;
- void *inst = skynet_module_instance_create(mod);
- if (inst == NULL)
- return NULL;
- struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
- CHECKCALLING_INIT(ctx)
- ctx->mod = mod;
- ctx->instance = inst;
- ATOM_INIT(&ctx->ref , 2);
- ctx->cb = NULL;
- ctx->cb_ud = NULL;
- ctx->session_id = 0;
- ATOM_INIT(&ctx->logfile, (uintptr_t)NULL);
- ctx->init = false;
- ctx->endless = false;
- ctx->cpu_cost = 0;
- ctx->cpu_start = 0;
- ctx->message_count = 0;
- ctx->profile = G_NODE.profile;
- // Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
- ctx->handle = 0;
- ctx->handle = skynet_handle_register(ctx);
- struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
- // init function maybe use ctx->handle, so it must init at last
- context_inc();
- CHECKCALLING_BEGIN(ctx)
- int r = skynet_module_instance_init(mod, inst, ctx, param);
- CHECKCALLING_END(ctx)
- if (r == 0) {
- struct skynet_context * ret = skynet_context_release(ctx);
- if (ret) {
- ctx->init = true;
- }
- skynet_globalmq_push(queue);
- if (ret) {
- skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
- }
- return ret;
- } else {
- skynet_error(ctx, "FAILED launch %s", name);
- uint32_t handle = ctx->handle;
- skynet_context_release(ctx);
- skynet_handle_retire(handle);
- struct drop_t d = { handle };
- skynet_mq_release(queue, drop_message, &d);
- return NULL;
- }
- }
- int
- skynet_context_newsession(struct skynet_context *ctx) {
- // session always be a positive number
- int session = ++ctx->session_id;
- if (session <= 0) {
- ctx->session_id = 1;
- return 1;
- }
- return session;
- }
- void
- skynet_context_grab(struct skynet_context *ctx) {
- ATOM_FINC(&ctx->ref);
- }
// Takes a reference on ctx and removes it from the live-context count.
// Reserved contexts are not counted because skynet aborts (the worker
// threads terminate) only when the total reaches 0; a reserved context is
// released last.
void
skynet_context_reserve(struct skynet_context *ctx) {
	skynet_context_grab(ctx);
	context_dec();
}
- static void
- delete_context(struct skynet_context *ctx) {
- FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
- if (f) {
- fclose(f);
- }
- skynet_module_instance_release(ctx->mod, ctx->instance);
- skynet_mq_mark_release(ctx->queue);
- CHECKCALLING_DESTROY(ctx)
- skynet_free(ctx);
- context_dec();
- }
- struct skynet_context *
- skynet_context_release(struct skynet_context *ctx) {
- if (ATOM_FDEC(&ctx->ref) == 1) {
- delete_context(ctx);
- return NULL;
- }
- return ctx;
- }
- int
- skynet_context_push(uint32_t handle, struct skynet_message *message) {
- struct skynet_context * ctx = skynet_handle_grab(handle);
- if (ctx == NULL) {
- return -1;
- }
- skynet_mq_push(ctx->queue, message);
- skynet_context_release(ctx);
- return 0;
- }
- void
- skynet_context_endless(uint32_t handle) {
- struct skynet_context * ctx = skynet_handle_grab(handle);
- if (ctx == NULL) {
- return;
- }
- ctx->endless = true;
- skynet_context_release(ctx);
- }
- int
- skynet_isremote(struct skynet_context * ctx, uint32_t handle, int * harbor) {
- int ret = skynet_harbor_message_isremote(handle);
- if (harbor) {
- *harbor = (int)(handle >> HANDLE_REMOTE_SHIFT);
- }
- return ret;
- }
- static void
- dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
- assert(ctx->init);
- CHECKCALLING_BEGIN(ctx)
- pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
- int type = msg->sz >> MESSAGE_TYPE_SHIFT;
- size_t sz = msg->sz & MESSAGE_TYPE_MASK;
- FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
- if (f) {
- skynet_log_output(f, msg->source, type, msg->session, msg->data, sz);
- }
- ++ctx->message_count;
- int reserve_msg;
- if (ctx->profile) {
- ctx->cpu_start = skynet_thread_time();
- reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
- uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
- ctx->cpu_cost += cost_time;
- } else {
- reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
- }
- if (!reserve_msg) {
- skynet_free(msg->data);
- }
- CHECKCALLING_END(ctx)
- }
- void
- skynet_context_dispatchall(struct skynet_context * ctx) {
- // for skynet_error
- struct skynet_message msg;
- struct message_queue *q = ctx->queue;
- while (!skynet_mq_pop(q,&msg)) {
- dispatch_message(ctx, &msg);
- }
- }
- struct message_queue *
- skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
- if (q == NULL) {
- q = skynet_globalmq_pop();
- if (q==NULL)
- return NULL;
- }
- uint32_t handle = skynet_mq_handle(q);
- struct skynet_context * ctx = skynet_handle_grab(handle);
- if (ctx == NULL) {
- struct drop_t d = { handle };
- skynet_mq_release(q, drop_message, &d);
- return skynet_globalmq_pop();
- }
- int i,n=1;
- struct skynet_message msg;
- for (i=0;i<n;i++) {
- if (skynet_mq_pop(q,&msg)) {
- skynet_context_release(ctx);
- return skynet_globalmq_pop();
- } else if (i==0 && weight >= 0) {
- n = skynet_mq_length(q);
- n >>= weight;
- }
- int overload = skynet_mq_overload(q);
- if (overload) {
- skynet_error(ctx, "May overload, message queue length = %d", overload);
- }
- skynet_monitor_trigger(sm, msg.source , handle);
- if (ctx->cb == NULL) {
- skynet_free(msg.data);
- } else {
- dispatch_message(ctx, &msg);
- }
- skynet_monitor_trigger(sm, 0,0);
- }
- assert(q == ctx->queue);
- struct message_queue *nq = skynet_globalmq_pop();
- if (nq) {
- // If global mq is not empty , push q back, and return next queue (nq)
- // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
- skynet_globalmq_push(q);
- q = nq;
- }
- skynet_context_release(ctx);
- return q;
- }
- static void
- copy_name(char name[GLOBALNAME_LENGTH], const char * addr) {
- int i;
- for (i=0;i<GLOBALNAME_LENGTH && addr[i];i++) {
- name[i] = addr[i];
- }
- for (;i<GLOBALNAME_LENGTH;i++) {
- name[i] = '\0';
- }
- }
// Resolves a textual address to a handle:
//   ":hex"  -> the number parsed as hexadecimal
//   ".name" -> result of skynet_handle_findname(name)
// Anything else is logged as an unsupported global name and yields 0.
uint32_t
skynet_queryname(struct skynet_context * context, const char * name) {
	const char prefix = name[0];
	if (prefix == ':') {
		return strtoul(name+1,NULL,16);
	}
	if (prefix == '.') {
		return skynet_handle_findname(name + 1);
	}
	skynet_error(context, "Don't support query global name %s",name);
	return 0;
}
- static void
- handle_exit(struct skynet_context * context, uint32_t handle) {
- if (handle == 0) {
- handle = context->handle;
- skynet_error(context, "KILL self");
- } else {
- skynet_error(context, "KILL :%0x", handle);
- }
- if (G_NODE.monitor_exit) {
- skynet_send(context, handle, G_NODE.monitor_exit, PTYPE_CLIENT, 0, NULL, 0);
- }
- skynet_handle_retire(handle);
- }
// skynet command

// One entry of the text command table: the command name and the function
// implementing it. The function gets the calling context and the raw
// parameter string and returns the result string, or NULL for none.
struct command_func {
	const char *name;
	const char * (*func)(struct skynet_context * context, const char * param);
};
- static const char *
- cmd_timeout(struct skynet_context * context, const char * param) {
- char * session_ptr = NULL;
- int ti = strtol(param, &session_ptr, 10);
- int session = skynet_context_newsession(context);
- skynet_timeout(context->handle, ti, session);
- sprintf(context->result, "%d", session);
- return context->result;
- }
- static const char *
- cmd_reg(struct skynet_context * context, const char * param) {
- if (param == NULL || param[0] == '\0') {
- sprintf(context->result, ":%x", context->handle);
- return context->result;
- } else if (param[0] == '.') {
- return skynet_handle_namehandle(context->handle, param + 1);
- } else {
- skynet_error(context, "Can't register global name %s in C", param);
- return NULL;
- }
- }
- static const char *
- cmd_query(struct skynet_context * context, const char * param) {
- if (param[0] == '.') {
- uint32_t handle = skynet_handle_findname(param+1);
- if (handle) {
- sprintf(context->result, ":%x", handle);
- return context->result;
- }
- }
- return NULL;
- }
// NAME .name :handle: binds a local name to an arbitrary handle.
// Returns the result of skynet_handle_namehandle(), or NULL when the
// parameter is missing or malformed, the handle is 0, or the name is not a
// local (".") name.
static const char *
cmd_name(struct skynet_context * context, const char * param) {
	if (param == NULL) {
		return NULL;
	}
	size_t size = strlen(param);
	// each token is shorter than the whole parameter, so size+1 is enough
	char name[size+1];
	char handle[size+1];
	if (sscanf(param,"%s %s",name,handle) != 2) {
		// fewer than two tokens: `handle` (and maybe `name`) was never written
		return NULL;
	}
	if (handle[0] != ':') {
		return NULL;
	}
	uint32_t handle_id = strtoul(handle+1, NULL, 16);
	if (handle_id == 0) {
		return NULL;
	}
	if (name[0] == '.') {
		return skynet_handle_namehandle(handle_id, name + 1);
	}
	skynet_error(context, "Can't set global name %s in C", name);
	return NULL;
}
// EXIT: retires the calling service. The parameter is ignored and no
// result is returned.
static const char *
cmd_exit(struct skynet_context * context, const char * param) {
	(void)param;
	handle_exit(context, 0);	// 0 selects the caller itself
	return NULL;
}
// Converts a textual address to a handle: ":hex" is parsed as hexadecimal,
// ".name" is looked up with skynet_handle_findname(). Any other text, or a
// NULL param, is logged and yields 0.
static uint32_t
tohandle(struct skynet_context * context, const char * param) {
	if (param == NULL) {
		// callers pass the raw command parameter, which may be absent
		skynet_error(context, "Can't convert %s to handle", "(null)");
		return 0;
	}
	switch (param[0]) {
	case ':':
		return strtoul(param+1, NULL, 16);
	case '.':
		return skynet_handle_findname(param+1);
	default:
		skynet_error(context, "Can't convert %s to handle",param);
		return 0;
	}
}
// KILL <address>: retires the service named by param; does nothing when
// the address resolves to 0. Always returns NULL.
static const char *
cmd_kill(struct skynet_context * context, const char * param) {
	uint32_t victim = tohandle(context, param);
	if (victim != 0) {
		handle_exit(context, victim);
	}
	return NULL;
}
- static const char *
- cmd_launch(struct skynet_context * context, const char * param) {
- size_t sz = strlen(param);
- char tmp[sz+1];
- strcpy(tmp,param);
- char * args = tmp;
- char * mod = strsep(&args, " \t\r\n");
- args = strsep(&args, "\r\n");
- struct skynet_context * inst = skynet_context_new(mod,args);
- if (inst == NULL) {
- return NULL;
- } else {
- id_to_hex(context->result, inst->handle);
- return context->result;
- }
- }
// GETENV <key>: returns whatever skynet_getenv() returns for param.
static const char *
cmd_getenv(struct skynet_context * context, const char * param) {
	(void)context;
	const char * value = skynet_getenv(param);
	return value;
}
// SETENV <key> <value>: splits param at the first space and stores the
// value (everything after that space) under the key with skynet_setenv().
// Does nothing when the parameter is missing or contains no space.
// Always returns NULL.
static const char *
cmd_setenv(struct skynet_context * context, const char * param) {
	if (param == NULL) {
		return NULL;
	}
	const char * sep = strchr(param, ' ');
	if (sep == NULL) {
		return NULL;
	}
	size_t keylen = (size_t)(sep - param);
	char key[keylen+1];
	memcpy(key, param, keylen);
	key[keylen] = '\0';

	skynet_setenv(key, sep + 1);
	return NULL;
}
- static const char *
- cmd_starttime(struct skynet_context * context, const char * param) {
- uint32_t sec = skynet_starttime();
- sprintf(context->result,"%u",sec);
- return context->result;
- }
// ABORT: calls skynet_handle_retireall(). Both arguments are ignored and
// no result is returned.
static const char *
cmd_abort(struct skynet_context * context, const char * param) {
	(void)context;
	(void)param;
	skynet_handle_retireall();
	return NULL;
}
- static const char *
- cmd_monitor(struct skynet_context * context, const char * param) {
- uint32_t handle=0;
- if (param == NULL || param[0] == '\0') {
- if (G_NODE.monitor_exit) {
- // return current monitor serivce
- sprintf(context->result, ":%x", G_NODE.monitor_exit);
- return context->result;
- }
- return NULL;
- } else {
- handle = tohandle(context, param);
- }
- G_NODE.monitor_exit = handle;
- return NULL;
- }
- static const char *
- cmd_stat(struct skynet_context * context, const char * param) {
- if (strcmp(param, "mqlen") == 0) {
- int len = skynet_mq_length(context->queue);
- sprintf(context->result, "%d", len);
- } else if (strcmp(param, "endless") == 0) {
- if (context->endless) {
- strcpy(context->result, "1");
- context->endless = false;
- } else {
- strcpy(context->result, "0");
- }
- } else if (strcmp(param, "cpu") == 0) {
- double t = (double)context->cpu_cost / 1000000.0; // microsec
- sprintf(context->result, "%lf", t);
- } else if (strcmp(param, "time") == 0) {
- if (context->profile) {
- uint64_t ti = skynet_thread_time() - context->cpu_start;
- double t = (double)ti / 1000000.0; // microsec
- sprintf(context->result, "%lf", t);
- } else {
- strcpy(context->result, "0");
- }
- } else if (strcmp(param, "message") == 0) {
- sprintf(context->result, "%d", context->message_count);
- } else {
- context->result[0] = '\0';
- }
- return context->result;
- }
- static const char *
- cmd_logon(struct skynet_context * context, const char * param) {
- uint32_t handle = tohandle(context, param);
- if (handle == 0)
- return NULL;
- struct skynet_context * ctx = skynet_handle_grab(handle);
- if (ctx == NULL)
- return NULL;
- FILE *f = NULL;
- FILE * lastf = (FILE *)ATOM_LOAD(&ctx->logfile);
- if (lastf == NULL) {
- f = skynet_log_open(context, handle);
- if (f) {
- if (!ATOM_CAS_POINTER(&ctx->logfile, 0, (uintptr_t)f)) {
- // logfile opens in other thread, close this one.
- fclose(f);
- }
- }
- }
- skynet_context_release(ctx);
- return NULL;
- }
- static const char *
- cmd_logoff(struct skynet_context * context, const char * param) {
- uint32_t handle = tohandle(context, param);
- if (handle == 0)
- return NULL;
- struct skynet_context * ctx = skynet_handle_grab(handle);
- if (ctx == NULL)
- return NULL;
- FILE * f = (FILE *)ATOM_LOAD(&ctx->logfile);
- if (f) {
- // logfile may close in other thread
- if (ATOM_CAS_POINTER(&ctx->logfile, (uintptr_t)f, (uintptr_t)NULL)) {
- skynet_log_close(context, f, handle);
- }
- }
- skynet_context_release(ctx);
- return NULL;
- }
- static const char *
- cmd_signal(struct skynet_context * context, const char * param) {
- uint32_t handle = tohandle(context, param);
- if (handle == 0)
- return NULL;
- struct skynet_context * ctx = skynet_handle_grab(handle);
- if (ctx == NULL)
- return NULL;
- param = strchr(param, ' ');
- int sig = 0;
- if (param) {
- sig = strtol(param, NULL, 0);
- }
- // NOTICE: the signal function should be thread safe.
- skynet_module_instance_signal(ctx->mod, ctx->instance, sig);
- skynet_context_release(ctx);
- return NULL;
- }
- static struct command_func cmd_funcs[] = {
- { "TIMEOUT", cmd_timeout },
- { "REG", cmd_reg },
- { "QUERY", cmd_query },
- { "NAME", cmd_name },
- { "EXIT", cmd_exit },
- { "KILL", cmd_kill },
- { "LAUNCH", cmd_launch },
- { "GETENV", cmd_getenv },
- { "SETENV", cmd_setenv },
- { "STARTTIME", cmd_starttime },
- { "ABORT", cmd_abort },
- { "MONITOR", cmd_monitor },
- { "STAT", cmd_stat },
- { "LOGON", cmd_logon },
- { "LOGOFF", cmd_logoff },
- { "SIGNAL", cmd_signal },
- { NULL, NULL },
- };
- const char *
- skynet_command(struct skynet_context * context, const char * cmd , const char * param) {
- struct command_func * method = &cmd_funcs[0];
- while(method->name) {
- if (strcmp(cmd, method->name) == 0) {
- return method->func(context, param);
- }
- ++method;
- }
- return NULL;
- }
- static void
- _filter_args(struct skynet_context * context, int type, int *session, void ** data, size_t * sz) {
- int needcopy = !(type & PTYPE_TAG_DONTCOPY);
- int allocsession = type & PTYPE_TAG_ALLOCSESSION;
- type &= 0xff;
- if (allocsession) {
- assert(*session == 0);
- *session = skynet_context_newsession(context);
- }
- if (needcopy && *data) {
- char * msg = skynet_malloc(*sz+1);
- memcpy(msg, *data, *sz);
- msg[*sz] = '\0';
- *data = msg;
- }
- *sz |= (size_t)type << MESSAGE_TYPE_SHIFT;
- }
- int
- skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
- if ((sz & MESSAGE_TYPE_MASK) != sz) {
- skynet_error(context, "The message to %x is too large", destination);
- if (type & PTYPE_TAG_DONTCOPY) {
- skynet_free(data);
- }
- return -2;
- }
- _filter_args(context, type, &session, (void **)&data, &sz);
- if (source == 0) {
- source = context->handle;
- }
- if (destination == 0) {
- if (data) {
- skynet_error(context, "Destination address can't be 0");
- skynet_free(data);
- return -1;
- }
- return session;
- }
- if (skynet_harbor_message_isremote(destination)) {
- struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
- rmsg->destination.handle = destination;
- rmsg->message = data;
- rmsg->sz = sz & MESSAGE_TYPE_MASK;
- rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
- skynet_harbor_send(rmsg, source, session);
- } else {
- struct skynet_message smsg;
- smsg.source = source;
- smsg.session = session;
- smsg.data = data;
- smsg.sz = sz;
- if (skynet_context_push(destination, &smsg)) {
- skynet_free(data);
- return -1;
- }
- }
- return session;
- }
- int
- skynet_sendname(struct skynet_context * context, uint32_t source, const char * addr , int type, int session, void * data, size_t sz) {
- if (source == 0) {
- source = context->handle;
- }
- uint32_t des = 0;
- if (addr[0] == ':') {
- des = strtoul(addr+1, NULL, 16);
- } else if (addr[0] == '.') {
- des = skynet_handle_findname(addr + 1);
- if (des == 0) {
- if (type & PTYPE_TAG_DONTCOPY) {
- skynet_free(data);
- }
- return -1;
- }
- } else {
- if ((sz & MESSAGE_TYPE_MASK) != sz) {
- skynet_error(context, "The message to %s is too large", addr);
- if (type & PTYPE_TAG_DONTCOPY) {
- skynet_free(data);
- }
- return -2;
- }
- _filter_args(context, type, &session, (void **)&data, &sz);
- struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
- copy_name(rmsg->destination.name, addr);
- rmsg->destination.handle = 0;
- rmsg->message = data;
- rmsg->sz = sz & MESSAGE_TYPE_MASK;
- rmsg->type = sz >> MESSAGE_TYPE_SHIFT;
- skynet_harbor_send(rmsg, source, session);
- return session;
- }
- return skynet_send(context, source, des, type, session, data, sz);
- }
- uint32_t
- skynet_context_handle(struct skynet_context *ctx) {
- return ctx->handle;
- }
- void
- skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
- context->cb = cb;
- context->cb_ud = ud;
- }
- void
- skynet_context_send(struct skynet_context * ctx, void * msg, size_t sz, uint32_t source, int type, int session) {
- struct skynet_message smsg;
- smsg.source = source;
- smsg.session = session;
- smsg.data = msg;
- smsg.sz = sz | (size_t)type << MESSAGE_TYPE_SHIFT;
- skynet_mq_push(ctx->queue, &smsg);
- }
- void
- skynet_globalinit(void) {
- ATOM_INIT(&G_NODE.total , 0);
- G_NODE.monitor_exit = 0;
- G_NODE.init = 1;
- if (pthread_key_create(&G_NODE.handle_key, NULL)) {
- fprintf(stderr, "pthread_key_create failed");
- exit(1);
- }
- // set mainthread's key
- skynet_initthread(THREAD_MAIN);
- }
- void
- skynet_globalexit(void) {
- pthread_key_delete(G_NODE.handle_key);
- }
- void
- skynet_initthread(int m) {
- uintptr_t v = (uint32_t)(-m);
- pthread_setspecific(G_NODE.handle_key, (void *)v);
- }
- void
- skynet_profile_enable(int enable) {
- G_NODE.profile = (bool)enable;
- }
|