mpsc_queue.c 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. #include "test/jemalloc_test.h"
  2. #include "jemalloc/internal/mpsc_queue.h"
  3. typedef struct elem_s elem_t;
  4. typedef ql_head(elem_t) elem_list_t;
  5. typedef mpsc_queue(elem_t) elem_mpsc_queue_t;
  6. struct elem_s {
  7. int thread;
  8. int idx;
  9. ql_elm(elem_t) link;
  10. };
  11. /* Include both proto and gen to make sure they match up. */
  12. mpsc_queue_proto(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
  13. elem_list_t);
  14. mpsc_queue_gen(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
  15. elem_list_t, link);
  16. static void
  17. init_elems_simple(elem_t *elems, int nelems, int thread) {
  18. for (int i = 0; i < nelems; i++) {
  19. elems[i].thread = thread;
  20. elems[i].idx = i;
  21. ql_elm_new(&elems[i], link);
  22. }
  23. }
  24. static void
  25. check_elems_simple(elem_list_t *list, int nelems, int thread) {
  26. elem_t *elem;
  27. int next_idx = 0;
  28. ql_foreach(elem, list, link) {
  29. expect_d_lt(next_idx, nelems, "Too many list items");
  30. expect_d_eq(thread, elem->thread, "");
  31. expect_d_eq(next_idx, elem->idx, "List out of order");
  32. next_idx++;
  33. }
  34. }
  35. TEST_BEGIN(test_simple) {
  36. enum {NELEMS = 10};
  37. elem_t elems[NELEMS];
  38. elem_list_t list;
  39. elem_mpsc_queue_t queue;
  40. /* Pop empty queue onto empty list -> empty list */
  41. ql_new(&list);
  42. elem_mpsc_queue_new(&queue);
  43. elem_mpsc_queue_pop_batch(&queue, &list);
  44. expect_true(ql_empty(&list), "");
  45. /* Pop empty queue onto nonempty list -> list unchanged */
  46. ql_new(&list);
  47. elem_mpsc_queue_new(&queue);
  48. init_elems_simple(elems, NELEMS, 0);
  49. for (int i = 0; i < NELEMS; i++) {
  50. ql_tail_insert(&list, &elems[i], link);
  51. }
  52. elem_mpsc_queue_pop_batch(&queue, &list);
  53. check_elems_simple(&list, NELEMS, 0);
  54. /* Pop nonempty queue onto empty list -> list takes queue contents */
  55. ql_new(&list);
  56. elem_mpsc_queue_new(&queue);
  57. init_elems_simple(elems, NELEMS, 0);
  58. for (int i = 0; i < NELEMS; i++) {
  59. elem_mpsc_queue_push(&queue, &elems[i]);
  60. }
  61. elem_mpsc_queue_pop_batch(&queue, &list);
  62. check_elems_simple(&list, NELEMS, 0);
  63. /* Pop nonempty queue onto nonempty list -> list gains queue contents */
  64. ql_new(&list);
  65. elem_mpsc_queue_new(&queue);
  66. init_elems_simple(elems, NELEMS, 0);
  67. for (int i = 0; i < NELEMS / 2; i++) {
  68. ql_tail_insert(&list, &elems[i], link);
  69. }
  70. for (int i = NELEMS / 2; i < NELEMS; i++) {
  71. elem_mpsc_queue_push(&queue, &elems[i]);
  72. }
  73. elem_mpsc_queue_pop_batch(&queue, &list);
  74. check_elems_simple(&list, NELEMS, 0);
  75. }
  76. TEST_END
  77. TEST_BEGIN(test_push_single_or_batch) {
  78. enum {
  79. BATCH_MAX = 10,
  80. /*
  81. * We'll push i items one-at-a-time, then i items as a batch,
  82. * then i items as a batch again, as i ranges from 1 to
  83. * BATCH_MAX. So we need 3 times the sum of the numbers from 1
  84. * to BATCH_MAX elements total.
  85. */
  86. NELEMS = 3 * BATCH_MAX * (BATCH_MAX - 1) / 2
  87. };
  88. elem_t elems[NELEMS];
  89. init_elems_simple(elems, NELEMS, 0);
  90. elem_list_t list;
  91. ql_new(&list);
  92. elem_mpsc_queue_t queue;
  93. elem_mpsc_queue_new(&queue);
  94. int next_idx = 0;
  95. for (int i = 1; i < 10; i++) {
  96. /* Push i items 1 at a time. */
  97. for (int j = 0; j < i; j++) {
  98. elem_mpsc_queue_push(&queue, &elems[next_idx]);
  99. next_idx++;
  100. }
  101. /* Push i items in batch. */
  102. for (int j = 0; j < i; j++) {
  103. ql_tail_insert(&list, &elems[next_idx], link);
  104. next_idx++;
  105. }
  106. elem_mpsc_queue_push_batch(&queue, &list);
  107. expect_true(ql_empty(&list), "Batch push should empty source");
  108. /*
  109. * Push i items in batch, again. This tests two batches
  110. * proceeding one after the other.
  111. */
  112. for (int j = 0; j < i; j++) {
  113. ql_tail_insert(&list, &elems[next_idx], link);
  114. next_idx++;
  115. }
  116. elem_mpsc_queue_push_batch(&queue, &list);
  117. expect_true(ql_empty(&list), "Batch push should empty source");
  118. }
  119. expect_d_eq(NELEMS, next_idx, "Miscomputed number of elems to push.");
  120. expect_true(ql_empty(&list), "");
  121. elem_mpsc_queue_pop_batch(&queue, &list);
  122. check_elems_simple(&list, NELEMS, 0);
  123. }
  124. TEST_END
  125. TEST_BEGIN(test_multi_op) {
  126. enum {NELEMS = 20};
  127. elem_t elems[NELEMS];
  128. init_elems_simple(elems, NELEMS, 0);
  129. elem_list_t push_list;
  130. ql_new(&push_list);
  131. elem_list_t result_list;
  132. ql_new(&result_list);
  133. elem_mpsc_queue_t queue;
  134. elem_mpsc_queue_new(&queue);
  135. int next_idx = 0;
  136. /* Push first quarter 1-at-a-time. */
  137. for (int i = 0; i < NELEMS / 4; i++) {
  138. elem_mpsc_queue_push(&queue, &elems[next_idx]);
  139. next_idx++;
  140. }
  141. /* Push second quarter in batch. */
  142. for (int i = NELEMS / 4; i < NELEMS / 2; i++) {
  143. ql_tail_insert(&push_list, &elems[next_idx], link);
  144. next_idx++;
  145. }
  146. elem_mpsc_queue_push_batch(&queue, &push_list);
  147. /* Batch pop all pushed elements. */
  148. elem_mpsc_queue_pop_batch(&queue, &result_list);
  149. /* Push third quarter in batch. */
  150. for (int i = NELEMS / 2; i < 3 * NELEMS / 4; i++) {
  151. ql_tail_insert(&push_list, &elems[next_idx], link);
  152. next_idx++;
  153. }
  154. elem_mpsc_queue_push_batch(&queue, &push_list);
  155. /* Push last quarter one-at-a-time. */
  156. for (int i = 3 * NELEMS / 4; i < NELEMS; i++) {
  157. elem_mpsc_queue_push(&queue, &elems[next_idx]);
  158. next_idx++;
  159. }
  160. /* Pop them again. Order of existing list should be preserved. */
  161. elem_mpsc_queue_pop_batch(&queue, &result_list);
  162. check_elems_simple(&result_list, NELEMS, 0);
  163. }
  164. TEST_END
  165. typedef struct pusher_arg_s pusher_arg_t;
  166. struct pusher_arg_s {
  167. elem_mpsc_queue_t *queue;
  168. int thread;
  169. elem_t *elems;
  170. int nelems;
  171. };
  172. typedef struct popper_arg_s popper_arg_t;
  173. struct popper_arg_s {
  174. elem_mpsc_queue_t *queue;
  175. int npushers;
  176. int nelems_per_pusher;
  177. int *pusher_counts;
  178. };
  179. static void *
  180. thd_pusher(void *void_arg) {
  181. pusher_arg_t *arg = (pusher_arg_t *)void_arg;
  182. int next_idx = 0;
  183. while (next_idx < arg->nelems) {
  184. /* Push 10 items in batch. */
  185. elem_list_t list;
  186. ql_new(&list);
  187. int limit = next_idx + 10;
  188. while (next_idx < arg->nelems && next_idx < limit) {
  189. ql_tail_insert(&list, &arg->elems[next_idx], link);
  190. next_idx++;
  191. }
  192. elem_mpsc_queue_push_batch(arg->queue, &list);
  193. /* Push 10 items one-at-a-time. */
  194. limit = next_idx + 10;
  195. while (next_idx < arg->nelems && next_idx < limit) {
  196. elem_mpsc_queue_push(arg->queue, &arg->elems[next_idx]);
  197. next_idx++;
  198. }
  199. }
  200. return NULL;
  201. }
  202. static void *
  203. thd_popper(void *void_arg) {
  204. popper_arg_t *arg = (popper_arg_t *)void_arg;
  205. int done_pushers = 0;
  206. while (done_pushers < arg->npushers) {
  207. elem_list_t list;
  208. ql_new(&list);
  209. elem_mpsc_queue_pop_batch(arg->queue, &list);
  210. elem_t *elem;
  211. ql_foreach(elem, &list, link) {
  212. int thread = elem->thread;
  213. int idx = elem->idx;
  214. expect_d_eq(arg->pusher_counts[thread], idx,
  215. "Thread's pushes reordered");
  216. arg->pusher_counts[thread]++;
  217. if (arg->pusher_counts[thread]
  218. == arg->nelems_per_pusher) {
  219. done_pushers++;
  220. }
  221. }
  222. }
  223. return NULL;
  224. }
  225. TEST_BEGIN(test_multiple_threads) {
  226. enum {
  227. NPUSHERS = 4,
  228. NELEMS_PER_PUSHER = 1000*1000,
  229. };
  230. thd_t pushers[NPUSHERS];
  231. pusher_arg_t pusher_arg[NPUSHERS];
  232. thd_t popper;
  233. popper_arg_t popper_arg;
  234. elem_mpsc_queue_t queue;
  235. elem_mpsc_queue_new(&queue);
  236. elem_t *elems = calloc(NPUSHERS * NELEMS_PER_PUSHER, sizeof(elem_t));
  237. elem_t *elem_iter = elems;
  238. for (int i = 0; i < NPUSHERS; i++) {
  239. pusher_arg[i].queue = &queue;
  240. pusher_arg[i].thread = i;
  241. pusher_arg[i].elems = elem_iter;
  242. pusher_arg[i].nelems = NELEMS_PER_PUSHER;
  243. init_elems_simple(elem_iter, NELEMS_PER_PUSHER, i);
  244. elem_iter += NELEMS_PER_PUSHER;
  245. }
  246. popper_arg.queue = &queue;
  247. popper_arg.npushers = NPUSHERS;
  248. popper_arg.nelems_per_pusher = NELEMS_PER_PUSHER;
  249. int pusher_counts[NPUSHERS] = {0};
  250. popper_arg.pusher_counts = pusher_counts;
  251. thd_create(&popper, thd_popper, (void *)&popper_arg);
  252. for (int i = 0; i < NPUSHERS; i++) {
  253. thd_create(&pushers[i], thd_pusher, &pusher_arg[i]);
  254. }
  255. thd_join(popper, NULL);
  256. for (int i = 0; i < NPUSHERS; i++) {
  257. thd_join(pushers[i], NULL);
  258. }
  259. for (int i = 0; i < NPUSHERS; i++) {
  260. expect_d_eq(NELEMS_PER_PUSHER, pusher_counts[i], "");
  261. }
  262. free(elems);
  263. }
  264. TEST_END
  265. int
  266. main(void) {
  267. return test_no_reentrancy(
  268. test_simple,
  269. test_push_single_or_batch,
  270. test_multi_op,
  271. test_multiple_threads);
  272. }