#include "skynet.h" #include "skynet_server.h" #include "skynet_module.h" #include "skynet_handle.h" #include "skynet_mq.h" #include "skynet_timer.h" #include "skynet_harbor.h" #include "skynet_env.h" #include "skynet_monitor.h" #include "skynet_imp.h" #include "skynet_log.h" #include "spinlock.h" #include "atomic.h" #include #include #include #include #include #include #ifdef CALLING_CHECK #define CHECKCALLING_BEGIN(ctx) if (!(spinlock_trylock(&ctx->calling))) { assert(0); } #define CHECKCALLING_END(ctx) spinlock_unlock(&ctx->calling); #define CHECKCALLING_INIT(ctx) spinlock_init(&ctx->calling); #define CHECKCALLING_DESTROY(ctx) spinlock_destroy(&ctx->calling); #define CHECKCALLING_DECL struct spinlock calling; #else #define CHECKCALLING_BEGIN(ctx) #define CHECKCALLING_END(ctx) #define CHECKCALLING_INIT(ctx) #define CHECKCALLING_DESTROY(ctx) #define CHECKCALLING_DECL #endif struct skynet_context { void * instance; struct skynet_module * mod; void * cb_ud; skynet_cb cb; struct message_queue *queue; ATOM_POINTER logfile; uint64_t cpu_cost; // in microsec uint64_t cpu_start; // in microsec char result[32]; uint32_t handle; int session_id; ATOM_INT ref; int message_count; bool init; bool endless; bool profile; CHECKCALLING_DECL }; struct skynet_node { ATOM_INT total; int init; uint32_t monitor_exit; pthread_key_t handle_key; bool profile; // default is on }; static struct skynet_node G_NODE; int skynet_context_total() { return ATOM_LOAD(&G_NODE.total); } static void context_inc() { ATOM_FINC(&G_NODE.total); } static void context_dec() { ATOM_FDEC(&G_NODE.total); } uint32_t skynet_current_handle(void) { if (G_NODE.init) { void * handle = pthread_getspecific(G_NODE.handle_key); return (uint32_t)(uintptr_t)handle; } else { uint32_t v = (uint32_t)(-THREAD_MAIN); return v; } } static void id_to_hex(char * str, uint32_t id) { int i; static char hex[16] = { '0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F' }; str[0] = ':'; for (i=0;i<8;i++) { str[i+1] = hex[(id >> ((7-i) * 4))&0xf]; } str[9] = '\0'; } struct drop_t { uint32_t handle; }; static void drop_message(struct skynet_message *msg, void *ud) { struct drop_t *d = ud; skynet_free(msg->data); uint32_t source = d->handle; assert(source); // report error to the message source skynet_send(NULL, source, msg->source, PTYPE_ERROR, msg->session, NULL, 0); } struct skynet_context * skynet_context_new(const char * name, const char *param) { struct skynet_module * mod = skynet_module_query(name); if (mod == NULL) return NULL; void *inst = skynet_module_instance_create(mod); if (inst == NULL) return NULL; struct skynet_context * ctx = skynet_malloc(sizeof(*ctx)); CHECKCALLING_INIT(ctx) ctx->mod = mod; ctx->instance = inst; ATOM_INIT(&ctx->ref , 2); ctx->cb = NULL; ctx->cb_ud = NULL; ctx->session_id = 0; ATOM_INIT(&ctx->logfile, (uintptr_t)NULL); ctx->init = false; ctx->endless = false; ctx->cpu_cost = 0; ctx->cpu_start = 0; ctx->message_count = 0; ctx->profile = G_NODE.profile; // Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle ctx->handle = 0; ctx->handle = skynet_handle_register(ctx); struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle); // init function maybe use ctx->handle, so it must init at last context_inc(); CHECKCALLING_BEGIN(ctx) int r = skynet_module_instance_init(mod, inst, ctx, param); CHECKCALLING_END(ctx) if (r == 0) { struct skynet_context * ret = skynet_context_release(ctx); if (ret) { ctx->init = true; } skynet_globalmq_push(queue); if (ret) { skynet_error(ret, "LAUNCH %s %s", name, param ? param : ""); } return ret; } else { skynet_error(ctx, "FAILED launch %s", name); uint32_t handle = ctx->handle; skynet_context_release(ctx); skynet_handle_retire(handle); struct drop_t d = { handle }; skynet_mq_release(queue, drop_message, &d); return NULL; } } int skynet_context_newsession(struct skynet_context *ctx) { // session always be a positive number int session = ++ctx->session_id; if (session <= 0) { ctx->session_id = 1; return 1; } return session; } void skynet_context_grab(struct skynet_context *ctx) { ATOM_FINC(&ctx->ref); } void skynet_context_reserve(struct skynet_context *ctx) { skynet_context_grab(ctx); // don't count the context reserved, because skynet abort (the worker threads terminate) only when the total context is 0 . // the reserved context will be release at last. context_dec(); } static void delete_context(struct skynet_context *ctx) { FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile); if (f) { fclose(f); } skynet_module_instance_release(ctx->mod, ctx->instance); skynet_mq_mark_release(ctx->queue); CHECKCALLING_DESTROY(ctx) skynet_free(ctx); context_dec(); } struct skynet_context * skynet_context_release(struct skynet_context *ctx) { if (ATOM_FDEC(&ctx->ref) == 1) { delete_context(ctx); return NULL; } return ctx; } int skynet_context_push(uint32_t handle, struct skynet_message *message) { struct skynet_context * ctx = skynet_handle_grab(handle); if (ctx == NULL) { return -1; } skynet_mq_push(ctx->queue, message); skynet_context_release(ctx); return 0; } void skynet_context_endless(uint32_t handle) { struct skynet_context * ctx = skynet_handle_grab(handle); if (ctx == NULL) { return; } ctx->endless = true; skynet_context_release(ctx); } int skynet_isremote(struct skynet_context * ctx, uint32_t handle, int * harbor) { int ret = skynet_harbor_message_isremote(handle); if (harbor) { *harbor = (int)(handle >> HANDLE_REMOTE_SHIFT); } return ret; } static void dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) { assert(ctx->init); CHECKCALLING_BEGIN(ctx) pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle)); int type = msg->sz >> MESSAGE_TYPE_SHIFT; size_t sz = msg->sz & MESSAGE_TYPE_MASK; FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile); if (f) { skynet_log_output(f, msg->source, type, msg->session, msg->data, sz); } ++ctx->message_count; int reserve_msg; if (ctx->profile) { ctx->cpu_start = skynet_thread_time(); reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz); uint64_t cost_time = skynet_thread_time() - ctx->cpu_start; ctx->cpu_cost += cost_time; } else { reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz); } if (!reserve_msg) { skynet_free(msg->data); } CHECKCALLING_END(ctx) } void skynet_context_dispatchall(struct skynet_context * ctx) { // for skynet_error struct skynet_message msg; struct message_queue *q = ctx->queue; while (!skynet_mq_pop(q,&msg)) { dispatch_message(ctx, &msg); } } struct message_queue * skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) { if (q == NULL) { q = skynet_globalmq_pop(); if (q==NULL) return NULL; } uint32_t handle = skynet_mq_handle(q); struct skynet_context * ctx = skynet_handle_grab(handle); if (ctx == NULL) { struct drop_t d = { handle }; skynet_mq_release(q, drop_message, &d); return skynet_globalmq_pop(); } int i,n=1; struct skynet_message msg; for (i=0;i= 0) { n = skynet_mq_length(q); n >>= weight; } int overload = skynet_mq_overload(q); if (overload) { skynet_error(ctx, "May overload, message queue length = %d", overload); } skynet_monitor_trigger(sm, msg.source , handle); if (ctx->cb == NULL) { skynet_free(msg.data); } else { dispatch_message(ctx, &msg); } skynet_monitor_trigger(sm, 0,0); } assert(q == ctx->queue); struct message_queue *nq = skynet_globalmq_pop(); if (nq) { // If global mq is not empty , push q back, and return next queue (nq) // Else (global mq is empty or block, don't push q back, and return q again (for next dispatch) skynet_globalmq_push(q); q = nq; } skynet_context_release(ctx); return q; } static void copy_name(char name[GLOBALNAME_LENGTH], const char * addr) { int i; for (i=0;ihandle; skynet_error(context, "KILL self"); } else { skynet_error(context, "KILL :%0x", handle); } if (G_NODE.monitor_exit) { skynet_send(context, handle, G_NODE.monitor_exit, PTYPE_CLIENT, 0, NULL, 0); } skynet_handle_retire(handle); } // skynet command struct command_func { const char *name; const char * (*func)(struct skynet_context * context, const char * param); }; static const char * cmd_timeout(struct skynet_context * context, const char * param) { char * session_ptr = NULL; int ti = strtol(param, &session_ptr, 10); int session = skynet_context_newsession(context); skynet_timeout(context->handle, ti, session); sprintf(context->result, "%d", session); return context->result; } static const char * cmd_reg(struct skynet_context * context, const char * param) { if (param == NULL || param[0] == '\0') { sprintf(context->result, ":%x", context->handle); return context->result; } else if (param[0] == '.') { return skynet_handle_namehandle(context->handle, param + 1); } else { skynet_error(context, "Can't register global name %s in C", param); return NULL; } } static const char * cmd_query(struct skynet_context * context, const char * param) { if (param[0] == '.') { uint32_t handle = skynet_handle_findname(param+1); if (handle) { sprintf(context->result, ":%x", handle); return context->result; } } return NULL; } static const char * cmd_name(struct skynet_context * context, const char * param) { int size = strlen(param); char name[size+1]; char handle[size+1]; sscanf(param,"%s %s",name,handle); if (handle[0] != ':') { return NULL; } uint32_t handle_id = strtoul(handle+1, NULL, 16); if (handle_id == 0) { return NULL; } if (name[0] == '.') { return skynet_handle_namehandle(handle_id, name + 1); } else { skynet_error(context, "Can't set global name %s in C", name); } return NULL; } static const char * cmd_exit(struct skynet_context * context, const char * param) { handle_exit(context, 0); return NULL; } static uint32_t tohandle(struct skynet_context * context, const char * param) { uint32_t handle = 0; if (param[0] == ':') { handle = strtoul(param+1, NULL, 16); } else if (param[0] == '.') { handle = skynet_handle_findname(param+1); } else { skynet_error(context, "Can't convert %s to handle",param); } return handle; } static const char * cmd_kill(struct skynet_context * context, const char * param) { uint32_t handle = tohandle(context, param); if (handle) { handle_exit(context, handle); } return NULL; } static const char * cmd_launch(struct skynet_context * context, const char * param) { size_t sz = strlen(param); char tmp[sz+1]; strcpy(tmp,param); char * args = tmp; char * mod = strsep(&args, " \t\r\n"); args = strsep(&args, "\r\n"); struct skynet_context * inst = skynet_context_new(mod,args); if (inst == NULL) { return NULL; } else { id_to_hex(context->result, inst->handle); return context->result; } } static const char * cmd_getenv(struct skynet_context * context, const char * param) { return skynet_getenv(param); } static const char * cmd_setenv(struct skynet_context * context, const char * param) { size_t sz = strlen(param); char key[sz+1]; int i; for (i=0;param[i] != ' ' && param[i];i++) { key[i] = param[i]; } if (param[i] == '\0') return NULL; key[i] = '\0'; param += i+1; skynet_setenv(key,param); return NULL; } static const char * cmd_starttime(struct skynet_context * context, const char * param) { uint32_t sec = skynet_starttime(); sprintf(context->result,"%u",sec); return context->result; } static const char * cmd_abort(struct skynet_context * context, const char * param) { skynet_handle_retireall(); return NULL; } static const char * cmd_monitor(struct skynet_context * context, const char * param) { uint32_t handle=0; if (param == NULL || param[0] == '\0') { if (G_NODE.monitor_exit) { // return current monitor serivce sprintf(context->result, ":%x", G_NODE.monitor_exit); return context->result; } return NULL; } else { handle = tohandle(context, param); } G_NODE.monitor_exit = handle; return NULL; } static const char * cmd_stat(struct skynet_context * context, const char * param) { if (strcmp(param, "mqlen") == 0) { int len = skynet_mq_length(context->queue); sprintf(context->result, "%d", len); } else if (strcmp(param, "endless") == 0) { if (context->endless) { strcpy(context->result, "1"); context->endless = false; } else { strcpy(context->result, "0"); } } else if (strcmp(param, "cpu") == 0) { double t = (double)context->cpu_cost / 1000000.0; // microsec sprintf(context->result, "%lf", t); } else if (strcmp(param, "time") == 0) { if (context->profile) { uint64_t ti = skynet_thread_time() - context->cpu_start; double t = (double)ti / 1000000.0; // microsec sprintf(context->result, "%lf", t); } else { strcpy(context->result, "0"); } } else if (strcmp(param, "message") == 0) { sprintf(context->result, "%d", context->message_count); } else { context->result[0] = '\0'; } return context->result; } static const char * cmd_logon(struct skynet_context * context, const char * param) { uint32_t handle = tohandle(context, param); if (handle == 0) return NULL; struct skynet_context * ctx = skynet_handle_grab(handle); if (ctx == NULL) return NULL; FILE *f = NULL; FILE * lastf = (FILE *)ATOM_LOAD(&ctx->logfile); if (lastf == NULL) { f = skynet_log_open(context, handle); if (f) { if (!ATOM_CAS_POINTER(&ctx->logfile, 0, (uintptr_t)f)) { // logfile opens in other thread, close this one. fclose(f); } } } skynet_context_release(ctx); return NULL; } static const char * cmd_logoff(struct skynet_context * context, const char * param) { uint32_t handle = tohandle(context, param); if (handle == 0) return NULL; struct skynet_context * ctx = skynet_handle_grab(handle); if (ctx == NULL) return NULL; FILE * f = (FILE *)ATOM_LOAD(&ctx->logfile); if (f) { // logfile may close in other thread if (ATOM_CAS_POINTER(&ctx->logfile, (uintptr_t)f, (uintptr_t)NULL)) { skynet_log_close(context, f, handle); } } skynet_context_release(ctx); return NULL; } static const char * cmd_signal(struct skynet_context * context, const char * param) { uint32_t handle = tohandle(context, param); if (handle == 0) return NULL; struct skynet_context * ctx = skynet_handle_grab(handle); if (ctx == NULL) return NULL; param = strchr(param, ' '); int sig = 0; if (param) { sig = strtol(param, NULL, 0); } // NOTICE: the signal function should be thread safe. skynet_module_instance_signal(ctx->mod, ctx->instance, sig); skynet_context_release(ctx); return NULL; } static struct command_func cmd_funcs[] = { { "TIMEOUT", cmd_timeout }, { "REG", cmd_reg }, { "QUERY", cmd_query }, { "NAME", cmd_name }, { "EXIT", cmd_exit }, { "KILL", cmd_kill }, { "LAUNCH", cmd_launch }, { "GETENV", cmd_getenv }, { "SETENV", cmd_setenv }, { "STARTTIME", cmd_starttime }, { "ABORT", cmd_abort }, { "MONITOR", cmd_monitor }, { "STAT", cmd_stat }, { "LOGON", cmd_logon }, { "LOGOFF", cmd_logoff }, { "SIGNAL", cmd_signal }, { NULL, NULL }, }; const char * skynet_command(struct skynet_context * context, const char * cmd , const char * param) { struct command_func * method = &cmd_funcs[0]; while(method->name) { if (strcmp(cmd, method->name) == 0) { return method->func(context, param); } ++method; } return NULL; } static void _filter_args(struct skynet_context * context, int type, int *session, void ** data, size_t * sz) { int needcopy = !(type & PTYPE_TAG_DONTCOPY); int allocsession = type & PTYPE_TAG_ALLOCSESSION; type &= 0xff; if (allocsession) { assert(*session == 0); *session = skynet_context_newsession(context); } if (needcopy && *data) { char * msg = skynet_malloc(*sz+1); memcpy(msg, *data, *sz); msg[*sz] = '\0'; *data = msg; } *sz |= (size_t)type << MESSAGE_TYPE_SHIFT; } int skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) { if ((sz & MESSAGE_TYPE_MASK) != sz) { skynet_error(context, "The message to %x is too large", destination); if (type & PTYPE_TAG_DONTCOPY) { skynet_free(data); } return -2; } _filter_args(context, type, &session, (void **)&data, &sz); if (source == 0) { source = context->handle; } if (destination == 0) { if (data) { skynet_error(context, "Destination address can't be 0"); skynet_free(data); return -1; } return session; } if (skynet_harbor_message_isremote(destination)) { struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg)); rmsg->destination.handle = destination; rmsg->message = data; rmsg->sz = sz & MESSAGE_TYPE_MASK; rmsg->type = sz >> MESSAGE_TYPE_SHIFT; skynet_harbor_send(rmsg, source, session); } else { struct skynet_message smsg; smsg.source = source; smsg.session = session; smsg.data = data; smsg.sz = sz; if (skynet_context_push(destination, &smsg)) { skynet_free(data); return -1; } } return session; } int skynet_sendname(struct skynet_context * context, uint32_t source, const char * addr , int type, int session, void * data, size_t sz) { if (source == 0) { source = context->handle; } uint32_t des = 0; if (addr[0] == ':') { des = strtoul(addr+1, NULL, 16); } else if (addr[0] == '.') { des = skynet_handle_findname(addr + 1); if (des == 0) { if (type & PTYPE_TAG_DONTCOPY) { skynet_free(data); } return -1; } } else { if ((sz & MESSAGE_TYPE_MASK) != sz) { skynet_error(context, "The message to %s is too large", addr); if (type & PTYPE_TAG_DONTCOPY) { skynet_free(data); } return -2; } _filter_args(context, type, &session, (void **)&data, &sz); struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg)); copy_name(rmsg->destination.name, addr); rmsg->destination.handle = 0; rmsg->message = data; rmsg->sz = sz & MESSAGE_TYPE_MASK; rmsg->type = sz >> MESSAGE_TYPE_SHIFT; skynet_harbor_send(rmsg, source, session); return session; } return skynet_send(context, source, des, type, session, data, sz); } uint32_t skynet_context_handle(struct skynet_context *ctx) { return ctx->handle; } void skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) { context->cb = cb; context->cb_ud = ud; } void skynet_context_send(struct skynet_context * ctx, void * msg, size_t sz, uint32_t source, int type, int session) { struct skynet_message smsg; smsg.source = source; smsg.session = session; smsg.data = msg; smsg.sz = sz | (size_t)type << MESSAGE_TYPE_SHIFT; skynet_mq_push(ctx->queue, &smsg); } void skynet_globalinit(void) { ATOM_INIT(&G_NODE.total , 0); G_NODE.monitor_exit = 0; G_NODE.init = 1; if (pthread_key_create(&G_NODE.handle_key, NULL)) { fprintf(stderr, "pthread_key_create failed"); exit(1); } // set mainthread's key skynet_initthread(THREAD_MAIN); } void skynet_globalexit(void) { pthread_key_delete(G_NODE.handle_key); } void skynet_initthread(int m) { uintptr_t v = (uint32_t)(-m); pthread_setspecific(G_NODE.handle_key, (void *)v); } void skynet_profile_enable(int enable) { G_NODE.profile = (bool)enable; }