-
Notifications
You must be signed in to change notification settings - Fork 87
lf-queue: Rewrite based on Nikolaev's paper and hazard pointers #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,20 @@ | ||
CFLAGS = -Wall -O2 -g -I. | ||
CFLAGS = -std=gnu11 -O3 -Wall -Wextra | ||
LDFLAGS = -lpthread | ||
|
||
# Enable ThreadSanitizer | ||
# CFLAGS += -fsanitize=thread | ||
# LDFLAGS += -fsanitize=thread | ||
targets = main1 main2 main3 main4 | ||
|
||
all: test | ||
all: $(targets) | ||
|
||
test: test.c | ||
$(CC) $(CFLAGS) -o $@ $< $(LDFLAGS) | ||
tsan: CFLAGS += -fsanitize=thread -g | ||
tsan: all | ||
|
||
clean: | ||
rm -f test | ||
rm -f $(targets) | ||
|
||
main%: lfq.c test.c | ||
$(CC) $(CFLAGS) $^ $(LDFLAGS) -o $@ | ||
|
||
main1: CFLAGS += -D MAX_PRODUCER=1 -D MAX_CONSUMER=1 | ||
main2: CFLAGS += -D MAX_PRODUCER=4 -D MAX_CONSUMER=4 | ||
main3: CFLAGS += -D MAX_PRODUCER=100 -D MAX_CONSUMER=10 | ||
main4: CFLAGS += -D MAX_PRODUCER=10 -D MAX_CONSUMER=100 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,5 @@ | ||
# Multithreaded Bounded Lock-free Queue | ||
# lfq | ||
The lock-free queue implementation is based on the paper [A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue](https://drops.dagstuhl.de/opus/volltexte/2019/11335/pdf/LIPIcs-DISC-2019-28.pdf) by Ruslan Nikolaev. | ||
Nikolaev's paper provides an overview of various methods for implementing lock-free queues. This implementation specifically adopts the SCQ (Scalable Circular Queue) approach, which is designed for bounded queues. However, this approach can be easily extended to support unbounded FIFO queues capable of storing an arbitrary number of elements. | ||
|
||
Features: | ||
* single header style for C11 | ||
* strongly typed | ||
* multithreaded | ||
* bounded | ||
* lock-free | ||
In addition, this implementation employs a hazard pointer-based memory reclamation system for concurrent queues. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
#pragma once | ||
|
||
#include <stdatomic.h> | ||
#include <stdbool.h> | ||
#include <stdlib.h> | ||
#include <string.h> | ||
|
||
#define ATOMIC_SUB atomic_fetch_sub | ||
#define CAS atomic_compare_exchange_strong | ||
|
||
/* The 2nd argument is limited to 1 on machines with TAS but not XCHG. | ||
* On x86 it is an arbitrary value. | ||
*/ | ||
#define XCHG atomic_exchange | ||
#define ATOMIC_ADD atomic_fetch_add | ||
#define mb() atomic_thread_fence(memory_order_seq_cst) | ||
|
||
/* Memory barriers*/ | ||
#define smp_mb() atomic_thread_fence(memory_order_seq_cst) | ||
#define smp_rmb() atomic_thread_fence(memory_order_acquire) | ||
#define smp_wmb() atomic_thread_fence(memory_order_release) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,211 @@ | ||
#include <assert.h> | ||
#include <errno.h> | ||
#include <stdbool.h> | ||
|
||
#include "atomics.h" | ||
#include "lfq.h" | ||
|
||
#define MAX_FREE 150 | ||
|
||
static bool in_hp(struct lfq_ctx *ctx, struct lfq_node *node) | ||
{ | ||
for (int i = 0; i < ctx->MAX_HP_SIZE; i++) { | ||
if (atomic_load(&ctx->HP[i]) == node) | ||
return true; | ||
} | ||
return false; | ||
} | ||
|
||
/* add to tail of the free list */ | ||
static void insert_pool(struct lfq_ctx *ctx, struct lfq_node *node) | ||
{ | ||
atomic_store(&node->free_next, NULL); | ||
struct lfq_node *old_tail = XCHG(&ctx->fpt, node); /* seq_cst */ | ||
atomic_store(&old_tail->free_next, node); | ||
} | ||
|
||
static void free_pool(struct lfq_ctx *ctx, bool freeall) | ||
{ | ||
bool old = 0; | ||
if (!CAS(&ctx->is_freeing, &old, 1)) | ||
return; | ||
|
||
for (int i = 0; i < MAX_FREE || freeall; i++) { | ||
struct lfq_node *p = ctx->fph; | ||
if ((!atomic_load(&p->can_free)) || (!atomic_load(&p->free_next)) || | ||
in_hp(ctx, (struct lfq_node *) p)) | ||
break; | ||
ctx->fph = p->free_next; | ||
free(p); | ||
} | ||
atomic_store(&ctx->is_freeing, false); | ||
smp_mb(); | ||
} | ||
|
||
static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node) | ||
{ | ||
if (atomic_load(&node->can_free) && !in_hp(ctx, node)) { | ||
/* free is not thread-safe */ | ||
bool old = 0; | ||
if (CAS(&ctx->is_freeing, &old, 1)) { | ||
/* poison the pointer to detect use-after-free */ | ||
node->next = (void *) -1; | ||
free(node); /* we got the lock; actually free */ | ||
atomic_store(&ctx->is_freeing, false); | ||
smp_mb(); | ||
} else /* we did not get the lock; only add to a freelist */ | ||
insert_pool(ctx, node); | ||
} else | ||
insert_pool(ctx, node); | ||
free_pool(ctx, false); | ||
} | ||
|
||
static int alloc_tid(struct lfq_ctx *ctx) | ||
{ | ||
for (int i = 0; i < ctx->MAX_HP_SIZE; i++) { | ||
if (ctx->tid_map[i] == 0) { | ||
int old = 0; | ||
if (CAS(&ctx->tid_map[i], &old, 1)) | ||
return i; | ||
} | ||
} | ||
|
||
return -1; | ||
} | ||
|
||
static void free_tid(struct lfq_ctx *ctx, int tid) | ||
{ | ||
ctx->tid_map[tid] = 0; | ||
} | ||
|
||
int lfq_init(struct lfq_ctx *ctx, int max_consume_thread) | ||
{ | ||
struct lfq_node *tmp = calloc(1, sizeof(struct lfq_node)); | ||
if (!tmp) | ||
return -errno; | ||
|
||
struct lfq_node *node = calloc(1, sizeof(struct lfq_node)); | ||
if (!node) | ||
return -errno; | ||
|
||
tmp->can_free = node->can_free = true; | ||
memset(ctx, 0, sizeof(struct lfq_ctx)); | ||
ctx->MAX_HP_SIZE = max_consume_thread; | ||
ctx->HP = calloc(max_consume_thread, sizeof(struct lfq_node)); | ||
ctx->tid_map = calloc(max_consume_thread, sizeof(struct lfq_node)); | ||
ctx->head = ctx->tail = tmp; | ||
ctx->fph = ctx->fpt = node; | ||
|
||
return 0; | ||
} | ||
|
||
long lfg_count_freelist(const struct lfq_ctx *ctx) | ||
{ | ||
long count = 0; | ||
for (struct lfq_node *p = (struct lfq_node *) ctx->fph; p; p = p->free_next) | ||
count++; | ||
return count; | ||
} | ||
|
||
int lfq_release(struct lfq_ctx *ctx) | ||
{ | ||
if (ctx->tail && ctx->head) { /* if we have data in queue */ | ||
while ((struct lfq_node *) ctx->head) { /* while still have node */ | ||
struct lfq_node *tmp = (struct lfq_node *) ctx->head->next; | ||
safe_free(ctx, (struct lfq_node *) ctx->head); | ||
ctx->head = tmp; | ||
} | ||
ctx->tail = 0; | ||
} | ||
if (ctx->fph && ctx->fpt) { | ||
free_pool(ctx, true); | ||
if (ctx->fph != ctx->fpt) | ||
return -1; | ||
free(ctx->fpt); /* free the empty node */ | ||
ctx->fph = ctx->fpt = 0; | ||
} | ||
if (ctx->fph || ctx->fpt) | ||
return -1; | ||
|
||
free(ctx->HP); | ||
free(ctx->tid_map); | ||
memset(ctx, 0, sizeof(struct lfq_ctx)); | ||
|
||
return 0; | ||
} | ||
|
||
int lfq_enqueue(struct lfq_ctx *ctx, void *data) | ||
{ | ||
struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node)); | ||
if (!insert_node) | ||
return -errno; | ||
|
||
insert_node->data = data; | ||
struct lfq_node *old_tail = XCHG(&ctx->tail, insert_node); | ||
/* We have claimed our spot in the insertion order by modifying tail. | ||
* we are the only inserting thread with a pointer to the old tail. | ||
* | ||
* Now we can make it part of the list by overwriting the NULL pointer in | ||
* the old tail. This is safe whether or not other threads have updated | ||
* ->next in our insert_node. | ||
*/ | ||
assert(!old_tail->next && "old tail was not NULL"); | ||
atomic_store(&old_tail->next, insert_node); | ||
|
||
return 0; | ||
} | ||
|
||
void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid) | ||
{ | ||
struct lfq_node *old_head, *new_head; | ||
|
||
/* HP[tid] is necessary for deallocation. */ | ||
do { | ||
retry: | ||
/* continue jumps to the bottom of the loop, and would attempt a CAS | ||
* with uninitialized new_head. | ||
*/ | ||
old_head = atomic_load(&ctx->head); | ||
|
||
atomic_store(&ctx->HP[tid], old_head); | ||
mb(); | ||
|
||
/* another thread freed it before seeing our HP[tid] store */ | ||
if (old_head != atomic_load(&ctx->head)) | ||
goto retry; | ||
new_head = atomic_load(&old_head->next); | ||
|
||
if (new_head == 0) { | ||
atomic_store(&ctx->HP[tid], 0); | ||
return NULL; /* never remove the last node */ | ||
} | ||
} while (!CAS(&ctx->head, &old_head, new_head)); | ||
|
||
/* We have atomically advanced head, and we are the thread that won the race | ||
* to claim a node. We return the data from the *new* head. The list starts | ||
* off with a dummy node, so the current head is always a node that is | ||
* already been read. | ||
*/ | ||
atomic_store(&ctx->HP[tid], 0); | ||
void *ret = new_head->data; | ||
atomic_store(&new_head->can_free, true); | ||
|
||
/* we need to avoid freeing until other readers are definitely not going to | ||
* load its ->next in the CAS loop | ||
*/ | ||
safe_free(ctx, (struct lfq_node *) old_head); | ||
|
||
return ret; | ||
} | ||
|
||
void *lfq_dequeue(struct lfq_ctx *ctx) | ||
{ | ||
int tid = alloc_tid(ctx); | ||
/* To many thread race */ | ||
if (tid == -1) | ||
return (void *) -1; | ||
|
||
void *ret = lfq_dequeue_tid(ctx, tid); | ||
free_tid(ctx, tid); | ||
return ret; | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
#pragma once | ||
|
||
#include <stdalign.h> | ||
#include <stdbool.h> | ||
|
||
struct lfq_node { | ||
void *data; | ||
union { | ||
struct lfq_node *next; | ||
struct lfq_node *free_next; | ||
}; | ||
bool can_free; | ||
}; | ||
|
||
struct lfq_ctx { | ||
alignas(64) struct lfq_node *head; | ||
int count; | ||
struct lfq_node **HP; /* hazard pointers */ | ||
int *tid_map; | ||
bool is_freeing; | ||
struct lfq_node *fph, *fpt; /* free pool head/tail */ | ||
|
||
/* FIXME: get rid of struct. Make it configurable */ | ||
int MAX_HP_SIZE; | ||
|
||
/* avoid cacheline contention */ | ||
alignas(64) struct lfq_node *tail; | ||
}; | ||
|
||
/** | ||
* lfq_init - Initialize lock-free queue. | ||
* @ctx: Lock-free queue handler. | ||
* @max_consume_thread: Max consume thread numbers. If this value set to zero, | ||
* use default value (16). | ||
* Return zero on success. On error, negative errno. | ||
*/ | ||
int lfq_init(struct lfq_ctx *ctx, int max_consume_thread); | ||
|
||
/** | ||
* lfq_release - Release lock-free queue from ctx. | ||
* @ctx: Lock-free queue handler. | ||
* Return zero on success. On error, -1. | ||
*/ | ||
int lfq_release(struct lfq_ctx *ctx); | ||
|
||
/* internal function */ | ||
long lfg_count_freelist(const struct lfq_ctx *ctx); | ||
|
||
/** | ||
* lfq_enqueue - Push data into queue. | ||
* @ctx: Lock-free queue handler. | ||
* @data: User data | ||
* Return zero on success. On error, negative errno. | ||
*/ | ||
int lfq_enqueue(struct lfq_ctx *ctx, void *data); | ||
|
||
/** | ||
* lfq_dequeue_tid - Pop data from queue. | ||
* @ctx: Lock-free queue handler. | ||
* @tid: Unique thread id. | ||
* Return zero if empty queue. On error, negative errno. | ||
*/ | ||
void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid); | ||
|
||
/** | ||
* lfq_dequeue - Pop data from queue. | ||
* @ctx: Lock-free queue handler. | ||
*/ | ||
void *lfq_dequeue(struct lfq_ctx *ctx); |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.