/* databuffer.h */
  1. #ifndef skynet_databuffer_h
  2. #define skynet_databuffer_h
  #include <assert.h>
  #include <stdint.h>
  #include <stdlib.h>
  #include <string.h>
  /* Number of struct message nodes contained in each struct messagepool_list chunk. */
  #define MESSAGEPOOL 1023
  /*
   * One queued chunk of bytes. Nodes are chained through `next`, either in a
   * databuffer queue or in a messagepool free list.
   */
  struct message {
      char *buffer;          /* chunk bytes; released with skynet_free when the node is recycled */
      int size;              /* number of bytes in buffer */
      struct message *next;  /* following node, or NULL at the end of the chain */
  };
  /*
   * FIFO of message chunks that is consumed as one continuous byte stream.
   */
  struct databuffer {
      int header;            /* length decoded by databuffer_readheader; 0 means none is pending */
      int offset;            /* bytes of the head message that have already been consumed */
      int size;              /* total unread bytes over all queued messages */
      struct message *head;  /* oldest message (read side), NULL when the queue is empty */
      struct message *tail;  /* newest message (push side), NULL when the queue is empty */
  };
  19. struct messagepool_list {
  20. struct messagepool_list *next;
  21. struct message pool[MESSAGEPOOL];
  22. };
  /*
   * Allocator for struct message nodes: a list of chunks that own the node
   * storage, and a free list of nodes that are ready for reuse.
   */
  struct messagepool {
      struct messagepool_list *pool;  /* every chunk allocated so far, newest first */
      struct message *freelist;       /* nodes that are currently unused, chained through next */
  };
  // Initialize these structs by zero-filling them with memset.
  28. static void
  29. messagepool_free(struct messagepool *pool) {
  30. struct messagepool_list *p = pool->pool;
  31. while(p) {
  32. struct messagepool_list *tmp = p;
  33. p=p->next;
  34. skynet_free(tmp);
  35. }
  36. pool->pool = NULL;
  37. pool->freelist = NULL;
  38. }
  39. static inline void
  40. _return_message(struct databuffer *db, struct messagepool *mp) {
  41. struct message *m = db->head;
  42. if (m->next == NULL) {
  43. assert(db->tail == m);
  44. db->head = db->tail = NULL;
  45. } else {
  46. db->head = m->next;
  47. }
  48. skynet_free(m->buffer);
  49. m->buffer = NULL;
  50. m->size = 0;
  51. m->next = mp->freelist;
  52. mp->freelist = m;
  53. }
  54. static void
  55. databuffer_read(struct databuffer *db, struct messagepool *mp, char * buffer, int sz) {
  56. assert(db->size >= sz);
  57. db->size -= sz;
  58. for (;;) {
  59. struct message *current = db->head;
  60. int bsz = current->size - db->offset;
  61. if (bsz > sz) {
  62. memcpy(buffer, current->buffer + db->offset, sz);
  63. db->offset += sz;
  64. return;
  65. }
  66. if (bsz == sz) {
  67. memcpy(buffer, current->buffer + db->offset, sz);
  68. db->offset = 0;
  69. _return_message(db, mp);
  70. return;
  71. } else {
  72. memcpy(buffer, current->buffer + db->offset, bsz);
  73. _return_message(db, mp);
  74. db->offset = 0;
  75. buffer+=bsz;
  76. sz-=bsz;
  77. }
  78. }
  79. }
  80. static void
  81. databuffer_push(struct databuffer *db, struct messagepool *mp, void *data, int sz) {
  82. struct message * m;
  83. if (mp->freelist) {
  84. m = mp->freelist;
  85. mp->freelist = m->next;
  86. } else {
  87. struct messagepool_list * mpl = skynet_malloc(sizeof(*mpl));
  88. struct message * temp = mpl->pool;
  89. int i;
  90. for (i=1;i<MESSAGEPOOL;i++) {
  91. temp[i].buffer = NULL;
  92. temp[i].size = 0;
  93. temp[i].next = &temp[i+1];
  94. }
  95. temp[MESSAGEPOOL-1].next = NULL;
  96. mpl->next = mp->pool;
  97. mp->pool = mpl;
  98. m = &temp[0];
  99. mp->freelist = &temp[1];
  100. }
  101. m->buffer = data;
  102. m->size = sz;
  103. m->next = NULL;
  104. db->size += sz;
  105. if (db->head == NULL) {
  106. assert(db->tail == NULL);
  107. db->head = db->tail = m;
  108. } else {
  109. db->tail->next = m;
  110. db->tail = m;
  111. }
  112. }
  113. static int
  114. databuffer_readheader(struct databuffer *db, struct messagepool *mp, int header_size) {
  115. if (db->header == 0) {
  116. // parser header (2 or 4)
  117. if (db->size < header_size) {
  118. return -1;
  119. }
  120. uint8_t plen[4];
  121. databuffer_read(db,mp,(char *)plen,header_size);
  122. // big-endian
  123. if (header_size == 2) {
  124. db->header = plen[0] << 8 | plen[1];
  125. } else {
  126. db->header = plen[0] << 24 | plen[1] << 16 | plen[2] << 8 | plen[3];
  127. }
  128. }
  129. if (db->size < db->header)
  130. return -1;
  131. return db->header;
  132. }
  133. static inline void
  134. databuffer_reset(struct databuffer *db) {
  135. db->header = 0;
  136. }
  137. static void
  138. databuffer_clear(struct databuffer *db, struct messagepool *mp) {
  139. while (db->head) {
  140. _return_message(db,mp);
  141. }
  142. memset(db, 0, sizeof(*db));
  143. }
  144. #endif