diff --git a/lf-queue/Makefile b/lf-queue/Makefile old mode 100644 new mode 100755 index 2f78761..5b8a2ea --- a/lf-queue/Makefile +++ b/lf-queue/Makefile @@ -1,14 +1,20 @@ -CFLAGS = -Wall -O2 -g -I. +CFLAGS = -std=gnu11 -O3 -Wall -Wextra LDFLAGS = -lpthread -# Enable ThreadSanitizer -# CFLAGS += -fsanitize=thread -# LDFLAGS += -fsanitize=thread +targets = main1 main2 main3 main4 -all: test +all: $(targets) -test: test.c - $(CC) $(CFLAGS) -o $@ $< $(LDFLAGS) +tsan: CFLAGS += -fsanitize=thread -g +tsan: all clean: - rm -f test + rm -f $(targets) + +main%: lfq.c test.c + $(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@ + +main1: CFLAGS += -D MAX_PRODUCER=1 -D MAX_CONSUMER=1 +main2: CFLAGS += -D MAX_PRODUCER=4 -D MAX_CONSUMER=4 +main3: CFLAGS += -D MAX_PRODUCER=100 -D MAX_CONSUMER=10 +main4: CFLAGS += -D MAX_PRODUCER=10 -D MAX_CONSUMER=100 diff --git a/lf-queue/README.md b/lf-queue/README.md index 935a3ff..f0fd3b8 100644 --- a/lf-queue/README.md +++ b/lf-queue/README.md @@ -1,8 +1,5 @@ -# Multithreaded Bounded Lock-free Queue +# lfq +The lock-free queue implementation is based on the paper [A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue](https://drops.dagstuhl.de/opus/volltexte/2019/11335/pdf/LIPIcs-DISC-2019-28.pdf) by Ruslan Nikolaev. +Nikolaev's paper provides an overview of various methods for implementing lock-free queues. This implementation specifically adopts the SCQ (Scalable Circular Queue) approach, which is designed for bounded queues. However, this approach can be easily extended to support unbounded FIFO queues capable of storing an arbitrary number of elements. -Features: -* single header style for C11 -* strongly typed -* multithreaded -* bounded -* lock-free +In addition, this implementation employs a hazard pointer-based memory reclamation system for concurrent queues. \ No newline at end of file diff --git a/lf-queue/atomics.h b/lf-queue/atomics.h new file mode 100644 index 0000000..1731eee --- /dev/null +++ b/lf-queue/atomics.h @@ -0,0 +1,21 @@ +#pragma once + +#include +#include +#include +#include + +#define ATOMIC_SUB atomic_fetch_sub +#define CAS atomic_compare_exchange_strong + +/* The 2nd argument is limited to 1 on machines with TAS but not XCHG. + * On x86 it is an arbitrary value. + */ +#define XCHG atomic_exchange +#define ATOMIC_ADD atomic_fetch_add +#define mb() atomic_thread_fence(memory_order_seq_cst) + +/* Memory barriers*/ +#define smp_mb() atomic_thread_fence(memory_order_seq_cst) +#define smp_rmb() atomic_thread_fence(memory_order_acquire) +#define smp_wmb() atomic_thread_fence(memory_order_release) \ No newline at end of file diff --git a/lf-queue/lfq.c b/lf-queue/lfq.c new file mode 100644 index 0000000..feb28ea --- /dev/null +++ b/lf-queue/lfq.c @@ -0,0 +1,211 @@ +#include +#include +#include + +#include "atomics.h" +#include "lfq.h" + +#define MAX_FREE 150 + +static bool in_hp(struct lfq_ctx *ctx, struct lfq_node *node) +{ + for (int i = 0; i < ctx->MAX_HP_SIZE; i++) { + if (atomic_load(&ctx->HP[i]) == node) + return true; + } + return false; +} + +/* add to tail of the free list */ +static void insert_pool(struct lfq_ctx *ctx, struct lfq_node *node) +{ + atomic_store(&node->free_next, NULL); + struct lfq_node *old_tail = XCHG(&ctx->fpt, node); /* seq_cst */ + atomic_store(&old_tail->free_next, node); +} + +static void free_pool(struct lfq_ctx *ctx, bool freeall) +{ + bool old = 0; + if (!CAS(&ctx->is_freeing, &old, 1)) + return; + + for (int i = 0; i < MAX_FREE || freeall; i++) { + struct lfq_node *p = ctx->fph; + if ((!atomic_load(&p->can_free)) || (!atomic_load(&p->free_next)) || + in_hp(ctx, (struct lfq_node *) p)) + break; + ctx->fph = p->free_next; + free(p); + } + atomic_store(&ctx->is_freeing, false); + smp_mb(); +} + +static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node) +{ + if (atomic_load(&node->can_free) && !in_hp(ctx, node)) { + /* free is not thread-safe */ + bool old = 0; + if (CAS(&ctx->is_freeing, &old, 1)) { + /* poison the pointer to detect use-after-free */ + node->next = (void *) -1; + free(node); /* we got the lock; actually free */ + atomic_store(&ctx->is_freeing, false); + smp_mb(); + } else /* we did not get the lock; only add to a freelist */ + insert_pool(ctx, node); + } else + insert_pool(ctx, node); + free_pool(ctx, false); +} + +static int alloc_tid(struct lfq_ctx *ctx) +{ + for (int i = 0; i < ctx->MAX_HP_SIZE; i++) { + if (ctx->tid_map[i] == 0) { + int old = 0; + if (CAS(&ctx->tid_map[i], &old, 1)) + return i; + } + } + + return -1; +} + +static void free_tid(struct lfq_ctx *ctx, int tid) +{ + ctx->tid_map[tid] = 0; +} + +int lfq_init(struct lfq_ctx *ctx, int max_consume_thread) +{ + struct lfq_node *tmp = calloc(1, sizeof(struct lfq_node)); + if (!tmp) + return -errno; + + struct lfq_node *node = calloc(1, sizeof(struct lfq_node)); + if (!node) + return -errno; + + tmp->can_free = node->can_free = true; + memset(ctx, 0, sizeof(struct lfq_ctx)); + ctx->MAX_HP_SIZE = max_consume_thread; + ctx->HP = calloc(max_consume_thread, sizeof(struct lfq_node)); + ctx->tid_map = calloc(max_consume_thread, sizeof(struct lfq_node)); + ctx->head = ctx->tail = tmp; + ctx->fph = ctx->fpt = node; + + return 0; +} + +long lfg_count_freelist(const struct lfq_ctx *ctx) +{ + long count = 0; + for (struct lfq_node *p = (struct lfq_node *) ctx->fph; p; p = p->free_next) + count++; + return count; +} + +int lfq_release(struct lfq_ctx *ctx) +{ + if (ctx->tail && ctx->head) { /* if we have data in queue */ + while ((struct lfq_node *) ctx->head) { /* while still have node */ + struct lfq_node *tmp = (struct lfq_node *) ctx->head->next; + safe_free(ctx, (struct lfq_node *) ctx->head); + ctx->head = tmp; + } + ctx->tail = 0; + } + if (ctx->fph && ctx->fpt) { + free_pool(ctx, true); + if (ctx->fph != ctx->fpt) + return -1; + free(ctx->fpt); /* free the empty node */ + ctx->fph = ctx->fpt = 0; + } + if (ctx->fph || ctx->fpt) + return -1; + + free(ctx->HP); + free(ctx->tid_map); + memset(ctx, 0, sizeof(struct lfq_ctx)); + + return 0; +} + +int lfq_enqueue(struct lfq_ctx *ctx, void *data) +{ + struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node)); + if (!insert_node) + return -errno; + + insert_node->data = data; + struct lfq_node *old_tail = XCHG(&ctx->tail, insert_node); + /* We have claimed our spot in the insertion order by modifying tail. + * we are the only inserting thread with a pointer to the old tail. + * + * Now we can make it part of the list by overwriting the NULL pointer in + * the old tail. This is safe whether or not other threads have updated + * ->next in our insert_node. + */ + assert(!old_tail->next && "old tail was not NULL"); + atomic_store(&old_tail->next, insert_node); + + return 0; +} + +void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid) +{ + struct lfq_node *old_head, *new_head; + + /* HP[tid] is necessary for deallocation. */ + do { + retry: + /* continue jumps to the bottom of the loop, and would attempt a CAS + * with uninitialized new_head. + */ + old_head = atomic_load(&ctx->head); + + atomic_store(&ctx->HP[tid], old_head); + mb(); + + /* another thread freed it before seeing our HP[tid] store */ + if (old_head != atomic_load(&ctx->head)) + goto retry; + new_head = atomic_load(&old_head->next); + + if (new_head == 0) { + atomic_store(&ctx->HP[tid], 0); + return NULL; /* never remove the last node */ + } + } while (!CAS(&ctx->head, &old_head, new_head)); + + /* We have atomically advanced head, and we are the thread that won the race + * to claim a node. We return the data from the *new* head. The list starts + * off with a dummy node, so the current head is always a node that is + * already been read. + */ + atomic_store(&ctx->HP[tid], 0); + void *ret = new_head->data; + atomic_store(&new_head->can_free, true); + + /* we need to avoid freeing until other readers are definitely not going to + * load its ->next in the CAS loop + */ + safe_free(ctx, (struct lfq_node *) old_head); + + return ret; +} + +void *lfq_dequeue(struct lfq_ctx *ctx) +{ + int tid = alloc_tid(ctx); + /* To many thread race */ + if (tid == -1) + return (void *) -1; + + void *ret = lfq_dequeue_tid(ctx, tid); + free_tid(ctx, tid); + return ret; +} \ No newline at end of file diff --git a/lf-queue/lfq.h b/lf-queue/lfq.h new file mode 100644 index 0000000..7349651 --- /dev/null +++ b/lf-queue/lfq.h @@ -0,0 +1,69 @@ +#pragma once + +#include +#include + +struct lfq_node { + void *data; + union { + struct lfq_node *next; + struct lfq_node *free_next; + }; + bool can_free; +}; + +struct lfq_ctx { + alignas(64) struct lfq_node *head; + int count; + struct lfq_node **HP; /* hazard pointers */ + int *tid_map; + bool is_freeing; + struct lfq_node *fph, *fpt; /* free pool head/tail */ + + /* FIXME: get rid of struct. Make it configurable */ + int MAX_HP_SIZE; + + /* avoid cacheline contention */ + alignas(64) struct lfq_node *tail; +}; + +/** + * lfq_init - Initialize lock-free queue. + * @ctx: Lock-free queue handler. + * @max_consume_thread: Max consume thread numbers. If this value set to zero, + * use default value (16). + * Return zero on success. On error, negative errno. + */ +int lfq_init(struct lfq_ctx *ctx, int max_consume_thread); + +/** + * lfq_release - Release lock-free queue from ctx. + * @ctx: Lock-free queue handler. + * Return zero on success. On error, -1. + */ +int lfq_release(struct lfq_ctx *ctx); + +/* internal function */ +long lfg_count_freelist(const struct lfq_ctx *ctx); + +/** + * lfq_enqueue - Push data into queue. + * @ctx: Lock-free queue handler. + * @data: User data + * Return zero on success. On error, negative errno. + */ +int lfq_enqueue(struct lfq_ctx *ctx, void *data); + +/** + * lfq_dequeue_tid - Pop data from queue. + * @ctx: Lock-free queue handler. + * @tid: Unique thread id. + * Return zero if empty queue. On error, negative errno. + */ +void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid); + +/** + * lfq_dequeue - Pop data from queue. + * @ctx: Lock-free queue handler. + */ +void *lfq_dequeue(struct lfq_ctx *ctx); \ No newline at end of file diff --git a/lf-queue/queues.h b/lf-queue/queues.h deleted file mode 100644 index 33d0781..0000000 --- a/lf-queue/queues.h +++ /dev/null @@ -1,290 +0,0 @@ -#include -#include -#include - -#if !defined(QUEUE_TYPE) || !defined(QUEUE_MP) || !defined(QUEUE_MC) -#error Please define QUEUE_TYPE, QUEUE_MP and QUEUE_MC -#endif - -#if !defined(QUEUE_COMMON_DEFINED) - -#define QUEUE_COMMON_DEFINED - -#if (__STDC_VERSION__ < 201112L) -#error C11 is required for this file -#endif - -#include - -#if defined(__STDC_NO_ATOMICS__) -#error Your C compiler does not support C11 atomics -#endif - -#define QUEUE_ATOMIC_SIZE_T atomic_size_t -#define QUEUE_ORDER_RELAXED memory_order_relaxed -#define QUEUE_ORDER_RELEASE memory_order_release -#define QUEUE_ORDER_ACQUIRE memory_order_acquire -#define QUEUE_ATOMIC_STORE atomic_store_explicit -#define QUEUE_ATOMIC_LOAD atomic_load_explicit - -#define QUEUE_MERGE_BASE(a, b) a##b -#define QUEUE_MERGE(a, b) QUEUE_MERGE_BASE(a, b) - -#if !defined(QUEUE_CACHELINE_BYTES) -#define QUEUE_CACHELINE_BYTES 64 -#endif - -#if !defined(QUEUE_TOO_BIG) -#define QUEUE_TOO_BIG (1024ULL * 1024ULL * 256ULL) -#endif - -typedef enum { - QueueResult_Ok, - QueueResult_Full, - QueueResult_Empty, - QueueResult_Contention, - - QueueResult_Error = 128, - QueueResult_Error_Too_Small, - QueueResult_Error_Too_Big, - QueueResult_Error_Not_Pow2, - QueueResult_Error_Not_Aligned_16_Bytes, - QueueResult_Error_Null_Bytes, - QueueResult_Error_Bytes_Smaller_Than_Needed -} QueueResult_t; -#endif - -#if (QUEUE_MP) -#define QUEUE_P_NAME_FN mp -#define QUEUE_P_NAME_TYPE Mp -#define QUEUE_P_TYPE QUEUE_ATOMIC_SIZE_T -#define QUEUE_P_SETUP(a, b, c) QUEUE_ATOMIC_STORE(&a, b, c) -#define QUEUE_P_LOAD(a, b) QUEUE_ATOMIC_LOAD(&a, b) - -#define QUEUE_P_IF_CAS(a, b, c, d, e) \ - if (atomic_compare_exchange_weak_explicit(&a, &b, c, d, e)) -#else -#define QUEUE_P_NAME_FN sp -#define QUEUE_P_NAME_TYPE Sp -#define QUEUE_P_TYPE size_t -#define QUEUE_P_SETUP(a, b, c) -#define QUEUE_P_LOAD(a, b) a -#define QUEUE_P_IF_CAS(a, b, c, d, e) a = c; -#endif - -#if (QUEUE_MC) -#define QUEUE_C_NAME mc -#define QUEUE_C_TYPE QUEUE_ATOMIC_SIZE_T -#define QUEUE_C_SETUP(a, b, c) QUEUE_ATOMIC_STORE(&a, b, c) -#define QUEUE_C_LOAD(a, b) QUEUE_ATOMIC_LOAD(&a, b) - -#define QUEUE_C_IF_CAS(a, b, c, d, e) \ - if (atomic_compare_exchange_weak_explicit(&a, &b, c, d, e)) -#else -#define QUEUE_C_NAME sc -#define QUEUE_C_TYPE size_t -#define QUEUE_C_SETUP(a, b, c) -#define QUEUE_C_LOAD(a, b) a -#define QUEUE_C_IF_CAS(a, b, c, d, e) a = c; -#endif - -#define QUEUE_FN_A QUEUE_MERGE(QUEUE_P_NAME_FN, QUEUE_C_NAME) -#define QUEUE_FN_B QUEUE_MERGE(QUEUE_FN_A, _) -#define QUEUE_FN(name) QUEUE_MERGE(QUEUE_MERGE(QUEUE_FN_B, name##_), QUEUE_TYPE) - -#define QUEUE_STRUCT_A QUEUE_MERGE(QUEUE_P_NAME_TYPE, QUEUE_C_NAME) -#define QUEUE_STRUCT_B QUEUE_MERGE(QUEUE_STRUCT_A, _) -#define QUEUE_STRUCT_C QUEUE_MERGE(QUEUE_STRUCT_B, QUEUE_TYPE) -#define QUEUE_STRUCT QUEUE_MERGE(Queue_, QUEUE_STRUCT_C) -#define QUEUE_CELL QUEUE_MERGE(Cell_, QUEUE_STRUCT_C) - -typedef struct QUEUE_STRUCT QUEUE_STRUCT; - -QueueResult_t QUEUE_FN(make_queue)(size_t cell_count, - QUEUE_STRUCT *queue, - size_t *bytes); - -QueueResult_t QUEUE_FN(try_enqueue)(QUEUE_STRUCT *queue, - QUEUE_TYPE const *data); -QueueResult_t QUEUE_FN(try_dequeue)(QUEUE_STRUCT *queue, QUEUE_TYPE *data); -QueueResult_t QUEUE_FN(enqueue)(QUEUE_STRUCT *queue, QUEUE_TYPE const *data); -QueueResult_t QUEUE_FN(dequeue)(QUEUE_STRUCT *queue, QUEUE_TYPE *data); - -#if defined(QUEUE_IMPLEMENTATION) -#undef QUEUE_IMPLEMENTATION - -typedef struct QUEUE_CELL { - QUEUE_ATOMIC_SIZE_T sequence; - QUEUE_TYPE data; -} QUEUE_CELL; - -typedef struct QUEUE_STRUCT { - uint8_t pad0[QUEUE_CACHELINE_BYTES]; - - QUEUE_P_TYPE enqueue_index; - uint8_t pad2[QUEUE_CACHELINE_BYTES - sizeof(QUEUE_P_TYPE)]; - - QUEUE_C_TYPE dequeue_index; - uint8_t pad3[QUEUE_CACHELINE_BYTES - sizeof(QUEUE_C_TYPE)]; - - size_t cell_mask; - uint8_t pad4[QUEUE_CACHELINE_BYTES - sizeof(size_t)]; - - QUEUE_CELL cells[]; -} QUEUE_STRUCT; - -QueueResult_t QUEUE_FN(make_queue)(size_t cell_count, - QUEUE_STRUCT *queue, - size_t *bytes) -{ - if (!bytes) - return QueueResult_Error_Null_Bytes; - - if (cell_count < 2) - return QueueResult_Error_Too_Small; - - if (cell_count > QUEUE_TOO_BIG) - return QueueResult_Error_Too_Big; - - if (cell_count & (cell_count - 1)) - return QueueResult_Error_Not_Pow2; - - size_t bytes_local = - sizeof(QUEUE_STRUCT) + (sizeof(QUEUE_CELL) * cell_count); - - if (!queue) { - *bytes = bytes_local; - return QueueResult_Ok; - } - - if (*bytes < bytes_local) - return QueueResult_Error_Bytes_Smaller_Than_Needed; - - { - intptr_t queue_value = (intptr_t) queue; - - if (queue_value & 0x0F) { - return QueueResult_Error_Not_Aligned_16_Bytes; - } - } - - memset(queue, 0, bytes_local); - - queue->cell_mask = cell_count - 1; - - for (size_t i = 0; i < cell_count; i++) { - QUEUE_ATOMIC_STORE(&queue->cells[i].sequence, i, QUEUE_ORDER_RELAXED); - } - - QUEUE_P_SETUP(queue->enqueue_index, 0, QUEUE_ORDER_RELAXED); - QUEUE_C_SETUP(queue->dequeue_index, 0, QUEUE_ORDER_RELAXED); - - return QueueResult_Ok; -} - -QueueResult_t QUEUE_FN(try_enqueue)(QUEUE_STRUCT *queue, QUEUE_TYPE const *data) -{ - size_t pos = QUEUE_P_LOAD(queue->enqueue_index, QUEUE_ORDER_RELAXED); - - QUEUE_CELL *cell = &queue->cells[pos & queue->cell_mask]; - - size_t sequence = QUEUE_ATOMIC_LOAD(&cell->sequence, QUEUE_ORDER_ACQUIRE); - - intptr_t difference = (intptr_t) sequence - (intptr_t) pos; - - if (!difference) { - QUEUE_P_IF_CAS(queue->enqueue_index, pos, pos + 1, QUEUE_ORDER_RELAXED, - QUEUE_ORDER_RELAXED) - { - cell->data = *data; - - QUEUE_ATOMIC_STORE(&cell->sequence, pos + 1, QUEUE_ORDER_RELEASE); - - return QueueResult_Ok; - } - } - - if (difference < 0) - return QueueResult_Full; - - return QueueResult_Contention; -} - -QueueResult_t QUEUE_FN(try_dequeue)(QUEUE_STRUCT *queue, QUEUE_TYPE *data) -{ - size_t pos = QUEUE_C_LOAD(queue->dequeue_index, QUEUE_ORDER_RELAXED); - - QUEUE_CELL *cell = &queue->cells[pos & queue->cell_mask]; - - size_t sequence = QUEUE_ATOMIC_LOAD(&cell->sequence, QUEUE_ORDER_ACQUIRE); - - intptr_t difference = (intptr_t) sequence - (intptr_t)(pos + 1); - - if (!difference) { - QUEUE_C_IF_CAS(queue->dequeue_index, pos, pos + 1, QUEUE_ORDER_RELAXED, - QUEUE_ORDER_RELAXED) - { - *data = cell->data; - - QUEUE_ATOMIC_STORE(&cell->sequence, pos + queue->cell_mask + 1, - QUEUE_ORDER_RELEASE); - - return QueueResult_Ok; - } - } - - if (difference < 0) { - return QueueResult_Empty; - } - - return QueueResult_Contention; -} - -QueueResult_t QUEUE_FN(enqueue)(QUEUE_STRUCT *queue, QUEUE_TYPE const *data) -{ - QueueResult_t result; - - do { - result = QUEUE_FN(try_enqueue)(queue, data); - } while (result == QueueResult_Contention); - - return result; -} - -QueueResult_t QUEUE_FN(dequeue)(QUEUE_STRUCT *queue, QUEUE_TYPE *data) -{ - QueueResult_t result; - - do { - result = QUEUE_FN(try_dequeue)(queue, data); - } while (result == QueueResult_Contention); - - return result; -} -#endif - -#undef QUEUE_TYPE -#undef QUEUE_MP -#undef QUEUE_MC - -#undef QUEUE_P_NAME_FN -#undef QUEUE_P_NAME_TYPE -#undef QUEUE_P_NAME -#undef QUEUE_P_TYPE -#undef QUEUE_P_SETUP -#undef QUEUE_P_LOAD -#undef QUEUE_P_IF_CAS - -#undef QUEUE_C_NAME -#undef QUEUE_C_TYPE -#undef QUEUE_C_SETUP -#undef QUEUE_C_LOAD -#undef QUEUE_C_IF_CAS - -#undef QUEUE_FN_A -#undef QUEUE_FN -#undef QUEUE_STRUCT_A -#undef QUEUE_STRUCT_B -#undef QUEUE_STRUCT_C -#undef QUEUE_STRUCT -#undef QUEUE_CELL diff --git a/lf-queue/test.c b/lf-queue/test.c index b9cee79..8578806 100644 --- a/lf-queue/test.c +++ b/lf-queue/test.c @@ -1,433 +1,121 @@ +#define _GNU_SOURCE +#include +#include #include #include #include -#include /* C11 */ +#include +#include +#include -#define QUEUE_TEST_THREADS_MAX 16 +#include "atomics.h" +#include "lfq.h" -typedef struct Data { - float a; - uint32_t b; - uint8_t bytes[16]; -} Data; +#ifndef MAX_PRODUCER +#define MAX_PRODUCER 100 +#endif +#ifndef MAX_CONSUMER +#define MAX_CONSUMER 10 +#endif -#define QUEUE_MP 0 -#define QUEUE_MC 0 -#define QUEUE_TYPE Data -#define QUEUE_IMPLEMENTATION -#include "queues.h" +#define SOME_ID 667814649 -#define QUEUE_MP 1 -#define QUEUE_MC 0 -#define QUEUE_TYPE Data -#define QUEUE_IMPLEMENTATION -#include "queues.h" +static uint64_t cnt_added = 0; +static uint64_t cnt_removed = 0; -#define QUEUE_MP 0 -#define QUEUE_MC 1 -#define QUEUE_TYPE Data -#define QUEUE_IMPLEMENTATION -#include "queues.h" +static int cnt_thread = 0; +static int cnt_producer = 0; -#define QUEUE_MP 1 -#define QUEUE_MC 1 -#define QUEUE_TYPE Data -#define QUEUE_IMPLEMENTATION -#include "queues.h" - -#define CAST(x, y) ((x) y) - -typedef enum Tag { - Spsc, - Mpsc, - Spmc, - Mpmc, - Max = Mpmc, -} Tag; - -static const char *tag_to_name[] = {"Spsc", "Mpsc", "Spmc", "Mpmc"}; - -QueueResult_t make(Tag tag, size_t cell_count, void *queue, size_t *bytes) -{ - switch (tag) { - case Spsc: - return spsc_make_queue_Data(cell_count, CAST(Queue_Spsc_Data *, queue), - bytes); - case Mpsc: - return mpsc_make_queue_Data(cell_count, CAST(Queue_Mpsc_Data *, queue), - bytes); - case Spmc: - return spmc_make_queue_Data(cell_count, CAST(Queue_Spmc_Data *, queue), - bytes); - case Mpmc: - return mpmc_make_queue_Data(cell_count, CAST(Queue_Mpmc_Data *, queue), - bytes); - } - - return QueueResult_Error; -} - -QueueResult_t try_enqueue(Tag tag, void *q, Data const *d) -{ - switch (tag) { - case Spsc: - return spsc_try_enqueue_Data(CAST(Queue_Spsc_Data *, q), d); - case Mpsc: - return mpsc_try_enqueue_Data(CAST(Queue_Mpsc_Data *, q), d); - case Spmc: - return spmc_try_enqueue_Data(CAST(Queue_Spmc_Data *, q), d); - case Mpmc: - return mpmc_try_enqueue_Data(CAST(Queue_Mpmc_Data *, q), d); - } - - return QueueResult_Error; -} -QueueResult_t try_dequeue(Tag tag, void *q, Data *d) -{ - switch (tag) { - case Spsc: - return spsc_try_dequeue_Data(CAST(Queue_Spsc_Data *, q), d); - case Mpsc: - return mpsc_try_dequeue_Data(CAST(Queue_Mpsc_Data *, q), d); - case Spmc: - return spmc_try_dequeue_Data(CAST(Queue_Spmc_Data *, q), d); - case Mpmc: - return mpmc_try_dequeue_Data(CAST(Queue_Mpmc_Data *, q), d); - } - - return QueueResult_Error; -} -QueueResult_t enqueue(Tag tag, void *q, Data const *d) -{ - switch (tag) { - case Spsc: - return spsc_enqueue_Data(CAST(Queue_Spsc_Data *, q), d); - case Mpsc: - return mpsc_enqueue_Data(CAST(Queue_Mpsc_Data *, q), d); - case Spmc: - return spmc_enqueue_Data(CAST(Queue_Spmc_Data *, q), d); - case Mpmc: - return mpmc_enqueue_Data(CAST(Queue_Mpmc_Data *, q), d); - } - - return QueueResult_Error; -} -QueueResult_t dequeue(Tag tag, void *q, Data *d) -{ - switch (tag) { - case Spsc: - return spsc_dequeue_Data(CAST(Queue_Spsc_Data *, q), d); - case Mpsc: - return mpsc_dequeue_Data(CAST(Queue_Mpsc_Data *, q), d); - case Spmc: - return spmc_dequeue_Data(CAST(Queue_Spmc_Data *, q), d); - case Mpmc: - return mpmc_dequeue_Data(CAST(Queue_Mpmc_Data *, q), d); - } - - return QueueResult_Error; -} - -#define EXPECT(x) \ - do { \ - if (!(x)) { \ - free(q); \ - return #x; \ - } \ - } while (0) - -const char *null_pointers(Tag tag, unsigned count_in, unsigned count_out) -{ - (void) count_in; - (void) count_out; - - QueueResult_t result = make(tag, 0, NULL, NULL); - void *q = NULL; - - EXPECT(result == QueueResult_Error_Null_Bytes); - - return NULL; -} - -const char *create(Tag tag, unsigned count_in, unsigned count_out) -{ - (void) count_in; - (void) count_out; - - size_t bytes = -1; - void *q = NULL; - - EXPECT(make(tag, 0, NULL, &bytes) == QueueResult_Error_Too_Small); - EXPECT(make(tag, 1, NULL, &bytes) == QueueResult_Error_Too_Small); - // min size - - EXPECT(make(tag, 13, NULL, &bytes) == QueueResult_Error_Not_Pow2); - EXPECT(make(tag, 255, NULL, &bytes) == QueueResult_Error_Not_Pow2); - // must be pow2 - - EXPECT(make(tag, -1, NULL, &bytes) == QueueResult_Error_Too_Big); - - EXPECT(make(tag, -3000000, NULL, &bytes) == QueueResult_Error_Too_Big); - - EXPECT(make(tag, 1ULL << 63, NULL, &bytes) == QueueResult_Error_Too_Big); - - EXPECT(make(tag, 1ULL << 33, NULL, &bytes) == QueueResult_Error_Too_Big); - // Insane sizes - - { - make(tag, 1 << 8, NULL, &bytes); - - EXPECT(bytes > 0); - EXPECT(bytes < 100000); - - void *queue = malloc(bytes); - - QueueResult_t create = make(tag, 1 << 8, q, &bytes); - - free(queue); - - EXPECT(create == QueueResult_Ok); - } - - return NULL; -} - -const char *empty(Tag tag, unsigned count_in, unsigned count_out) -{ - (void) count_in; - (void) count_out; - - size_t bytes = 0; - void *q = NULL; - - make(tag, 1 << 8, NULL, &bytes); - - EXPECT(bytes > 0); - - q = malloc(bytes); - - make(tag, 1 << 8, q, &bytes); - - { - Data data = {0}; - - QueueResult_t result_try_dequeue = try_dequeue(tag, q, &data); - - EXPECT(result_try_dequeue == QueueResult_Empty); - } - - { - Data data = {0}; - - QueueResult_t result_dequeue = dequeue(tag, q, &data); - - EXPECT(result_dequeue == QueueResult_Empty); - } - - free(q); - - return NULL; -} - -const char *full(Tag tag, unsigned count_in, unsigned count_out) -{ - (void) count_in; - (void) count_out; - - size_t bytes = 0; - void *q = NULL; - - make(tag, 1 << 8, NULL, &bytes); - - EXPECT(bytes > 0); - - q = malloc(bytes); - - make(tag, 1 << 8, q, &bytes); - - for (unsigned i = 0; i < (1 << 8); i++) { - Data data = {0}; - - EXPECT(enqueue(tag, q, &data) == QueueResult_Ok); - } - - { - Data data = {0}; - - QueueResult_t result_try_dequeue = try_enqueue(tag, q, &data); - - EXPECT(result_try_dequeue == QueueResult_Full); - } - - { - Data data = {0}; - - QueueResult_t result_dequeue = enqueue(tag, q, &data); - - EXPECT(result_dequeue == QueueResult_Full); - } - - free(q); - - return NULL; -} - -typedef struct Thread_Data { - void *q; - int multiplier; - Tag tag; - atomic_size_t *global_count; - atomic_size_t *done; -} Thread_Data; +struct user_data { + long data; +}; -int thread_in(void *data) +void *add_queue(void *data) { - Data item = {11.0f, 22, {0}}; - - Thread_Data *info = CAST(Thread_Data *, data); - - unsigned max = 10000 * info->multiplier; - - for (unsigned j = 0; j < max; j++) { - while (enqueue(info->tag, info->q, &item) != QueueResult_Ok) - ; + struct lfq_ctx *ctx = data; + int ret = 0; + long added; + for (added = 0; added < 500000; added++) { + struct user_data *p = malloc(sizeof(struct user_data)); + p->data = SOME_ID; + if ((ret = lfq_enqueue(ctx, p)) != 0) { + printf("lfq_enqueue failed, reason:%s\n", strerror(-ret)); + ATOMIC_ADD(&cnt_added, added); + ATOMIC_SUB(&cnt_producer, 1); + return 0; + } } - - atomic_fetch_sub_explicit(info->done, 1, memory_order_relaxed); - + ATOMIC_ADD(&cnt_added, added); + ATOMIC_SUB(&cnt_producer, 1); + printf("Producer thread [%lu] exited! Still %d running...\n", + pthread_self(), atomic_load(&cnt_producer)); return 0; } -int thread_out(void *data) +void *remove_queue(void *data) { - Thread_Data *info = CAST(Thread_Data *, data); - - unsigned max = 10000 * info->multiplier; - - for (unsigned j = 0; j < max; j++) { - Data item = {0}; - while (dequeue(info->tag, info->q, &item) != QueueResult_Ok) - ; - - atomic_fetch_add_explicit(info->global_count, item.b, - memory_order_relaxed); + struct lfq_ctx *ctx = data; + struct user_data *p; + int tid = ATOMIC_ADD(&cnt_thread, 1); + long deleted = 0; + while (1) { + p = lfq_dequeue_tid(ctx, tid); + if (p) { + if (p->data != SOME_ID) { + printf("data wrong!!\n"); + exit(1); + } + + free(p); + deleted++; + } else { + if (ctx->count || atomic_load(&cnt_producer)) + sched_yield(); /* queue is empty, release CPU slice */ + else + break; /* queue is empty and no more producers */ + } } + ATOMIC_ADD(&cnt_removed, deleted); - atomic_fetch_sub_explicit(info->done, 1, memory_order_relaxed); - + printf("Consumer thread [%lu] exited %d\n", pthread_self(), cnt_producer); return 0; } -const char *sums10000(Tag tag, unsigned count_in, unsigned count_out) +int main() { - void *q = NULL; - { - size_t bytes = 0; - - make(tag, 1 << 8, NULL, &bytes); - - EXPECT(bytes > 0); - - q = malloc(bytes); - - QueueResult_t create = make(tag, 1 << 8, q, &bytes); - - EXPECT(create == QueueResult_Ok); - } - - thrd_t in_threads[QUEUE_TEST_THREADS_MAX]; - thrd_t out_threads[QUEUE_TEST_THREADS_MAX]; - - atomic_size_t done_in_count = ATOMIC_VAR_INIT(count_in); - atomic_size_t done_out_count = ATOMIC_VAR_INIT(count_out); - atomic_size_t global_count = ATOMIC_VAR_INIT(0); - - int multiplier_in = QUEUE_TEST_THREADS_MAX / count_in; - int multiplier_out = QUEUE_TEST_THREADS_MAX / count_out; - - Thread_Data data_in = {q, multiplier_in, tag, &global_count, - &done_in_count}; - Thread_Data data_out = {q, multiplier_out, tag, &global_count, - &done_out_count}; - - for (unsigned i = 0; i < count_in; i++) { - int result = thrd_create(&in_threads[i], thread_in, &data_in); - EXPECT(result == thrd_success); - } - - for (unsigned i = 0; i < count_out; i++) { - int result = thrd_create(&out_threads[i], thread_out, &data_out); - EXPECT(result == thrd_success); - } - - while (atomic_load_explicit(&done_in_count, memory_order_relaxed)) - ; - while (atomic_load_explicit(&done_out_count, memory_order_relaxed)) - ; + struct lfq_ctx ctx; + lfq_init(&ctx, MAX_CONSUMER); - for (unsigned i = 0; i < count_in; i++) { - int ignored = 0; - int result = thrd_join(in_threads[i], &ignored); - - EXPECT(result == thrd_success); - } - for (unsigned i = 0; i < count_out; i++) { - int ignored = 0; - int result = thrd_join(out_threads[i], &ignored); + pthread_t thread_cons[MAX_CONSUMER], thread_pros[MAX_PRODUCER]; - EXPECT(result == thrd_success); + ATOMIC_ADD(&cnt_producer, 1); + for (int i = 0; i < MAX_CONSUMER; i++) { + pthread_create(&thread_cons[i], NULL, remove_queue, (void *) &ctx); } - size_t expected_count = 10000ULL * 22 * QUEUE_TEST_THREADS_MAX; - - EXPECT(atomic_load(&global_count) == expected_count); - - return NULL; -} - -typedef const char *(*Test)(Tag, unsigned, unsigned); -#define TEST(x) \ - { \ -#x, x \ + for (int i = 0; i < MAX_PRODUCER; i++) { + ATOMIC_ADD(&cnt_producer, 1); + pthread_create(&thread_pros[i], NULL, add_queue, (void *) &ctx); } -struct { - const char *name; - Test test; -} static tests[] = { - TEST(null_pointers), TEST(create), TEST(empty), TEST(full), TEST(sums10000), -}; - -#define TEST_COUNT 5 - -int main(int arg_count, char **args) -{ - (void) arg_count; - (void) args; + ATOMIC_SUB(&cnt_producer, 1); - struct { - unsigned count_in; - unsigned count_out; - } thread_counts[(Max + 1)] = { - {1, 1}, - {QUEUE_TEST_THREADS_MAX, 1}, - {1, QUEUE_TEST_THREADS_MAX}, - {QUEUE_TEST_THREADS_MAX, QUEUE_TEST_THREADS_MAX}}; + for (int i = 0; i < MAX_PRODUCER; i++) + pthread_join(thread_pros[i], NULL); - for (unsigned tag = 0; tag < (Max + 1); tag++) { - for (unsigned j = 0; j < TEST_COUNT; j++) { - const char *error = tests[j].test(tag, thread_counts[tag].count_in, - thread_counts[tag].count_out); + for (int i = 0; i < MAX_CONSUMER; i++) + pthread_join(thread_cons[i], NULL); - printf("Test: %s: %-20s: %s%s\n", tag_to_name[tag], tests[j].name, - (error ? "FAIL: " : "PASS"), (error ? error : "")); + long cnt_free = lfg_count_freelist(&ctx); + int clean = lfq_release(&ctx); + printf("Total push %" PRId64 " elements, pop %" PRId64 + " elements. freelist=%ld, clean = %d\n", + cnt_added, cnt_removed, cnt_free, clean); - fflush(stdout); + if (cnt_added == cnt_removed) + printf("Test PASS!!\n"); + else + printf("Test Failed!!\n"); - if (error) - return 1; - } - } - - return 0; + return (cnt_added != cnt_removed); }