|
- #include "test/jemalloc_test.h"
- #include "jemalloc/internal/mpsc_queue.h"
- typedef struct elem_s elem_t;
- typedef ql_head(elem_t) elem_list_t;
- typedef mpsc_queue(elem_t) elem_mpsc_queue_t;
- struct elem_s {
- int thread;
- int idx;
- ql_elm(elem_t) link;
- };
- /* Include both proto and gen to make sure they match up. */
- mpsc_queue_proto(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
- elem_list_t);
- mpsc_queue_gen(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
- elem_list_t, link);
- static void
- init_elems_simple(elem_t *elems, int nelems, int thread) {
- for (int i = 0; i < nelems; i++) {
- elems[i].thread = thread;
- elems[i].idx = i;
- ql_elm_new(&elems[i], link);
- }
- }
- static void
- check_elems_simple(elem_list_t *list, int nelems, int thread) {
- elem_t *elem;
- int next_idx = 0;
- ql_foreach(elem, list, link) {
- expect_d_lt(next_idx, nelems, "Too many list items");
- expect_d_eq(thread, elem->thread, "");
- expect_d_eq(next_idx, elem->idx, "List out of order");
- next_idx++;
- }
- }
- TEST_BEGIN(test_simple) {
- enum {NELEMS = 10};
- elem_t elems[NELEMS];
- elem_list_t list;
- elem_mpsc_queue_t queue;
- /* Pop empty queue onto empty list -> empty list */
- ql_new(&list);
- elem_mpsc_queue_new(&queue);
- elem_mpsc_queue_pop_batch(&queue, &list);
- expect_true(ql_empty(&list), "");
- /* Pop empty queue onto nonempty list -> list unchanged */
- ql_new(&list);
- elem_mpsc_queue_new(&queue);
- init_elems_simple(elems, NELEMS, 0);
- for (int i = 0; i < NELEMS; i++) {
- ql_tail_insert(&list, &elems[i], link);
- }
- elem_mpsc_queue_pop_batch(&queue, &list);
- check_elems_simple(&list, NELEMS, 0);
- /* Pop nonempty queue onto empty list -> list takes queue contents */
- ql_new(&list);
- elem_mpsc_queue_new(&queue);
- init_elems_simple(elems, NELEMS, 0);
- for (int i = 0; i < NELEMS; i++) {
- elem_mpsc_queue_push(&queue, &elems[i]);
- }
- elem_mpsc_queue_pop_batch(&queue, &list);
- check_elems_simple(&list, NELEMS, 0);
- /* Pop nonempty queue onto nonempty list -> list gains queue contents */
- ql_new(&list);
- elem_mpsc_queue_new(&queue);
- init_elems_simple(elems, NELEMS, 0);
- for (int i = 0; i < NELEMS / 2; i++) {
- ql_tail_insert(&list, &elems[i], link);
- }
- for (int i = NELEMS / 2; i < NELEMS; i++) {
- elem_mpsc_queue_push(&queue, &elems[i]);
- }
- elem_mpsc_queue_pop_batch(&queue, &list);
- check_elems_simple(&list, NELEMS, 0);
- }
- TEST_END
- TEST_BEGIN(test_push_single_or_batch) {
- enum {
- BATCH_MAX = 10,
- /*
- * We'll push i items one-at-a-time, then i items as a batch,
- * then i items as a batch again, as i ranges from 1 to
- * BATCH_MAX. So we need 3 times the sum of the numbers from 1
- * to BATCH_MAX elements total.
- */
- NELEMS = 3 * BATCH_MAX * (BATCH_MAX - 1) / 2
- };
- elem_t elems[NELEMS];
- init_elems_simple(elems, NELEMS, 0);
- elem_list_t list;
- ql_new(&list);
- elem_mpsc_queue_t queue;
- elem_mpsc_queue_new(&queue);
- int next_idx = 0;
- for (int i = 1; i < 10; i++) {
- /* Push i items 1 at a time. */
- for (int j = 0; j < i; j++) {
- elem_mpsc_queue_push(&queue, &elems[next_idx]);
- next_idx++;
- }
- /* Push i items in batch. */
- for (int j = 0; j < i; j++) {
- ql_tail_insert(&list, &elems[next_idx], link);
- next_idx++;
- }
- elem_mpsc_queue_push_batch(&queue, &list);
- expect_true(ql_empty(&list), "Batch push should empty source");
- /*
- * Push i items in batch, again. This tests two batches
- * proceeding one after the other.
- */
- for (int j = 0; j < i; j++) {
- ql_tail_insert(&list, &elems[next_idx], link);
- next_idx++;
- }
- elem_mpsc_queue_push_batch(&queue, &list);
- expect_true(ql_empty(&list), "Batch push should empty source");
- }
- expect_d_eq(NELEMS, next_idx, "Miscomputed number of elems to push.");
- expect_true(ql_empty(&list), "");
- elem_mpsc_queue_pop_batch(&queue, &list);
- check_elems_simple(&list, NELEMS, 0);
- }
- TEST_END
- TEST_BEGIN(test_multi_op) {
- enum {NELEMS = 20};
- elem_t elems[NELEMS];
- init_elems_simple(elems, NELEMS, 0);
- elem_list_t push_list;
- ql_new(&push_list);
- elem_list_t result_list;
- ql_new(&result_list);
- elem_mpsc_queue_t queue;
- elem_mpsc_queue_new(&queue);
- int next_idx = 0;
- /* Push first quarter 1-at-a-time. */
- for (int i = 0; i < NELEMS / 4; i++) {
- elem_mpsc_queue_push(&queue, &elems[next_idx]);
- next_idx++;
- }
- /* Push second quarter in batch. */
- for (int i = NELEMS / 4; i < NELEMS / 2; i++) {
- ql_tail_insert(&push_list, &elems[next_idx], link);
- next_idx++;
- }
- elem_mpsc_queue_push_batch(&queue, &push_list);
- /* Batch pop all pushed elements. */
- elem_mpsc_queue_pop_batch(&queue, &result_list);
- /* Push third quarter in batch. */
- for (int i = NELEMS / 2; i < 3 * NELEMS / 4; i++) {
- ql_tail_insert(&push_list, &elems[next_idx], link);
- next_idx++;
- }
- elem_mpsc_queue_push_batch(&queue, &push_list);
- /* Push last quarter one-at-a-time. */
- for (int i = 3 * NELEMS / 4; i < NELEMS; i++) {
- elem_mpsc_queue_push(&queue, &elems[next_idx]);
- next_idx++;
- }
- /* Pop them again. Order of existing list should be preserved. */
- elem_mpsc_queue_pop_batch(&queue, &result_list);
- check_elems_simple(&result_list, NELEMS, 0);
- }
- TEST_END
- typedef struct pusher_arg_s pusher_arg_t;
- struct pusher_arg_s {
- elem_mpsc_queue_t *queue;
- int thread;
- elem_t *elems;
- int nelems;
- };
- typedef struct popper_arg_s popper_arg_t;
- struct popper_arg_s {
- elem_mpsc_queue_t *queue;
- int npushers;
- int nelems_per_pusher;
- int *pusher_counts;
- };
- static void *
- thd_pusher(void *void_arg) {
- pusher_arg_t *arg = (pusher_arg_t *)void_arg;
- int next_idx = 0;
- while (next_idx < arg->nelems) {
- /* Push 10 items in batch. */
- elem_list_t list;
- ql_new(&list);
- int limit = next_idx + 10;
- while (next_idx < arg->nelems && next_idx < limit) {
- ql_tail_insert(&list, &arg->elems[next_idx], link);
- next_idx++;
- }
- elem_mpsc_queue_push_batch(arg->queue, &list);
- /* Push 10 items one-at-a-time. */
- limit = next_idx + 10;
- while (next_idx < arg->nelems && next_idx < limit) {
- elem_mpsc_queue_push(arg->queue, &arg->elems[next_idx]);
- next_idx++;
- }
- }
- return NULL;
- }
- static void *
- thd_popper(void *void_arg) {
- popper_arg_t *arg = (popper_arg_t *)void_arg;
- int done_pushers = 0;
- while (done_pushers < arg->npushers) {
- elem_list_t list;
- ql_new(&list);
- elem_mpsc_queue_pop_batch(arg->queue, &list);
- elem_t *elem;
- ql_foreach(elem, &list, link) {
- int thread = elem->thread;
- int idx = elem->idx;
- expect_d_eq(arg->pusher_counts[thread], idx,
- "Thread's pushes reordered");
- arg->pusher_counts[thread]++;
- if (arg->pusher_counts[thread]
- == arg->nelems_per_pusher) {
- done_pushers++;
- }
- }
- }
- return NULL;
- }
- TEST_BEGIN(test_multiple_threads) {
- enum {
- NPUSHERS = 4,
- NELEMS_PER_PUSHER = 1000*1000,
- };
- thd_t pushers[NPUSHERS];
- pusher_arg_t pusher_arg[NPUSHERS];
- thd_t popper;
- popper_arg_t popper_arg;
- elem_mpsc_queue_t queue;
- elem_mpsc_queue_new(&queue);
- elem_t *elems = calloc(NPUSHERS * NELEMS_PER_PUSHER, sizeof(elem_t));
- elem_t *elem_iter = elems;
- for (int i = 0; i < NPUSHERS; i++) {
- pusher_arg[i].queue = &queue;
- pusher_arg[i].thread = i;
- pusher_arg[i].elems = elem_iter;
- pusher_arg[i].nelems = NELEMS_PER_PUSHER;
- init_elems_simple(elem_iter, NELEMS_PER_PUSHER, i);
- elem_iter += NELEMS_PER_PUSHER;
- }
- popper_arg.queue = &queue;
- popper_arg.npushers = NPUSHERS;
- popper_arg.nelems_per_pusher = NELEMS_PER_PUSHER;
- int pusher_counts[NPUSHERS] = {0};
- popper_arg.pusher_counts = pusher_counts;
- thd_create(&popper, thd_popper, (void *)&popper_arg);
- for (int i = 0; i < NPUSHERS; i++) {
- thd_create(&pushers[i], thd_pusher, &pusher_arg[i]);
- }
- thd_join(popper, NULL);
- for (int i = 0; i < NPUSHERS; i++) {
- thd_join(pushers[i], NULL);
- }
- for (int i = 0; i < NPUSHERS; i++) {
- expect_d_eq(NELEMS_PER_PUSHER, pusher_counts[i], "");
- }
- free(elems);
- }
- TEST_END
- int
- main(void) {
- return test_no_reentrancy(
- test_simple,
- test_push_single_or_batch,
- test_multi_op,
- test_multiple_threads);
- }
|