socket_server.c 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332
  1. #include "skynet.h"
  2. #include "socket_server.h"
  3. #include "socket_poll.h"
  4. #include "atomic.h"
  5. #include "spinlock.h"
  6. #include <sys/types.h>
  7. #include <sys/socket.h>
  8. #include <netinet/tcp.h>
  9. #include <unistd.h>
  10. #include <errno.h>
  11. #include <stdlib.h>
  12. #include <stdbool.h>
  13. #include <stdio.h>
  14. #include <stdint.h>
  15. #include <assert.h>
  16. #include <string.h>
  /* Size of the scratch text buffer kept in struct socket_server. */
  #define MAX_INFO 128
  // MAX_SOCKET will be 2^MAX_SOCKET_P
  #define MAX_SOCKET_P 16
  /* Capacity of the event array in struct socket_server. */
  #define MAX_EVENT 64
  /* Initial value of socket.p.size for a freshly registered fd. */
  #define MIN_READ_BUFFER 64

  /* Values stored in socket.type. */
  #define SOCKET_TYPE_INVALID 0
  #define SOCKET_TYPE_RESERVE 1
  #define SOCKET_TYPE_PLISTEN 2
  #define SOCKET_TYPE_LISTEN 3
  #define SOCKET_TYPE_CONNECTING 4
  #define SOCKET_TYPE_CONNECTED 5
  #define SOCKET_TYPE_HALFCLOSE_READ 6
  #define SOCKET_TYPE_HALFCLOSE_WRITE 7
  #define SOCKET_TYPE_PACCEPT 8
  #define SOCKET_TYPE_BIND 9

  #define MAX_SOCKET (1<<MAX_SOCKET_P)

  #define PRIORITY_HIGH 0
  #define PRIORITY_LOW 1

  /*
   * Slot index and 16-bit tag derived from a socket id.
   * The argument is parenthesized so that an expression such as `a | b`
   * expands as a whole instead of being split by operator precedence.
   */
  #define HASH_ID(id) (((unsigned)(id)) % MAX_SOCKET)
  #define ID_TAG16(id) (((id)>>MAX_SOCKET_P) & 0xffff)

  /* Values stored in socket.protocol. */
  #define PROTOCOL_TCP 0
  #define PROTOCOL_UDP 1
  #define PROTOCOL_UDPv6 2
  #define PROTOCOL_UNKNOWN 255

  #define UDP_ADDRESS_SIZE 19 // ipv6 128bit + port 16bit + 1 byte type
  #define MAX_UDP_PACKAGE 65535

  // EAGAIN and EWOULDBLOCK may be not the same value.
  // AGAIN_WOULDBLOCK must be used directly after the `case` keyword.
  #if (EAGAIN != EWOULDBLOCK)
  #define AGAIN_WOULDBLOCK EAGAIN : case EWOULDBLOCK
  #else
  #define AGAIN_WOULDBLOCK EAGAIN
  #endif

  #define WARNING_SIZE (1024*1024)
  /* Size sentinel: the buffer is a user object handled through the soi callbacks. */
  #define USEROBJECT ((size_t)(-1))
  /*
   * One node of a socket's pending-write list.
   * `buffer` is what gets released with the node; `ptr`/`sz` describe the part
   * that has not been written yet.
   */
  struct write_buffer {
      struct write_buffer *next;   // next node in the list
      const void *buffer;          // object released by write_buffer_free()
      char *ptr;                   // current send position
      size_t sz;                   // bytes still to send starting at ptr
      bool userobject;             // true: release via soi.free, false: via FREE
  };
  58. struct write_buffer_udp {
  59. struct write_buffer buffer;
  60. uint8_t udp_address[UDP_ADDRESS_SIZE];
  61. };
  /* Singly linked queue of write buffers; both pointers are NULL when empty. */
  struct wb_list {
      struct write_buffer *head;   // next buffer to send
      struct write_buffer *tail;   // last queued buffer
  };
  /* Per-socket traffic counters, updated by stat_read() and stat_write(). */
  struct socket_stat {
      uint64_t rtime;   // server time of the last recorded read
      uint64_t wtime;   // server time of the last recorded write
      uint64_t read;    // total bytes read
      uint64_t write;   // total bytes written
  };
  72. struct socket {
  73. uintptr_t opaque;
  74. struct wb_list high;
  75. struct wb_list low;
  76. int64_t wb_size;
  77. struct socket_stat stat;
  78. ATOM_ULONG sending;
  79. int fd;
  80. int id;
  81. ATOM_INT type;
  82. uint8_t protocol;
  83. bool reading;
  84. bool writing;
  85. bool closing;
  86. ATOM_INT udpconnecting;
  87. int64_t warn_size;
  88. union {
  89. int size;
  90. uint8_t udp_address[UDP_ADDRESS_SIZE];
  91. } p;
  92. struct spinlock dw_lock;
  93. int dw_offset;
  94. const void * dw_buffer;
  95. size_t dw_size;
  96. };
  97. struct socket_server {
  98. volatile uint64_t time;
  99. int reserve_fd; // for EMFILE
  100. int recvctrl_fd;
  101. int sendctrl_fd;
  102. int checkctrl;
  103. poll_fd event_fd;
  104. ATOM_INT alloc_id;
  105. int event_n;
  106. int event_index;
  107. struct socket_object_interface soi;
  108. struct event ev[MAX_EVENT];
  109. struct socket slot[MAX_SOCKET];
  110. char buffer[MAX_INFO];
  111. uint8_t udpbuffer[MAX_UDP_PACKAGE];
  112. fd_set rfds;
  113. };
  /* Payload of a connect request; consumed by open_socket(). */
  struct request_open {
      int id;             // slot id reserved for the new connection
      int port;           // TCP port, formatted as text for getaddrinfo()
      uintptr_t opaque;   // owner handle echoed in the result message
      char host[1];       // first byte of the host name passed to getaddrinfo()
  };
  /* Payload of a send request. */
  struct request_send {
      int id;                // target socket id
      size_t sz;             // size of buffer; NOTE(review): presumably USEROBJECT for user objects
      const void * buffer;   // data to send
  };
  125. struct request_send_udp {
  126. struct request_send send;
  127. uint8_t address[UDP_ADDRESS_SIZE];
  128. };
  129. struct request_setudp {
  130. int id;
  131. uint8_t address[UDP_ADDRESS_SIZE];
  132. };
  /* Payload of a close request. */
  struct request_close {
      int id;             // target socket id
      int shutdown;       // NOTE(review): handler is outside this chunk; presumably selects shutdown vs. close
      uintptr_t opaque;   // owner handle
  };
  /* Payload of a listen request. */
  struct request_listen {
      int id;             // slot id reserved for the listener
      int fd;             // descriptor carried with the request
      uintptr_t opaque;   // owner handle
      char host[1];       // first byte of a host string
  };
  /* Payload of a bind request. */
  struct request_bind {
      int id;             // slot id reserved for the descriptor
      int fd;             // descriptor carried with the request
      uintptr_t opaque;   // owner handle
  };
  /* Payload shared by resume and pause requests. */
  struct request_resumepause {
      int id;             // target socket id
      uintptr_t opaque;   // owner handle
  };
  /* Payload of a "set opt" request. */
  struct request_setopt {
      int id;      // target socket id
      int what;    // option selector
      int value;   // option value
  };
  /* Payload of a "create UDP socket" request. */
  struct request_udp {
      int id;             // slot id reserved for the socket
      int fd;             // descriptor carried with the request
      int family;         // address family
      uintptr_t opaque;   // owner handle
  };
  /*
      The first byte is TYPE

      S Start socket
      B Bind socket
      L Listen socket
      K Close socket
      O Connect to (Open)
      X Exit
      D Send package (high)
      P Send package (low)
      A Send UDP package
      T Set option
      U Create UDP socket
      C Set UDP address
      Q Query info
   */
  180. struct request_package {
  181. uint8_t header[8]; // 6 bytes dummy
  182. union {
  183. char buffer[256];
  184. struct request_open open;
  185. struct request_send send;
  186. struct request_send_udp send_udp;
  187. struct request_close close;
  188. struct request_listen listen;
  189. struct request_bind bind;
  190. struct request_resumepause resumepause;
  191. struct request_setopt setopt;
  192. struct request_udp udp;
  193. struct request_setudp set_udp;
  194. } u;
  195. uint8_t dummy[256];
  196. };
  /* Storage large enough for either an IPv4 or an IPv6 socket address. */
  union sockaddr_all {
      struct sockaddr s;         // generic view, passed to sendto()
      struct sockaddr_in v4;     // IPv4 view
      struct sockaddr_in6 v6;    // IPv6 view
  };
  /* Uniform view of something to send: its bytes, their count and how to release it. */
  struct send_object {
      const void * buffer;         // first byte to send
      size_t sz;                   // number of bytes at buffer
      void (*free_func)(void *);   // release function for the underlying object
  };
  /* All allocations in this file go through the skynet allocator. */
  #define MALLOC skynet_malloc
  #define FREE skynet_free
  /* Counted wrapper around a socket's spinlock: only the outermost lock/unlock touches it. */
  struct socket_lock {
      struct spinlock *lock;   // the socket's dw_lock
      int count;               // current nesting depth held through this wrapper
  };
  213. static inline void
  214. socket_lock_init(struct socket *s, struct socket_lock *sl) {
  215. sl->lock = &s->dw_lock;
  216. sl->count = 0;
  217. }
  218. static inline void
  219. socket_lock(struct socket_lock *sl) {
  220. if (sl->count == 0) {
  221. spinlock_lock(sl->lock);
  222. }
  223. ++sl->count;
  224. }
  225. static inline int
  226. socket_trylock(struct socket_lock *sl) {
  227. if (sl->count == 0) {
  228. if (!spinlock_trylock(sl->lock))
  229. return 0; // lock failed
  230. }
  231. ++sl->count;
  232. return 1;
  233. }
  234. static inline void
  235. socket_unlock(struct socket_lock *sl) {
  236. --sl->count;
  237. if (sl->count <= 0) {
  238. assert(sl->count == 0);
  239. spinlock_unlock(sl->lock);
  240. }
  241. }
  242. static inline int
  243. socket_invalid(struct socket *s, int id) {
  244. return (s->id != id || ATOM_LOAD(&s->type) == SOCKET_TYPE_INVALID);
  245. }
  246. static inline bool
  247. send_object_init(struct socket_server *ss, struct send_object *so, const void *object, size_t sz) {
  248. if (sz == USEROBJECT) {
  249. so->buffer = ss->soi.buffer(object);
  250. so->sz = ss->soi.size(object);
  251. so->free_func = ss->soi.free;
  252. return true;
  253. } else {
  254. so->buffer = object;
  255. so->sz = sz;
  256. so->free_func = FREE;
  257. return false;
  258. }
  259. }
  /* Release function that does nothing; installed for raw-pointer buffers. */
  static void dummy_free(void *ptr) {
      (void) ptr;   // unused on purpose
  }
  264. static inline void
  265. send_object_init_from_sendbuffer(struct socket_server *ss, struct send_object *so, struct socket_sendbuffer *buf) {
  266. switch (buf->type) {
  267. case SOCKET_BUFFER_MEMORY:
  268. send_object_init(ss, so, buf->buffer, buf->sz);
  269. break;
  270. case SOCKET_BUFFER_OBJECT:
  271. send_object_init(ss, so, buf->buffer, USEROBJECT);
  272. break;
  273. case SOCKET_BUFFER_RAWPOINTER:
  274. so->buffer = buf->buffer;
  275. so->sz = buf->sz;
  276. so->free_func = dummy_free;
  277. break;
  278. default:
  279. // never get here
  280. so->buffer = NULL;
  281. so->sz = 0;
  282. so->free_func = NULL;
  283. break;
  284. }
  285. }
  286. static inline void
  287. write_buffer_free(struct socket_server *ss, struct write_buffer *wb) {
  288. if (wb->userobject) {
  289. ss->soi.free((void *)wb->buffer);
  290. } else {
  291. FREE((void *)wb->buffer);
  292. }
  293. FREE(wb);
  294. }
  /* Turn on SO_KEEPALIVE for `fd`; the setsockopt() result is ignored. */
  static void socket_keepalive(int fd) {
      int enabled = 1;
      setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&enabled, sizeof(enabled));
  }
  300. static int
  301. reserve_id(struct socket_server *ss) {
  302. int i;
  303. for (i=0;i<MAX_SOCKET;i++) {
  304. int id = ATOM_FINC(&(ss->alloc_id))+1;
  305. if (id < 0) {
  306. id = ATOM_FAND(&(ss->alloc_id), 0x7fffffff) & 0x7fffffff;
  307. }
  308. struct socket *s = &ss->slot[HASH_ID(id)];
  309. int type_invalid = ATOM_LOAD(&s->type);
  310. if (type_invalid == SOCKET_TYPE_INVALID) {
  311. if (ATOM_CAS(&s->type, type_invalid, SOCKET_TYPE_RESERVE)) {
  312. s->id = id;
  313. s->protocol = PROTOCOL_UNKNOWN;
  314. // socket_server_udp_connect may inc s->udpconncting directly (from other thread, before new_fd),
  315. // so reset it to 0 here rather than in new_fd.
  316. ATOM_INIT(&s->udpconnecting, 0);
  317. s->fd = -1;
  318. return id;
  319. } else {
  320. // retry
  321. --i;
  322. }
  323. }
  324. }
  325. return -1;
  326. }
  327. static inline void
  328. clear_wb_list(struct wb_list *list) {
  329. list->head = NULL;
  330. list->tail = NULL;
  331. }
  332. struct socket_server *
  333. socket_server_create(uint64_t time) {
  334. int i;
  335. int fd[2];
  336. poll_fd efd = sp_create();
  337. if (sp_invalid(efd)) {
  338. skynet_error(NULL, "socket-server: create event pool failed.");
  339. return NULL;
  340. }
  341. if (pipe(fd)) {
  342. sp_release(efd);
  343. skynet_error(NULL, "socket-server: create socket pair failed.");
  344. return NULL;
  345. }
  346. if (sp_add(efd, fd[0], NULL)) {
  347. // add recvctrl_fd to event poll
  348. skynet_error(NULL, "socket-server: can't add server fd to event pool.");
  349. close(fd[0]);
  350. close(fd[1]);
  351. sp_release(efd);
  352. return NULL;
  353. }
  354. struct socket_server *ss = MALLOC(sizeof(*ss));
  355. ss->time = time;
  356. ss->event_fd = efd;
  357. ss->recvctrl_fd = fd[0];
  358. ss->sendctrl_fd = fd[1];
  359. ss->checkctrl = 1;
  360. ss->reserve_fd = dup(1); // reserve an extra fd for EMFILE
  361. for (i=0;i<MAX_SOCKET;i++) {
  362. struct socket *s = &ss->slot[i];
  363. ATOM_INIT(&s->type, SOCKET_TYPE_INVALID);
  364. clear_wb_list(&s->high);
  365. clear_wb_list(&s->low);
  366. spinlock_init(&s->dw_lock);
  367. }
  368. ATOM_INIT(&ss->alloc_id , 0);
  369. ss->event_n = 0;
  370. ss->event_index = 0;
  371. memset(&ss->soi, 0, sizeof(ss->soi));
  372. FD_ZERO(&ss->rfds);
  373. assert(ss->recvctrl_fd < FD_SETSIZE);
  374. return ss;
  375. }
  376. void
  377. socket_server_updatetime(struct socket_server *ss, uint64_t time) {
  378. ss->time = time;
  379. }
  380. static void
  381. free_wb_list(struct socket_server *ss, struct wb_list *list) {
  382. struct write_buffer *wb = list->head;
  383. while (wb) {
  384. struct write_buffer *tmp = wb;
  385. wb = wb->next;
  386. write_buffer_free(ss, tmp);
  387. }
  388. list->head = NULL;
  389. list->tail = NULL;
  390. }
  391. static void
  392. free_buffer(struct socket_server *ss, struct socket_sendbuffer *buf) {
  393. void *buffer = (void *)buf->buffer;
  394. switch (buf->type) {
  395. case SOCKET_BUFFER_MEMORY:
  396. FREE(buffer);
  397. break;
  398. case SOCKET_BUFFER_OBJECT:
  399. ss->soi.free(buffer);
  400. break;
  401. case SOCKET_BUFFER_RAWPOINTER:
  402. break;
  403. }
  404. }
  405. static const void *
  406. clone_buffer(struct socket_sendbuffer *buf, size_t *sz) {
  407. switch (buf->type) {
  408. case SOCKET_BUFFER_MEMORY:
  409. *sz = buf->sz;
  410. return buf->buffer;
  411. case SOCKET_BUFFER_OBJECT:
  412. *sz = USEROBJECT;
  413. return buf->buffer;
  414. case SOCKET_BUFFER_RAWPOINTER:
  415. // It's a raw pointer, we need make a copy
  416. *sz = buf->sz;
  417. void * tmp = MALLOC(*sz);
  418. memcpy(tmp, buf->buffer, *sz);
  419. return tmp;
  420. }
  421. // never get here
  422. *sz = 0;
  423. return NULL;
  424. }
  425. static void
  426. force_close(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
  427. result->id = s->id;
  428. result->ud = 0;
  429. result->data = NULL;
  430. result->opaque = s->opaque;
  431. uint8_t type = ATOM_LOAD(&s->type);
  432. if (type == SOCKET_TYPE_INVALID) {
  433. return;
  434. }
  435. assert(type != SOCKET_TYPE_RESERVE);
  436. free_wb_list(ss,&s->high);
  437. free_wb_list(ss,&s->low);
  438. sp_del(ss->event_fd, s->fd);
  439. socket_lock(l);
  440. if (type != SOCKET_TYPE_BIND) {
  441. if (close(s->fd) < 0) {
  442. perror("close socket:");
  443. }
  444. }
  445. ATOM_STORE(&s->type, SOCKET_TYPE_INVALID);
  446. if (s->dw_buffer) {
  447. struct socket_sendbuffer tmp;
  448. tmp.buffer = s->dw_buffer;
  449. tmp.sz = s->dw_size;
  450. tmp.id = s->id;
  451. tmp.type = (tmp.sz == USEROBJECT) ? SOCKET_BUFFER_OBJECT : SOCKET_BUFFER_MEMORY;
  452. free_buffer(ss, &tmp);
  453. s->dw_buffer = NULL;
  454. }
  455. socket_unlock(l);
  456. }
  457. void
  458. socket_server_release(struct socket_server *ss) {
  459. int i;
  460. struct socket_message dummy;
  461. for (i=0;i<MAX_SOCKET;i++) {
  462. struct socket *s = &ss->slot[i];
  463. struct socket_lock l;
  464. socket_lock_init(s, &l);
  465. if (ATOM_LOAD(&s->type) != SOCKET_TYPE_RESERVE) {
  466. force_close(ss, s, &l, &dummy);
  467. }
  468. spinlock_destroy(&s->dw_lock);
  469. }
  470. close(ss->sendctrl_fd);
  471. close(ss->recvctrl_fd);
  472. sp_release(ss->event_fd);
  473. if (ss->reserve_fd >= 0)
  474. close(ss->reserve_fd);
  475. FREE(ss);
  476. }
  477. static inline void
  478. check_wb_list(struct wb_list *s) {
  479. assert(s->head == NULL);
  480. assert(s->tail == NULL);
  481. }
  482. static inline int
  483. enable_write(struct socket_server *ss, struct socket *s, bool enable) {
  484. if (s->writing != enable) {
  485. s->writing = enable;
  486. return sp_enable(ss->event_fd, s->fd, s, s->reading, enable);
  487. }
  488. return 0;
  489. }
  490. static inline int
  491. enable_read(struct socket_server *ss, struct socket *s, bool enable) {
  492. if (s->reading != enable) {
  493. s->reading = enable;
  494. return sp_enable(ss->event_fd, s->fd, s, enable, s->writing);
  495. }
  496. return 0;
  497. }
  498. static struct socket *
  499. new_fd(struct socket_server *ss, int id, int fd, int protocol, uintptr_t opaque, bool reading) {
  500. struct socket * s = &ss->slot[HASH_ID(id)];
  501. assert(ATOM_LOAD(&s->type) == SOCKET_TYPE_RESERVE);
  502. if (sp_add(ss->event_fd, fd, s)) {
  503. ATOM_STORE(&s->type, SOCKET_TYPE_INVALID);
  504. return NULL;
  505. }
  506. s->id = id;
  507. s->fd = fd;
  508. s->reading = true;
  509. s->writing = false;
  510. s->closing = false;
  511. ATOM_INIT(&s->sending , ID_TAG16(id) << 16 | 0);
  512. s->protocol = protocol;
  513. s->p.size = MIN_READ_BUFFER;
  514. s->opaque = opaque;
  515. s->wb_size = 0;
  516. s->warn_size = 0;
  517. check_wb_list(&s->high);
  518. check_wb_list(&s->low);
  519. s->dw_buffer = NULL;
  520. s->dw_size = 0;
  521. memset(&s->stat, 0, sizeof(s->stat));
  522. if (enable_read(ss, s, reading)) {
  523. ATOM_STORE(&s->type , SOCKET_TYPE_INVALID);
  524. return NULL;
  525. }
  526. return s;
  527. }
  528. static inline void
  529. stat_read(struct socket_server *ss, struct socket *s, int n) {
  530. s->stat.read += n;
  531. s->stat.rtime = ss->time;
  532. }
  533. static inline void
  534. stat_write(struct socket_server *ss, struct socket *s, int n) {
  535. s->stat.write += n;
  536. s->stat.wtime = ss->time;
  537. }
  /*
   * Resolve request->host / request->port and start a non-blocking TCP connect
   * on the slot reserved for request->id.
   *
   * Returns:
   *   SOCKET_OPEN  connected immediately; result->data is the peer address text
   *                (stored in ss->buffer) when it could be formatted
   *   -1           connect still in progress; the socket is CONNECTING and
   *                write events are enabled
   *   SOCKET_ERR   failure; result->data is an error string and the slot is
   *                marked invalid
   */
  static int
  open_socket(struct socket_server *ss, struct request_open * request, struct socket_message *result) {
      int id = request->id;
      result->opaque = request->opaque;
      result->id = id;
      result->ud = 0;
      result->data = NULL;
      struct socket *ns;
      int status;
      // declared before the first goto so no jump skips its initialization
      int sock = -1;
      // errno of the last failed socket()/connect(); close() may overwrite errno
      int last_errno = 0;
      struct addrinfo ai_hints;
      struct addrinfo *ai_list = NULL;
      struct addrinfo *ai_ptr = NULL;
      char port[16];
      snprintf(port, sizeof(port), "%d", request->port);
      memset(&ai_hints, 0, sizeof( ai_hints ) );
      ai_hints.ai_family = AF_UNSPEC;
      ai_hints.ai_socktype = SOCK_STREAM;
      ai_hints.ai_protocol = IPPROTO_TCP;

      status = getaddrinfo( request->host, port, &ai_hints, &ai_list );
      if ( status != 0 ) {
          result->data = (void *)gai_strerror(status);
          goto _failed_getaddrinfo;
      }
      // try each resolved address until one connects or is in progress
      for (ai_ptr = ai_list; ai_ptr != NULL; ai_ptr = ai_ptr->ai_next ) {
          sock = socket( ai_ptr->ai_family, ai_ptr->ai_socktype, ai_ptr->ai_protocol );
          if ( sock < 0 ) {
              last_errno = errno;
              continue;
          }
          socket_keepalive(sock);
          sp_nonblocking(sock);
          status = connect( sock, ai_ptr->ai_addr, ai_ptr->ai_addrlen);
          if ( status != 0 && errno != EINPROGRESS) {
              last_errno = errno;
              close(sock);
              sock = -1;
              continue;
          }
          break;
      }

      if (sock < 0) {
          result->data = strerror(last_errno);
          goto _failed;
      }

      ns = new_fd(ss, id, sock, PROTOCOL_TCP, request->opaque, true);
      if (ns == NULL) {
          result->data = "reach skynet socket number limit";
          goto _failed;
      }

      if(status == 0) {
          // connected at once: report the numeric peer address
          ATOM_STORE(&ns->type , SOCKET_TYPE_CONNECTED);
          struct sockaddr * addr = ai_ptr->ai_addr;
          void * sin_addr = (ai_ptr->ai_family == AF_INET) ? (void*)&((struct sockaddr_in *)addr)->sin_addr : (void*)&((struct sockaddr_in6 *)addr)->sin6_addr;
          if (inet_ntop(ai_ptr->ai_family, sin_addr, ss->buffer, sizeof(ss->buffer))) {
              result->data = ss->buffer;
          }
          freeaddrinfo( ai_list );
          return SOCKET_OPEN;
      } else {
          // EINPROGRESS: completion is signalled by a write event
          if (enable_write(ss, ns, true)) {
              result->data = "enable write failed";
              goto _failed;
          }
          ATOM_STORE(&ns->type , SOCKET_TYPE_CONNECTING);
      }

      freeaddrinfo( ai_list );
      return -1;
  _failed:
      if (sock >= 0)
          close(sock);
      freeaddrinfo( ai_list );
  _failed_getaddrinfo:
      ATOM_STORE(&ss->slot[HASH_ID(id)].type, SOCKET_TYPE_INVALID);
      return SOCKET_ERR;
  }
  613. static int
  614. report_error(struct socket *s, struct socket_message *result, const char *err) {
  615. result->id = s->id;
  616. result->ud = 0;
  617. result->opaque = s->opaque;
  618. result->data = (char *)err;
  619. return SOCKET_ERR;
  620. }
  621. static int
  622. close_write(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
  623. if (s->closing) {
  624. force_close(ss,s,l,result);
  625. return SOCKET_RST;
  626. } else {
  627. int t = ATOM_LOAD(&s->type);
  628. if (t == SOCKET_TYPE_HALFCLOSE_READ) {
  629. // recv 0 before, ignore the error and close fd
  630. force_close(ss,s,l,result);
  631. return SOCKET_RST;
  632. }
  633. if (t == SOCKET_TYPE_HALFCLOSE_WRITE) {
  634. // already raise SOCKET_ERR
  635. return SOCKET_RST;
  636. }
  637. ATOM_STORE(&s->type, SOCKET_TYPE_HALFCLOSE_WRITE);
  638. shutdown(s->fd, SHUT_WR);
  639. enable_write(ss, s, false);
  640. return report_error(s, result, strerror(errno));
  641. }
  642. }
  643. static int
  644. send_list_tcp(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_lock *l, struct socket_message *result) {
  645. while (list->head) {
  646. struct write_buffer * tmp = list->head;
  647. for (;;) {
  648. ssize_t sz = write(s->fd, tmp->ptr, tmp->sz);
  649. if (sz < 0) {
  650. switch(errno) {
  651. case EINTR:
  652. continue;
  653. case AGAIN_WOULDBLOCK:
  654. return -1;
  655. }
  656. return close_write(ss, s, l, result);
  657. }
  658. stat_write(ss,s,(int)sz);
  659. s->wb_size -= sz;
  660. if (sz != tmp->sz) {
  661. tmp->ptr += sz;
  662. tmp->sz -= sz;
  663. return -1;
  664. }
  665. break;
  666. }
  667. list->head = tmp->next;
  668. write_buffer_free(ss,tmp);
  669. }
  670. list->tail = NULL;
  671. return -1;
  672. }
  673. static socklen_t
  674. udp_socket_address(struct socket *s, const uint8_t udp_address[UDP_ADDRESS_SIZE], union sockaddr_all *sa) {
  675. int type = (uint8_t)udp_address[0];
  676. if (type != s->protocol)
  677. return 0;
  678. uint16_t port = 0;
  679. memcpy(&port, udp_address+1, sizeof(uint16_t));
  680. switch (s->protocol) {
  681. case PROTOCOL_UDP:
  682. memset(&sa->v4, 0, sizeof(sa->v4));
  683. sa->s.sa_family = AF_INET;
  684. sa->v4.sin_port = port;
  685. memcpy(&sa->v4.sin_addr, udp_address + 1 + sizeof(uint16_t), sizeof(sa->v4.sin_addr)); // ipv4 address is 32 bits
  686. return sizeof(sa->v4);
  687. case PROTOCOL_UDPv6:
  688. memset(&sa->v6, 0, sizeof(sa->v6));
  689. sa->s.sa_family = AF_INET6;
  690. sa->v6.sin6_port = port;
  691. memcpy(&sa->v6.sin6_addr, udp_address + 1 + sizeof(uint16_t), sizeof(sa->v6.sin6_addr)); // ipv6 address is 128 bits
  692. return sizeof(sa->v6);
  693. }
  694. return 0;
  695. }
  696. static void
  697. drop_udp(struct socket_server *ss, struct socket *s, struct wb_list *list, struct write_buffer *tmp) {
  698. s->wb_size -= tmp->sz;
  699. list->head = tmp->next;
  700. if (list->head == NULL)
  701. list->tail = NULL;
  702. write_buffer_free(ss,tmp);
  703. }
  704. static int
  705. send_list_udp(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_message *result) {
  706. while (list->head) {
  707. struct write_buffer * tmp = list->head;
  708. struct write_buffer_udp * udp = (struct write_buffer_udp *)tmp;
  709. union sockaddr_all sa;
  710. socklen_t sasz = udp_socket_address(s, udp->udp_address, &sa);
  711. if (sasz == 0) {
  712. skynet_error(NULL, "socket-server : udp (%d) type mismatch.", s->id);
  713. drop_udp(ss, s, list, tmp);
  714. return -1;
  715. }
  716. int err = sendto(s->fd, tmp->ptr, tmp->sz, 0, &sa.s, sasz);
  717. if (err < 0) {
  718. switch(errno) {
  719. case EINTR:
  720. case AGAIN_WOULDBLOCK:
  721. return -1;
  722. }
  723. skynet_error(NULL, "socket-server : udp (%d) sendto error %s.",s->id, strerror(errno));
  724. drop_udp(ss, s, list, tmp);
  725. return -1;
  726. }
  727. stat_write(ss,s,tmp->sz);
  728. s->wb_size -= tmp->sz;
  729. list->head = tmp->next;
  730. write_buffer_free(ss,tmp);
  731. }
  732. list->tail = NULL;
  733. return -1;
  734. }
  735. static int
  736. send_list(struct socket_server *ss, struct socket *s, struct wb_list *list, struct socket_lock *l, struct socket_message *result) {
  737. if (s->protocol == PROTOCOL_TCP) {
  738. return send_list_tcp(ss, s, list, l, result);
  739. } else {
  740. return send_list_udp(ss, s, list, result);
  741. }
  742. }
  743. static inline int
  744. list_uncomplete(struct wb_list *s) {
  745. struct write_buffer *wb = s->head;
  746. if (wb == NULL)
  747. return 0;
  748. return (void *)wb->ptr != wb->buffer;
  749. }
  750. static void
  751. raise_uncomplete(struct socket * s) {
  752. struct wb_list *low = &s->low;
  753. struct write_buffer *tmp = low->head;
  754. low->head = tmp->next;
  755. if (low->head == NULL) {
  756. low->tail = NULL;
  757. }
  758. // move head of low list (tmp) to the empty high list
  759. struct wb_list *high = &s->high;
  760. assert(high->head == NULL);
  761. tmp->next = NULL;
  762. high->head = high->tail = tmp;
  763. }
  764. static inline int
  765. send_buffer_empty(struct socket *s) {
  766. return (s->high.head == NULL && s->low.head == NULL);
  767. }
  /*
      Each socket has two write buffer lists: high priority and low priority.

      1. Send the high list as far as possible.
      2. If the high list is empty, try to send the low list.
      3. If the head of the low list is incomplete (a part of it was sent before), move it to the empty high list (call raise_uncomplete).
      4. If both lists are empty, turn off the write event (or close the socket if it is closing).
   */
  775. static int
  776. send_buffer_(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
  777. assert(!list_uncomplete(&s->low));
  778. // step 1
  779. int ret = send_list(ss,s,&s->high,l,result);
  780. if (ret != -1) {
  781. if (ret == SOCKET_ERR) {
  782. // HALFCLOSE_WRITE
  783. return SOCKET_ERR;
  784. }
  785. // SOCKET_RST (ignore)
  786. return -1;
  787. }
  788. if (s->high.head == NULL) {
  789. // step 2
  790. if (s->low.head != NULL) {
  791. int ret = send_list(ss,s,&s->low,l,result);
  792. if (ret != -1) {
  793. if (ret == SOCKET_ERR) {
  794. // HALFCLOSE_WRITE
  795. return SOCKET_ERR;
  796. }
  797. // SOCKET_RST (ignore)
  798. return -1;
  799. }
  800. // step 3
  801. if (list_uncomplete(&s->low)) {
  802. raise_uncomplete(s);
  803. return -1;
  804. }
  805. if (s->low.head)
  806. return -1;
  807. }
  808. // step 4
  809. assert(send_buffer_empty(s) && s->wb_size == 0);
  810. if (s->closing) {
  811. // finish writing
  812. force_close(ss, s, l, result);
  813. return -1;
  814. }
  815. int err = enable_write(ss, s, false);
  816. if (err) {
  817. return report_error(s, result, "disable write failed");
  818. }
  819. if(s->warn_size > 0){
  820. s->warn_size = 0;
  821. result->opaque = s->opaque;
  822. result->id = s->id;
  823. result->ud = 0;
  824. result->data = NULL;
  825. return SOCKET_WARNING;
  826. }
  827. }
  828. return -1;
  829. }
  830. static int
  831. send_buffer(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
  832. if (!socket_trylock(l))
  833. return -1; // blocked by direct write, send later.
  834. if (s->dw_buffer) {
  835. // add direct write buffer before high.head
  836. struct write_buffer * buf = MALLOC(sizeof(*buf));
  837. struct send_object so;
  838. buf->userobject = send_object_init(ss, &so, (void *)s->dw_buffer, s->dw_size);
  839. buf->ptr = (char*)so.buffer+s->dw_offset;
  840. buf->sz = so.sz - s->dw_offset;
  841. buf->buffer = (void *)s->dw_buffer;
  842. s->wb_size+=buf->sz;
  843. if (s->high.head == NULL) {
  844. s->high.head = s->high.tail = buf;
  845. buf->next = NULL;
  846. } else {
  847. buf->next = s->high.head;
  848. s->high.head = buf;
  849. }
  850. s->dw_buffer = NULL;
  851. }
  852. int r = send_buffer_(ss,s,l,result);
  853. socket_unlock(l);
  854. return r;
  855. }
  856. static struct write_buffer *
  857. append_sendbuffer_(struct socket_server *ss, struct wb_list *s, struct request_send * request, int size) {
  858. struct write_buffer * buf = MALLOC(size);
  859. struct send_object so;
  860. buf->userobject = send_object_init(ss, &so, request->buffer, request->sz);
  861. buf->ptr = (char*)so.buffer;
  862. buf->sz = so.sz;
  863. buf->buffer = request->buffer;
  864. buf->next = NULL;
  865. if (s->head == NULL) {
  866. s->head = s->tail = buf;
  867. } else {
  868. assert(s->tail != NULL);
  869. assert(s->tail->next == NULL);
  870. s->tail->next = buf;
  871. s->tail = buf;
  872. }
  873. return buf;
  874. }
  875. static inline void
  876. append_sendbuffer_udp(struct socket_server *ss, struct socket *s, int priority, struct request_send * request, const uint8_t udp_address[UDP_ADDRESS_SIZE]) {
  877. struct wb_list *wl = (priority == PRIORITY_HIGH) ? &s->high : &s->low;
  878. struct write_buffer_udp *buf = (struct write_buffer_udp *)append_sendbuffer_(ss, wl, request, sizeof(*buf));
  879. memcpy(buf->udp_address, udp_address, UDP_ADDRESS_SIZE);
  880. s->wb_size += buf->buffer.sz;
  881. }
  882. static inline void
  883. append_sendbuffer(struct socket_server *ss, struct socket *s, struct request_send * request) {
  884. struct write_buffer *buf = append_sendbuffer_(ss, &s->high, request, sizeof(*buf));
  885. s->wb_size += buf->sz;
  886. }
  887. static inline void
  888. append_sendbuffer_low(struct socket_server *ss,struct socket *s, struct request_send * request) {
  889. struct write_buffer *buf = append_sendbuffer_(ss, &s->low, request, sizeof(*buf));
  890. s->wb_size += buf->sz;
  891. }
  892. static int
  893. trigger_write(struct socket_server *ss, struct request_send * request, struct socket_message *result) {
  894. int id = request->id;
  895. struct socket * s = &ss->slot[HASH_ID(id)];
  896. if (socket_invalid(s, id))
  897. return -1;
  898. if (enable_write(ss, s, true)) {
  899. return report_error(s, result, "enable write failed");
  900. }
  901. return -1;
  902. }
  903. /*
  904. When send a package , we can assign the priority : PRIORITY_HIGH or PRIORITY_LOW
  905. If socket buffer is empty, write to fd directly.
  906. If write a part, append the rest part to high list. (Even priority is PRIORITY_LOW)
  907. Else append package to high (PRIORITY_HIGH) or low (PRIORITY_LOW) list.
  908. */
  909. static int
  910. send_socket(struct socket_server *ss, struct request_send * request, struct socket_message *result, int priority, const uint8_t *udp_address) {
  911. int id = request->id;
  912. struct socket * s = &ss->slot[HASH_ID(id)];
  913. struct send_object so;
  914. send_object_init(ss, &so, request->buffer, request->sz);
  915. uint8_t type = ATOM_LOAD(&s->type);
  916. if (type == SOCKET_TYPE_INVALID || s->id != id
  917. || type == SOCKET_TYPE_HALFCLOSE_WRITE
  918. || type == SOCKET_TYPE_PACCEPT
  919. || s->closing) {
  920. so.free_func((void *)request->buffer);
  921. return -1;
  922. }
  923. if (type == SOCKET_TYPE_PLISTEN || type == SOCKET_TYPE_LISTEN) {
  924. skynet_error(NULL, "socket-server: write to listen fd %d.", id);
  925. so.free_func((void *)request->buffer);
  926. return -1;
  927. }
  928. if (send_buffer_empty(s)) {
  929. if (s->protocol == PROTOCOL_TCP) {
  930. append_sendbuffer(ss, s, request); // add to high priority list, even priority == PRIORITY_LOW
  931. } else {
  932. // udp
  933. if (udp_address == NULL) {
  934. udp_address = s->p.udp_address;
  935. }
  936. union sockaddr_all sa;
  937. socklen_t sasz = udp_socket_address(s, udp_address, &sa);
  938. if (sasz == 0) {
  939. // udp type mismatch, just drop it.
  940. skynet_error(NULL, "socket-server: udp socket (%d) type mismatch.", id);
  941. so.free_func((void *)request->buffer);
  942. return -1;
  943. }
  944. int n = sendto(s->fd, so.buffer, so.sz, 0, &sa.s, sasz);
  945. if (n != so.sz) {
  946. append_sendbuffer_udp(ss,s,priority,request,udp_address);
  947. } else {
  948. stat_write(ss,s,n);
  949. so.free_func((void *)request->buffer);
  950. return -1;
  951. }
  952. }
  953. if (enable_write(ss, s, true)) {
  954. return report_error(s, result, "enable write failed");
  955. }
  956. } else {
  957. if (s->protocol == PROTOCOL_TCP) {
  958. if (priority == PRIORITY_LOW) {
  959. append_sendbuffer_low(ss, s, request);
  960. } else {
  961. append_sendbuffer(ss, s, request);
  962. }
  963. } else {
  964. if (udp_address == NULL) {
  965. udp_address = s->p.udp_address;
  966. }
  967. append_sendbuffer_udp(ss,s,priority,request,udp_address);
  968. }
  969. }
  970. if (s->wb_size >= WARNING_SIZE && s->wb_size >= s->warn_size) {
  971. s->warn_size = s->warn_size == 0 ? WARNING_SIZE *2 : s->warn_size*2;
  972. result->opaque = s->opaque;
  973. result->id = s->id;
  974. result->ud = s->wb_size%1024 == 0 ? s->wb_size/1024 : s->wb_size/1024 + 1;
  975. result->data = NULL;
  976. return SOCKET_WARNING;
  977. }
  978. return -1;
  979. }
  980. static int
  981. listen_socket(struct socket_server *ss, struct request_listen * request, struct socket_message *result) {
  982. int id = request->id;
  983. int listen_fd = request->fd;
  984. struct socket *s = new_fd(ss, id, listen_fd, PROTOCOL_TCP, request->opaque, false);
  985. if (s == NULL) {
  986. goto _failed;
  987. }
  988. ATOM_STORE(&s->type , SOCKET_TYPE_PLISTEN);
  989. result->opaque = request->opaque;
  990. result->id = id;
  991. result->ud = 0;
  992. result->data = "listen";
  993. union sockaddr_all u;
  994. socklen_t slen = sizeof(u);
  995. if (getsockname(listen_fd, &u.s, &slen) == 0) {
  996. void * sin_addr = (u.s.sa_family == AF_INET) ? (void*)&u.v4.sin_addr : (void *)&u.v6.sin6_addr;
  997. if (inet_ntop(u.s.sa_family, sin_addr, ss->buffer, sizeof(ss->buffer)) == 0) {
  998. result->data = strerror(errno);
  999. return SOCKET_ERR;
  1000. }
  1001. int sin_port = ntohs((u.s.sa_family == AF_INET) ? u.v4.sin_port : u.v6.sin6_port);
  1002. result->data = ss->buffer;
  1003. result->ud = sin_port;
  1004. } else {
  1005. result->data = strerror(errno);
  1006. return SOCKET_ERR;
  1007. }
  1008. return SOCKET_OPEN;
  1009. _failed:
  1010. close(listen_fd);
  1011. result->opaque = request->opaque;
  1012. result->id = id;
  1013. result->ud = 0;
  1014. result->data = "reach skynet socket number limit";
  1015. ss->slot[HASH_ID(id)].type = SOCKET_TYPE_INVALID;
  1016. return SOCKET_ERR;
  1017. }
  1018. static inline int
  1019. nomore_sending_data(struct socket *s) {
  1020. return (send_buffer_empty(s) && s->dw_buffer == NULL && (ATOM_LOAD(&s->sending) & 0xffff) == 0)
  1021. || (ATOM_LOAD(&s->type) == SOCKET_TYPE_HALFCLOSE_WRITE);
  1022. }
  1023. static void
  1024. close_read(struct socket_server *ss, struct socket * s, struct socket_message *result) {
  1025. // Don't read socket later
  1026. ATOM_STORE(&s->type , SOCKET_TYPE_HALFCLOSE_READ);
  1027. enable_read(ss,s,false);
  1028. shutdown(s->fd, SHUT_RD);
  1029. result->id = s->id;
  1030. result->ud = 0;
  1031. result->data = NULL;
  1032. result->opaque = s->opaque;
  1033. }
  1034. static inline int
  1035. halfclose_read(struct socket *s) {
  1036. return ATOM_LOAD(&s->type) == SOCKET_TYPE_HALFCLOSE_READ;
  1037. }
  1038. // SOCKET_CLOSE can be raised (only once) in one of two conditions.
  1039. // See https://github.com/cloudwu/skynet/issues/1346 for more discussion.
  1040. // 1. close socket by self, See close_socket()
  1041. // 2. recv 0 or eof event (close socket by remote), See forward_message_tcp()
  1042. // It's able to write data after SOCKET_CLOSE (In condition 2), but if remote is closed, SOCKET_ERR may raised.
  1043. static int
  1044. close_socket(struct socket_server *ss, struct request_close *request, struct socket_message *result) {
  1045. int id = request->id;
  1046. struct socket * s = &ss->slot[HASH_ID(id)];
  1047. if (socket_invalid(s, id)) {
  1048. // The socket is closed, ignore
  1049. return -1;
  1050. }
  1051. struct socket_lock l;
  1052. socket_lock_init(s, &l);
  1053. int shutdown_read = halfclose_read(s);
  1054. if (request->shutdown || nomore_sending_data(s)) {
  1055. // If socket is SOCKET_TYPE_HALFCLOSE_READ, Do not raise SOCKET_CLOSE again.
  1056. int r = shutdown_read ? -1 : SOCKET_CLOSE;
  1057. force_close(ss,s,&l,result);
  1058. return r;
  1059. }
  1060. s->closing = true;
  1061. if (!shutdown_read) {
  1062. // don't read socket after socket.close()
  1063. close_read(ss, s, result);
  1064. return SOCKET_CLOSE;
  1065. }
  1066. // recv 0 before (socket is SOCKET_TYPE_HALFCLOSE_READ) and waiting for sending data out.
  1067. return -1;
  1068. }
  1069. static int
  1070. bind_socket(struct socket_server *ss, struct request_bind *request, struct socket_message *result) {
  1071. int id = request->id;
  1072. result->id = id;
  1073. result->opaque = request->opaque;
  1074. result->ud = 0;
  1075. struct socket *s = new_fd(ss, id, request->fd, PROTOCOL_TCP, request->opaque, true);
  1076. if (s == NULL) {
  1077. result->data = "reach skynet socket number limit";
  1078. return SOCKET_ERR;
  1079. }
  1080. sp_nonblocking(request->fd);
  1081. ATOM_STORE(&s->type , SOCKET_TYPE_BIND);
  1082. result->data = "binding";
  1083. return SOCKET_OPEN;
  1084. }
  1085. static int
  1086. resume_socket(struct socket_server *ss, struct request_resumepause *request, struct socket_message *result) {
  1087. int id = request->id;
  1088. result->id = id;
  1089. result->opaque = request->opaque;
  1090. result->ud = 0;
  1091. result->data = NULL;
  1092. struct socket *s = &ss->slot[HASH_ID(id)];
  1093. if (socket_invalid(s, id)) {
  1094. result->data = "invalid socket";
  1095. return SOCKET_ERR;
  1096. }
  1097. if (halfclose_read(s)) {
  1098. // The closing socket may be in transit, so raise an error. See https://github.com/cloudwu/skynet/issues/1374
  1099. result->data = "socket closed";
  1100. return SOCKET_ERR;
  1101. }
  1102. struct socket_lock l;
  1103. socket_lock_init(s, &l);
  1104. if (enable_read(ss, s, true)) {
  1105. result->data = "enable read failed";
  1106. return SOCKET_ERR;
  1107. }
  1108. uint8_t type = ATOM_LOAD(&s->type);
  1109. if (type == SOCKET_TYPE_PACCEPT || type == SOCKET_TYPE_PLISTEN) {
  1110. ATOM_STORE(&s->type , (type == SOCKET_TYPE_PACCEPT) ? SOCKET_TYPE_CONNECTED : SOCKET_TYPE_LISTEN);
  1111. s->opaque = request->opaque;
  1112. result->data = "start";
  1113. return SOCKET_OPEN;
  1114. } else if (type == SOCKET_TYPE_CONNECTED) {
  1115. // todo: maybe we should send a message SOCKET_TRANSFER to s->opaque
  1116. s->opaque = request->opaque;
  1117. result->data = "transfer";
  1118. return SOCKET_OPEN;
  1119. }
  1120. // if s->type == SOCKET_TYPE_HALFCLOSE_WRITE , SOCKET_CLOSE message will send later
  1121. return -1;
  1122. }
  1123. static int
  1124. pause_socket(struct socket_server *ss, struct request_resumepause *request, struct socket_message *result) {
  1125. int id = request->id;
  1126. struct socket *s = &ss->slot[HASH_ID(id)];
  1127. if (socket_invalid(s, id)) {
  1128. return -1;
  1129. }
  1130. if (enable_read(ss, s, false)) {
  1131. return report_error(s, result, "enable read failed");
  1132. }
  1133. return -1;
  1134. }
  1135. static void
  1136. setopt_socket(struct socket_server *ss, struct request_setopt *request) {
  1137. int id = request->id;
  1138. struct socket *s = &ss->slot[HASH_ID(id)];
  1139. if (socket_invalid(s, id)) {
  1140. return;
  1141. }
  1142. int v = request->value;
  1143. setsockopt(s->fd, IPPROTO_TCP, request->what, &v, sizeof(v));
  1144. }
  1145. static void
  1146. block_readpipe(int pipefd, void *buffer, int sz) {
  1147. for (;;) {
  1148. int n = read(pipefd, buffer, sz);
  1149. if (n<0) {
  1150. if (errno == EINTR)
  1151. continue;
  1152. skynet_error(NULL, "socket-server : read pipe error %s.",strerror(errno));
  1153. return;
  1154. }
  1155. // must atomic read from a pipe
  1156. assert(n == sz);
  1157. return;
  1158. }
  1159. }
  1160. static int
  1161. has_cmd(struct socket_server *ss) {
  1162. struct timeval tv = {0,0};
  1163. int retval;
  1164. FD_SET(ss->recvctrl_fd, &ss->rfds);
  1165. retval = select(ss->recvctrl_fd+1, &ss->rfds, NULL, NULL, &tv);
  1166. if (retval == 1) {
  1167. return 1;
  1168. }
  1169. return 0;
  1170. }
  1171. static void
  1172. add_udp_socket(struct socket_server *ss, struct request_udp *udp) {
  1173. int id = udp->id;
  1174. int protocol;
  1175. if (udp->family == AF_INET6) {
  1176. protocol = PROTOCOL_UDPv6;
  1177. } else {
  1178. protocol = PROTOCOL_UDP;
  1179. }
  1180. struct socket *ns = new_fd(ss, id, udp->fd, protocol, udp->opaque, true);
  1181. if (ns == NULL) {
  1182. close(udp->fd);
  1183. ss->slot[HASH_ID(id)].type = SOCKET_TYPE_INVALID;
  1184. return;
  1185. }
  1186. ATOM_STORE(&ns->type , SOCKET_TYPE_CONNECTED);
  1187. memset(ns->p.udp_address, 0, sizeof(ns->p.udp_address));
  1188. }
  1189. static int
  1190. set_udp_address(struct socket_server *ss, struct request_setudp *request, struct socket_message *result) {
  1191. int id = request->id;
  1192. struct socket *s = &ss->slot[HASH_ID(id)];
  1193. if (socket_invalid(s, id)) {
  1194. return -1;
  1195. }
  1196. int type = request->address[0];
  1197. if (type != s->protocol) {
  1198. // protocol mismatch
  1199. return report_error(s, result, "protocol mismatch");
  1200. }
  1201. if (type == PROTOCOL_UDP) {
  1202. memcpy(s->p.udp_address, request->address, 1+2+4); // 1 type, 2 port, 4 ipv4
  1203. } else {
  1204. memcpy(s->p.udp_address, request->address, 1+2+16); // 1 type, 2 port, 16 ipv6
  1205. }
  1206. ATOM_FDEC(&s->udpconnecting);
  1207. return -1;
  1208. }
  1209. static inline void
  1210. inc_sending_ref(struct socket *s, int id) {
  1211. if (s->protocol != PROTOCOL_TCP)
  1212. return;
  1213. for (;;) {
  1214. unsigned long sending = ATOM_LOAD(&s->sending);
  1215. if ((sending >> 16) == ID_TAG16(id)) {
  1216. if ((sending & 0xffff) == 0xffff) {
  1217. // s->sending may overflow (rarely), so busy waiting here for socket thread dec it. see issue #794
  1218. continue;
  1219. }
  1220. // inc sending only matching the same socket id
  1221. if (ATOM_CAS_ULONG(&s->sending, sending, sending + 1))
  1222. return;
  1223. // atom inc failed, retry
  1224. } else {
  1225. // socket id changed, just return
  1226. return;
  1227. }
  1228. }
  1229. }
  1230. static inline void
  1231. dec_sending_ref(struct socket_server *ss, int id) {
  1232. struct socket * s = &ss->slot[HASH_ID(id)];
  1233. // Notice: udp may inc sending while type == SOCKET_TYPE_RESERVE
  1234. if (s->id == id && s->protocol == PROTOCOL_TCP) {
  1235. assert((ATOM_LOAD(&s->sending) & 0xffff) != 0);
  1236. ATOM_FDEC(&s->sending);
  1237. }
  1238. }
  1239. // return type
  1240. static int
  1241. ctrl_cmd(struct socket_server *ss, struct socket_message *result) {
  1242. int fd = ss->recvctrl_fd;
  1243. // the length of message is one byte, so 256 buffer size is enough.
  1244. uint8_t buffer[256];
  1245. uint8_t header[2];
  1246. block_readpipe(fd, header, sizeof(header));
  1247. int type = header[0];
  1248. int len = header[1];
  1249. block_readpipe(fd, buffer, len);
  1250. // ctrl command only exist in local fd, so don't worry about endian.
  1251. switch (type) {
  1252. case 'R':
  1253. return resume_socket(ss,(struct request_resumepause *)buffer, result);
  1254. case 'S':
  1255. return pause_socket(ss,(struct request_resumepause *)buffer, result);
  1256. case 'B':
  1257. return bind_socket(ss,(struct request_bind *)buffer, result);
  1258. case 'L':
  1259. return listen_socket(ss,(struct request_listen *)buffer, result);
  1260. case 'K':
  1261. return close_socket(ss,(struct request_close *)buffer, result);
  1262. case 'O':
  1263. return open_socket(ss, (struct request_open *)buffer, result);
  1264. case 'X':
  1265. result->opaque = 0;
  1266. result->id = 0;
  1267. result->ud = 0;
  1268. result->data = NULL;
  1269. return SOCKET_EXIT;
  1270. case 'W':
  1271. return trigger_write(ss, (struct request_send *)buffer, result);
  1272. case 'D':
  1273. case 'P': {
  1274. int priority = (type == 'D') ? PRIORITY_HIGH : PRIORITY_LOW;
  1275. struct request_send * request = (struct request_send *) buffer;
  1276. int ret = send_socket(ss, request, result, priority, NULL);
  1277. dec_sending_ref(ss, request->id);
  1278. return ret;
  1279. }
  1280. case 'A': {
  1281. struct request_send_udp * rsu = (struct request_send_udp *)buffer;
  1282. return send_socket(ss, &rsu->send, result, PRIORITY_HIGH, rsu->address);
  1283. }
  1284. case 'C':
  1285. return set_udp_address(ss, (struct request_setudp *)buffer, result);
  1286. case 'T':
  1287. setopt_socket(ss, (struct request_setopt *)buffer);
  1288. return -1;
  1289. case 'U':
  1290. add_udp_socket(ss, (struct request_udp *)buffer);
  1291. return -1;
  1292. default:
  1293. skynet_error(NULL, "socket-server: Unknown ctrl %c.",type);
  1294. return -1;
  1295. };
  1296. return -1;
  1297. }
  1298. // return -1 (ignore) when error
  1299. static int
  1300. forward_message_tcp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
  1301. int sz = s->p.size;
  1302. char * buffer = MALLOC(sz);
  1303. int n = (int)read(s->fd, buffer, sz);
  1304. if (n<0) {
  1305. FREE(buffer);
  1306. switch(errno) {
  1307. case EINTR:
  1308. case AGAIN_WOULDBLOCK:
  1309. break;
  1310. default:
  1311. return report_error(s, result, strerror(errno));
  1312. }
  1313. return -1;
  1314. }
  1315. if (n==0) {
  1316. FREE(buffer);
  1317. if (s->closing) {
  1318. // Rare case : if s->closing is true, reading event is disable, and SOCKET_CLOSE is raised.
  1319. if (nomore_sending_data(s)) {
  1320. force_close(ss,s,l,result);
  1321. }
  1322. return -1;
  1323. }
  1324. int t = ATOM_LOAD(&s->type);
  1325. if (t == SOCKET_TYPE_HALFCLOSE_READ) {
  1326. // Rare case : Already shutdown read.
  1327. return -1;
  1328. }
  1329. if (t == SOCKET_TYPE_HALFCLOSE_WRITE) {
  1330. // Remote shutdown read (write error) before.
  1331. force_close(ss,s,l,result);
  1332. } else {
  1333. close_read(ss, s, result);
  1334. }
  1335. return SOCKET_CLOSE;
  1336. }
  1337. if (halfclose_read(s)) {
  1338. // discard recv data (Rare case : if socket is HALFCLOSE_READ, reading event is disable.)
  1339. FREE(buffer);
  1340. return -1;
  1341. }
  1342. stat_read(ss,s,n);
  1343. result->opaque = s->opaque;
  1344. result->id = s->id;
  1345. result->ud = n;
  1346. result->data = buffer;
  1347. if (n == sz) {
  1348. s->p.size *= 2;
  1349. return SOCKET_MORE;
  1350. } else if (sz > MIN_READ_BUFFER && n*2 < sz) {
  1351. s->p.size /= 2;
  1352. }
  1353. return SOCKET_DATA;
  1354. }
  1355. static int
  1356. gen_udp_address(int protocol, union sockaddr_all *sa, uint8_t * udp_address) {
  1357. int addrsz = 1;
  1358. udp_address[0] = (uint8_t)protocol;
  1359. if (protocol == PROTOCOL_UDP) {
  1360. memcpy(udp_address+addrsz, &sa->v4.sin_port, sizeof(sa->v4.sin_port));
  1361. addrsz += sizeof(sa->v4.sin_port);
  1362. memcpy(udp_address+addrsz, &sa->v4.sin_addr, sizeof(sa->v4.sin_addr));
  1363. addrsz += sizeof(sa->v4.sin_addr);
  1364. } else {
  1365. memcpy(udp_address+addrsz, &sa->v6.sin6_port, sizeof(sa->v6.sin6_port));
  1366. addrsz += sizeof(sa->v6.sin6_port);
  1367. memcpy(udp_address+addrsz, &sa->v6.sin6_addr, sizeof(sa->v6.sin6_addr));
  1368. addrsz += sizeof(sa->v6.sin6_addr);
  1369. }
  1370. return addrsz;
  1371. }
  1372. static int
  1373. forward_message_udp(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message * result) {
  1374. union sockaddr_all sa;
  1375. socklen_t slen = sizeof(sa);
  1376. int n = recvfrom(s->fd, ss->udpbuffer,MAX_UDP_PACKAGE,0,&sa.s,&slen);
  1377. if (n<0) {
  1378. switch(errno) {
  1379. case EINTR:
  1380. case AGAIN_WOULDBLOCK:
  1381. return -1;
  1382. }
  1383. int error = errno;
  1384. // close when error
  1385. force_close(ss, s, l, result);
  1386. result->data = strerror(error);
  1387. return SOCKET_ERR;
  1388. }
  1389. stat_read(ss,s,n);
  1390. uint8_t * data;
  1391. if (slen == sizeof(sa.v4)) {
  1392. if (s->protocol != PROTOCOL_UDP)
  1393. return -1;
  1394. data = MALLOC(n + 1 + 2 + 4);
  1395. gen_udp_address(PROTOCOL_UDP, &sa, data + n);
  1396. } else {
  1397. if (s->protocol != PROTOCOL_UDPv6)
  1398. return -1;
  1399. data = MALLOC(n + 1 + 2 + 16);
  1400. gen_udp_address(PROTOCOL_UDPv6, &sa, data + n);
  1401. }
  1402. memcpy(data, ss->udpbuffer, n);
  1403. result->opaque = s->opaque;
  1404. result->id = s->id;
  1405. result->ud = n;
  1406. result->data = (char *)data;
  1407. return SOCKET_UDP;
  1408. }
  1409. static int
  1410. report_connect(struct socket_server *ss, struct socket *s, struct socket_lock *l, struct socket_message *result) {
  1411. int error;
  1412. socklen_t len = sizeof(error);
  1413. int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);
  1414. if (code < 0 || error) {
  1415. error = code < 0 ? errno : error;
  1416. force_close(ss, s, l, result);
  1417. result->data = strerror(error);
  1418. return SOCKET_ERR;
  1419. } else {
  1420. ATOM_STORE(&s->type , SOCKET_TYPE_CONNECTED);
  1421. result->opaque = s->opaque;
  1422. result->id = s->id;
  1423. result->ud = 0;
  1424. if (nomore_sending_data(s)) {
  1425. if (enable_write(ss, s, false)) {
  1426. force_close(ss,s,l, result);
  1427. result->data = "disable write failed";
  1428. return SOCKET_ERR;
  1429. }
  1430. }
  1431. union sockaddr_all u;
  1432. socklen_t slen = sizeof(u);
  1433. if (getpeername(s->fd, &u.s, &slen) == 0) {
  1434. void * sin_addr = (u.s.sa_family == AF_INET) ? (void*)&u.v4.sin_addr : (void *)&u.v6.sin6_addr;
  1435. if (inet_ntop(u.s.sa_family, sin_addr, ss->buffer, sizeof(ss->buffer))) {
  1436. result->data = ss->buffer;
  1437. return SOCKET_OPEN;
  1438. }
  1439. }
  1440. result->data = NULL;
  1441. return SOCKET_OPEN;
  1442. }
  1443. }
  1444. static int
  1445. getname(union sockaddr_all *u, char *buffer, size_t sz) {
  1446. char tmp[INET6_ADDRSTRLEN];
  1447. void * sin_addr = (u->s.sa_family == AF_INET) ? (void*)&u->v4.sin_addr : (void *)&u->v6.sin6_addr;
  1448. if (inet_ntop(u->s.sa_family, sin_addr, tmp, sizeof(tmp))) {
  1449. int sin_port = ntohs((u->s.sa_family == AF_INET) ? u->v4.sin_port : u->v6.sin6_port);
  1450. snprintf(buffer, sz, "%s:%d", tmp, sin_port);
  1451. return 1;
  1452. } else {
  1453. buffer[0] = '\0';
  1454. return 0;
  1455. }
  1456. }
  1457. // return 0 when failed, or -1 when file limit
  1458. static int
  1459. report_accept(struct socket_server *ss, struct socket *s, struct socket_message *result) {
  1460. union sockaddr_all u;
  1461. socklen_t len = sizeof(u);
  1462. int client_fd = accept(s->fd, &u.s, &len);
  1463. if (client_fd < 0) {
  1464. if (errno == EMFILE || errno == ENFILE) {
  1465. result->opaque = s->opaque;
  1466. result->id = s->id;
  1467. result->ud = 0;
  1468. result->data = strerror(errno);
  1469. // See https://stackoverflow.com/questions/47179793/how-to-gracefully-handle-accept-giving-emfile-and-close-the-connection
  1470. if (ss->reserve_fd >= 0) {
  1471. close(ss->reserve_fd);
  1472. client_fd = accept(s->fd, &u.s, &len);
  1473. if (client_fd >= 0) {
  1474. close(client_fd);
  1475. }
  1476. ss->reserve_fd = dup(1);
  1477. }
  1478. return -1;
  1479. } else {
  1480. return 0;
  1481. }
  1482. }
  1483. int id = reserve_id(ss);
  1484. if (id < 0) {
  1485. close(client_fd);
  1486. return 0;
  1487. }
  1488. socket_keepalive(client_fd);
  1489. sp_nonblocking(client_fd);
  1490. struct socket *ns = new_fd(ss, id, client_fd, PROTOCOL_TCP, s->opaque, false);
  1491. if (ns == NULL) {
  1492. close(client_fd);
  1493. return 0;
  1494. }
  1495. // accept new one connection
  1496. stat_read(ss,s,1);
  1497. ATOM_STORE(&ns->type , SOCKET_TYPE_PACCEPT);
  1498. result->opaque = s->opaque;
  1499. result->id = s->id;
  1500. result->ud = id;
  1501. result->data = NULL;
  1502. if (getname(&u, ss->buffer, sizeof(ss->buffer))) {
  1503. result->data = ss->buffer;
  1504. }
  1505. return 1;
  1506. }
  1507. static inline void
  1508. clear_closed_event(struct socket_server *ss, struct socket_message * result, int type) {
  1509. if (type == SOCKET_CLOSE || type == SOCKET_ERR) {
  1510. int id = result->id;
  1511. int i;
  1512. for (i=ss->event_index; i<ss->event_n; i++) {
  1513. struct event *e = &ss->ev[i];
  1514. struct socket *s = e->s;
  1515. if (s) {
  1516. if (socket_invalid(s, id) && s->id == id) {
  1517. e->s = NULL;
  1518. break;
  1519. }
  1520. }
  1521. }
  1522. }
  1523. }
  1524. // return type
  1525. int
  1526. socket_server_poll(struct socket_server *ss, struct socket_message * result, int * more) {
  1527. for (;;) {
  1528. if (ss->checkctrl) {
  1529. if (has_cmd(ss)) {
  1530. int type = ctrl_cmd(ss, result);
  1531. if (type != -1) {
  1532. clear_closed_event(ss, result, type);
  1533. return type;
  1534. } else
  1535. continue;
  1536. } else {
  1537. ss->checkctrl = 0;
  1538. }
  1539. }
  1540. if (ss->event_index == ss->event_n) {
  1541. ss->event_n = sp_wait(ss->event_fd, ss->ev, MAX_EVENT);
  1542. ss->checkctrl = 1;
  1543. if (more) {
  1544. *more = 0;
  1545. }
  1546. ss->event_index = 0;
  1547. if (ss->event_n <= 0) {
  1548. ss->event_n = 0;
  1549. int err = errno;
  1550. if (err != EINTR) {
  1551. skynet_error(NULL, "socket-server: %s", strerror(err));
  1552. }
  1553. continue;
  1554. }
  1555. }
  1556. struct event *e = &ss->ev[ss->event_index++];
  1557. struct socket *s = e->s;
  1558. if (s == NULL) {
  1559. // dispatch pipe message at beginning
  1560. continue;
  1561. }
  1562. struct socket_lock l;
  1563. socket_lock_init(s, &l);
  1564. switch (ATOM_LOAD(&s->type)) {
  1565. case SOCKET_TYPE_CONNECTING:
  1566. return report_connect(ss, s, &l, result);
  1567. case SOCKET_TYPE_LISTEN: {
  1568. int ok = report_accept(ss, s, result);
  1569. if (ok > 0) {
  1570. return SOCKET_ACCEPT;
  1571. } if (ok < 0 ) {
  1572. return SOCKET_ERR;
  1573. }
  1574. // when ok == 0, retry
  1575. break;
  1576. }
  1577. case SOCKET_TYPE_INVALID:
  1578. skynet_error(NULL, "socket-server: invalid socket");
  1579. break;
  1580. default:
  1581. if (e->read) {
  1582. int type;
  1583. if (s->protocol == PROTOCOL_TCP) {
  1584. type = forward_message_tcp(ss, s, &l, result);
  1585. if (type == SOCKET_MORE) {
  1586. --ss->event_index;
  1587. return SOCKET_DATA;
  1588. }
  1589. } else {
  1590. type = forward_message_udp(ss, s, &l, result);
  1591. if (type == SOCKET_UDP) {
  1592. // try read again
  1593. --ss->event_index;
  1594. return SOCKET_UDP;
  1595. }
  1596. }
  1597. if (e->write && type != SOCKET_CLOSE && type != SOCKET_ERR) {
  1598. // Try to dispatch write message next step if write flag set.
  1599. e->read = false;
  1600. --ss->event_index;
  1601. }
  1602. if (type == -1)
  1603. break;
  1604. return type;
  1605. }
  1606. if (e->write) {
  1607. int type = send_buffer(ss, s, &l, result);
  1608. if (type == -1)
  1609. break;
  1610. return type;
  1611. }
  1612. if (e->error) {
  1613. int error;
  1614. socklen_t len = sizeof(error);
  1615. int code = getsockopt(s->fd, SOL_SOCKET, SO_ERROR, &error, &len);
  1616. const char * err = NULL;
  1617. if (code < 0) {
  1618. err = strerror(errno);
  1619. } else if (error != 0) {
  1620. err = strerror(error);
  1621. } else {
  1622. err = "Unknown error";
  1623. }
  1624. return report_error(s, result, err);
  1625. }
  1626. if (e->eof) {
  1627. // For epoll (at least), FIN packets are exchanged both ways.
  1628. // See: https://stackoverflow.com/questions/52976152/tcp-when-is-epollhup-generated
  1629. int halfclose = halfclose_read(s);
  1630. force_close(ss, s, &l, result);
  1631. if (!halfclose) {
  1632. return SOCKET_CLOSE;
  1633. }
  1634. }
  1635. break;
  1636. }
  1637. }
  1638. }
  1639. static void
  1640. send_request(struct socket_server *ss, struct request_package *request, char type, int len) {
  1641. request->header[6] = (uint8_t)type;
  1642. request->header[7] = (uint8_t)len;
  1643. const char * req = (const char *)request + offsetof(struct request_package, header[6]);
  1644. for (;;) {
  1645. ssize_t n = write(ss->sendctrl_fd, req, len+2);
  1646. if (n<0) {
  1647. if (errno != EINTR) {
  1648. skynet_error(NULL, "socket-server : send ctrl command error %s.", strerror(errno));
  1649. }
  1650. continue;
  1651. }
  1652. assert(n == len+2);
  1653. return;
  1654. }
  1655. }
  1656. static int
  1657. open_request(struct socket_server *ss, struct request_package *req, uintptr_t opaque, const char *addr, int port) {
  1658. int len = strlen(addr);
  1659. if (len + sizeof(req->u.open) >= 256) {
  1660. skynet_error(NULL, "socket-server : Invalid addr %s.",addr);
  1661. return -1;
  1662. }
  1663. int id = reserve_id(ss);
  1664. if (id < 0)
  1665. return -1;
  1666. req->u.open.opaque = opaque;
  1667. req->u.open.id = id;
  1668. req->u.open.port = port;
  1669. memcpy(req->u.open.host, addr, len);
  1670. req->u.open.host[len] = '\0';
  1671. return len;
  1672. }
  1673. int
  1674. socket_server_connect(struct socket_server *ss, uintptr_t opaque, const char * addr, int port) {
  1675. struct request_package request;
  1676. int len = open_request(ss, &request, opaque, addr, port);
  1677. if (len < 0)
  1678. return -1;
  1679. send_request(ss, &request, 'O', sizeof(request.u.open) + len);
  1680. return request.u.open.id;
  1681. }
  1682. static inline int
  1683. can_direct_write(struct socket *s, int id) {
  1684. return s->id == id && nomore_sending_data(s) && ATOM_LOAD(&s->type) == SOCKET_TYPE_CONNECTED && ATOM_LOAD(&s->udpconnecting) == 0;
  1685. }
  1686. // return -1 when error, 0 when success
  1687. int
  1688. socket_server_send(struct socket_server *ss, struct socket_sendbuffer *buf) {
  1689. int id = buf->id;
  1690. struct socket * s = &ss->slot[HASH_ID(id)];
  1691. if (socket_invalid(s, id) || s->closing) {
  1692. free_buffer(ss, buf);
  1693. return -1;
  1694. }
  1695. struct socket_lock l;
  1696. socket_lock_init(s, &l);
  1697. if (can_direct_write(s,id) && socket_trylock(&l)) {
  1698. // may be we can send directly, double check
  1699. if (can_direct_write(s,id)) {
  1700. // send directly
  1701. struct send_object so;
  1702. send_object_init_from_sendbuffer(ss, &so, buf);
  1703. ssize_t n;
  1704. if (s->protocol == PROTOCOL_TCP) {
  1705. n = write(s->fd, so.buffer, so.sz);
  1706. } else {
  1707. union sockaddr_all sa;
  1708. socklen_t sasz = udp_socket_address(s, s->p.udp_address, &sa);
  1709. if (sasz == 0) {
  1710. skynet_error(NULL, "socket-server : set udp (%d) address first.", id);
  1711. socket_unlock(&l);
  1712. so.free_func((void *)buf->buffer);
  1713. return -1;
  1714. }
  1715. n = sendto(s->fd, so.buffer, so.sz, 0, &sa.s, sasz);
  1716. }
  1717. if (n<0) {
  1718. // ignore error, let socket thread try again
  1719. n = 0;
  1720. }
  1721. stat_write(ss,s,n);
  1722. if (n == so.sz) {
  1723. // write done
  1724. socket_unlock(&l);
  1725. so.free_func((void *)buf->buffer);
  1726. return 0;
  1727. }
  1728. // write failed, put buffer into s->dw_* , and let socket thread send it. see send_buffer()
  1729. s->dw_buffer = clone_buffer(buf, &s->dw_size);
  1730. s->dw_offset = n;
  1731. socket_unlock(&l);
  1732. struct request_package request;
  1733. request.u.send.id = id;
  1734. request.u.send.sz = 0;
  1735. request.u.send.buffer = NULL;
  1736. // let socket thread enable write event
  1737. send_request(ss, &request, 'W', sizeof(request.u.send));
  1738. return 0;
  1739. }
  1740. socket_unlock(&l);
  1741. }
  1742. inc_sending_ref(s, id);
  1743. struct request_package request;
  1744. request.u.send.id = id;
  1745. request.u.send.buffer = clone_buffer(buf, &request.u.send.sz);
  1746. send_request(ss, &request, 'D', sizeof(request.u.send));
  1747. return 0;
  1748. }
  1749. // return -1 when error, 0 when success
  1750. int
  1751. socket_server_send_lowpriority(struct socket_server *ss, struct socket_sendbuffer *buf) {
  1752. int id = buf->id;
  1753. struct socket * s = &ss->slot[HASH_ID(id)];
  1754. if (socket_invalid(s, id)) {
  1755. free_buffer(ss, buf);
  1756. return -1;
  1757. }
  1758. inc_sending_ref(s, id);
  1759. struct request_package request;
  1760. request.u.send.id = id;
  1761. request.u.send.buffer = clone_buffer(buf, &request.u.send.sz);
  1762. send_request(ss, &request, 'P', sizeof(request.u.send));
  1763. return 0;
  1764. }
  1765. void
  1766. socket_server_exit(struct socket_server *ss) {
  1767. struct request_package request;
  1768. send_request(ss, &request, 'X', 0);
  1769. }
  1770. void
  1771. socket_server_close(struct socket_server *ss, uintptr_t opaque, int id) {
  1772. struct request_package request;
  1773. request.u.close.id = id;
  1774. request.u.close.shutdown = 0;
  1775. request.u.close.opaque = opaque;
  1776. send_request(ss, &request, 'K', sizeof(request.u.close));
  1777. }
  1778. void
  1779. socket_server_shutdown(struct socket_server *ss, uintptr_t opaque, int id) {
  1780. struct request_package request;
  1781. request.u.close.id = id;
  1782. request.u.close.shutdown = 1;
  1783. request.u.close.opaque = opaque;
  1784. send_request(ss, &request, 'K', sizeof(request.u.close));
  1785. }
  // return -1 means failed
  // or return the bound socket fd; *family receives the address family
  // (AF_INET or AF_INET6) of the first getaddrinfo() result.
  //
  // A NULL or empty host is replaced by "0.0.0.0". `protocol` must be
  // IPPROTO_TCP or IPPROTO_UDP (asserted). SO_REUSEADDR is set before bind().
  static int
  do_bind(const char *host, int port, int protocol, int *family) {
      if (host == NULL || host[0] == 0) {
          host = "0.0.0.0";	// INADDR_ANY
      }

      char service[16];
      sprintf(service, "%d", port);

      struct addrinfo hints;
      memset(&hints, 0, sizeof(hints));
      hints.ai_family = AF_UNSPEC;
      if (protocol == IPPROTO_TCP) {
          hints.ai_socktype = SOCK_STREAM;
      } else {
          assert(protocol == IPPROTO_UDP);
          hints.ai_socktype = SOCK_DGRAM;
      }
      hints.ai_protocol = protocol;

      struct addrinfo *results = NULL;
      if (getaddrinfo(host, service, &hints, &results) != 0) {
          return -1;
      }

      // only the first result is used
      *family = results->ai_family;
      int fd = socket(*family, results->ai_socktype, 0);
      if (fd >= 0) {
          int reuse = 1;
          if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(int)) != -1
              && bind(fd, (struct sockaddr *)results->ai_addr, results->ai_addrlen) == 0) {
              freeaddrinfo(results);
              return fd;
          }
          // option or bind failed: do not leak the descriptor
          close(fd);
      }
      freeaddrinfo(results);
      return -1;
  }
  /*
   * Bind a TCP socket to host:port with do_bind() and put it into listening
   * state with the given backlog.
   * Returns the listening fd, or -1 when binding or listen() fails (the fd
   * is closed in the latter case).
   */
  static int
  do_listen(const char * host, int port, int backlog) {
      int family = 0;
      int fd = do_bind(host, port, IPPROTO_TCP, &family);
      if (fd < 0) {
          return -1;
      }
      if (listen(fd, backlog) != -1) {
          return fd;
      }
      close(fd);
      return -1;
  }
  1845. int
  1846. socket_server_listen(struct socket_server *ss, uintptr_t opaque, const char * addr, int port, int backlog) {
  1847. int fd = do_listen(addr, port, backlog);
  1848. if (fd < 0) {
  1849. return -1;
  1850. }
  1851. struct request_package request;
  1852. int id = reserve_id(ss);
  1853. if (id < 0) {
  1854. close(fd);
  1855. return id;
  1856. }
  1857. request.u.listen.opaque = opaque;
  1858. request.u.listen.id = id;
  1859. request.u.listen.fd = fd;
  1860. send_request(ss, &request, 'L', sizeof(request.u.listen));
  1861. return id;
  1862. }
  1863. int
  1864. socket_server_bind(struct socket_server *ss, uintptr_t opaque, int fd) {
  1865. struct request_package request;
  1866. int id = reserve_id(ss);
  1867. if (id < 0)
  1868. return -1;
  1869. request.u.bind.opaque = opaque;
  1870. request.u.bind.id = id;
  1871. request.u.bind.fd = fd;
  1872. send_request(ss, &request, 'B', sizeof(request.u.bind));
  1873. return id;
  1874. }
  1875. void
  1876. socket_server_start(struct socket_server *ss, uintptr_t opaque, int id) {
  1877. struct request_package request;
  1878. request.u.resumepause.id = id;
  1879. request.u.resumepause.opaque = opaque;
  1880. send_request(ss, &request, 'R', sizeof(request.u.resumepause));
  1881. }
  1882. void
  1883. socket_server_pause(struct socket_server *ss, uintptr_t opaque, int id) {
  1884. struct request_package request;
  1885. request.u.resumepause.id = id;
  1886. request.u.resumepause.opaque = opaque;
  1887. send_request(ss, &request, 'S', sizeof(request.u.resumepause));
  1888. }
  1889. void
  1890. socket_server_nodelay(struct socket_server *ss, int id) {
  1891. struct request_package request;
  1892. request.u.setopt.id = id;
  1893. request.u.setopt.what = TCP_NODELAY;
  1894. request.u.setopt.value = 1;
  1895. send_request(ss, &request, 'T', sizeof(request.u.setopt));
  1896. }
  1897. void
  1898. socket_server_userobject(struct socket_server *ss, struct socket_object_interface *soi) {
  1899. ss->soi = *soi;
  1900. }
  1901. // UDP
  1902. int
  1903. socket_server_udp(struct socket_server *ss, uintptr_t opaque, const char * addr, int port) {
  1904. int fd;
  1905. int family;
  1906. if (port != 0 || addr != NULL) {
  1907. // bind
  1908. fd = do_bind(addr, port, IPPROTO_UDP, &family);
  1909. if (fd < 0) {
  1910. return -1;
  1911. }
  1912. } else {
  1913. family = AF_INET;
  1914. fd = socket(family, SOCK_DGRAM, 0);
  1915. if (fd < 0) {
  1916. return -1;
  1917. }
  1918. }
  1919. sp_nonblocking(fd);
  1920. int id = reserve_id(ss);
  1921. if (id < 0) {
  1922. close(fd);
  1923. return -1;
  1924. }
  1925. struct request_package request;
  1926. request.u.udp.id = id;
  1927. request.u.udp.fd = fd;
  1928. request.u.udp.opaque = opaque;
  1929. request.u.udp.family = family;
  1930. send_request(ss, &request, 'U', sizeof(request.u.udp));
  1931. return id;
  1932. }
  1933. int
  1934. socket_server_udp_send(struct socket_server *ss, const struct socket_udp_address *addr, struct socket_sendbuffer *buf) {
  1935. int id = buf->id;
  1936. struct socket * s = &ss->slot[HASH_ID(id)];
  1937. if (socket_invalid(s, id)) {
  1938. free_buffer(ss, buf);
  1939. return -1;
  1940. }
  1941. const uint8_t *udp_address = (const uint8_t *)addr;
  1942. int addrsz;
  1943. switch (udp_address[0]) {
  1944. case PROTOCOL_UDP:
  1945. addrsz = 1+2+4; // 1 type, 2 port, 4 ipv4
  1946. break;
  1947. case PROTOCOL_UDPv6:
  1948. addrsz = 1+2+16; // 1 type, 2 port, 16 ipv6
  1949. break;
  1950. default:
  1951. free_buffer(ss, buf);
  1952. return -1;
  1953. }
  1954. struct socket_lock l;
  1955. socket_lock_init(s, &l);
  1956. if (can_direct_write(s,id) && socket_trylock(&l)) {
  1957. // may be we can send directly, double check
  1958. if (can_direct_write(s,id)) {
  1959. // send directly
  1960. struct send_object so;
  1961. send_object_init_from_sendbuffer(ss, &so, buf);
  1962. union sockaddr_all sa;
  1963. socklen_t sasz = udp_socket_address(s, udp_address, &sa);
  1964. if (sasz == 0) {
  1965. socket_unlock(&l);
  1966. so.free_func((void *)buf->buffer);
  1967. return -1;
  1968. }
  1969. int n = sendto(s->fd, so.buffer, so.sz, 0, &sa.s, sasz);
  1970. if (n >= 0) {
  1971. // sendto succ
  1972. stat_write(ss,s,n);
  1973. socket_unlock(&l);
  1974. so.free_func((void *)buf->buffer);
  1975. return 0;
  1976. }
  1977. }
  1978. socket_unlock(&l);
  1979. // let socket thread try again, udp doesn't care the order
  1980. }
  1981. struct request_package request;
  1982. request.u.send_udp.send.id = id;
  1983. request.u.send_udp.send.buffer = clone_buffer(buf, &request.u.send_udp.send.sz);
  1984. memcpy(request.u.send_udp.address, udp_address, addrsz);
  1985. send_request(ss, &request, 'A', sizeof(request.u.send_udp.send)+addrsz);
  1986. return 0;
  1987. }
  /*
   * Set the default peer address of UDP socket `id` to addr:port by sending
   * a 'C' request that carries the encoded address.
   *
   * Returns 0 when the request was sent, -1 when the socket is invalid, the
   * address cannot be resolved, or the resolved family is neither AF_INET
   * nor AF_INET6.
   *
   * s->udpconnecting is incremented (under the socket lock) right before the
   * 'C' request is sent; while it is non-zero can_direct_write() is false.
   * The matching decrement is presumably done by the handler of 'C', which is
   * not part of this section.
   *
   * The address is resolved BEFORE the counter is touched, so that every
   * failure return leaves the counter unchanged. Incrementing first and then
   * returning on a resolution failure would leave the counter raised with no
   * 'C' request in flight to bring it back down.
   */
  int
  socket_server_udp_connect(struct socket_server *ss, int id, const char * addr, int port) {
      struct socket * s = &ss->slot[HASH_ID(id)];
      if (socket_invalid(s, id)) {
          return -1;
      }

      // Resolve and encode the peer address; nothing shared is modified yet.
      int status;
      struct addrinfo ai_hints;
      struct addrinfo *ai_list = NULL;
      char portstr[16];
      sprintf(portstr, "%d", port);
      memset( &ai_hints, 0, sizeof( ai_hints ) );
      ai_hints.ai_family = AF_UNSPEC;
      ai_hints.ai_socktype = SOCK_DGRAM;
      ai_hints.ai_protocol = IPPROTO_UDP;

      status = getaddrinfo(addr, portstr, &ai_hints, &ai_list );
      if ( status != 0 ) {
          return -1;
      }

      struct request_package request;
      request.u.set_udp.id = id;
      int protocol;

      if (ai_list->ai_family == AF_INET) {
          protocol = PROTOCOL_UDP;
      } else if (ai_list->ai_family == AF_INET6) {
          protocol = PROTOCOL_UDPv6;
      } else {
          freeaddrinfo( ai_list );
          return -1;
      }

      int addrsz = gen_udp_address(protocol, (union sockaddr_all *)ai_list->ai_addr, request.u.set_udp.address);
      freeaddrinfo( ai_list );

      // From here on the request is certain to be sent: mark the socket as
      // "udp connecting" after re-validating it under the lock.
      struct socket_lock l;
      socket_lock_init(s, &l);
      socket_lock(&l);
      if (socket_invalid(s, id)) {
          socket_unlock(&l);
          return -1;
      }
      ATOM_FINC(&s->udpconnecting);
      socket_unlock(&l);

      // only the used part of the address array is transmitted
      send_request(ss, &request, 'C', sizeof(request.u.set_udp) - sizeof(request.u.set_udp.address) +addrsz);

      return 0;
  }
  2032. const struct socket_udp_address *
  2033. socket_server_udp_address(struct socket_server *ss, struct socket_message *msg, int *addrsz) {
  2034. uint8_t * address = (uint8_t *)(msg->data + msg->ud);
  2035. int type = address[0];
  2036. switch(type) {
  2037. case PROTOCOL_UDP:
  2038. *addrsz = 1+2+4;
  2039. break;
  2040. case PROTOCOL_UDPv6:
  2041. *addrsz = 1+2+16;
  2042. break;
  2043. default:
  2044. return NULL;
  2045. }
  2046. return (const struct socket_udp_address *)address;
  2047. }
  2048. struct socket_info *
  2049. socket_info_create(struct socket_info *last) {
  2050. struct socket_info *si = skynet_malloc(sizeof(*si));
  2051. memset(si, 0 , sizeof(*si));
  2052. si->next = last;
  2053. return si;
  2054. }
  2055. void
  2056. socket_info_release(struct socket_info *si) {
  2057. while (si) {
  2058. struct socket_info *temp = si;
  2059. si = si->next;
  2060. skynet_free(temp);
  2061. }
  2062. }
  2063. static int
  2064. query_info(struct socket *s, struct socket_info *si) {
  2065. union sockaddr_all u;
  2066. socklen_t slen = sizeof(u);
  2067. int closing = 0;
  2068. switch (ATOM_LOAD(&s->type)) {
  2069. case SOCKET_TYPE_BIND:
  2070. si->type = SOCKET_INFO_BIND;
  2071. si->name[0] = '\0';
  2072. break;
  2073. case SOCKET_TYPE_LISTEN:
  2074. si->type = SOCKET_INFO_LISTEN;
  2075. if (getsockname(s->fd, &u.s, &slen) == 0) {
  2076. getname(&u, si->name, sizeof(si->name));
  2077. }
  2078. break;
  2079. case SOCKET_TYPE_HALFCLOSE_READ:
  2080. case SOCKET_TYPE_HALFCLOSE_WRITE:
  2081. closing = 1;
  2082. case SOCKET_TYPE_CONNECTED:
  2083. if (s->protocol == PROTOCOL_TCP) {
  2084. si->type = closing ? SOCKET_INFO_CLOSING : SOCKET_INFO_TCP;
  2085. if (getpeername(s->fd, &u.s, &slen) == 0) {
  2086. getname(&u, si->name, sizeof(si->name));
  2087. }
  2088. } else {
  2089. si->type = SOCKET_INFO_UDP;
  2090. if (udp_socket_address(s, s->p.udp_address, &u)) {
  2091. getname(&u, si->name, sizeof(si->name));
  2092. }
  2093. }
  2094. break;
  2095. default:
  2096. return 0;
  2097. }
  2098. si->id = s->id;
  2099. si->opaque = (uint64_t)s->opaque;
  2100. si->read = s->stat.read;
  2101. si->write = s->stat.write;
  2102. si->rtime = s->stat.rtime;
  2103. si->wtime = s->stat.wtime;
  2104. si->wbuffer = s->wb_size;
  2105. si->reading = s->reading;
  2106. si->writing = s->writing;
  2107. return 1;
  2108. }
  2109. struct socket_info *
  2110. socket_server_info(struct socket_server *ss) {
  2111. int i;
  2112. struct socket_info * si = NULL;
  2113. for (i=0;i<MAX_SOCKET;i++) {
  2114. struct socket * s = &ss->slot[i];
  2115. int id = s->id;
  2116. struct socket_info temp;
  2117. if (query_info(s, &temp) && s->id == id) {
  2118. // socket_server_info may call in different thread, so check socket id again
  2119. si = socket_info_create(si);
  2120. temp.next = si->next;
  2121. *si = temp;
  2122. }
  2123. }
  2124. return si;
  2125. }