Commit 9618dd0

Add Quiescent state based reclamation (QSBR)
1 parent 29fb6de commit 9618dd0

4 files changed, +378 -0 lines changed


.clang-format

Lines changed: 1 addition & 0 deletions
@@ -25,3 +25,4 @@ ForEachMacros:
   - rb_list_foreach
   - rb_list_foreach_safe
   - EV_FOREACH
+  - LIST_FOREACH

README.md

Lines changed: 1 addition & 0 deletions
@@ -15,6 +15,7 @@
 - [redirect](redirect/): An I/O multiplexer to monitor stdin redirect using `timerfd` and `epoll`.
 - [rcu\_list](rcu_list/): A concurrent linked list utilizing the simplified RCU algorithm.
 - [ringbuf\_shm](ringbuf_shm/): An optimized lock-free ring buffer with shared memory.
+- [qsbr](qsbr/): An implementation of Quiescent state based reclamation (QSBR).

 ## License

qsbr/Makefile

Lines changed: 8 additions & 0 deletions
@@ -0,0 +1,8 @@
all:
	$(CC) -Wall -o main main.c -lpthread

indent:
	clang-format -i main.c

clean:
	rm -f main

qsbr/main.c

Lines changed: 368 additions & 0 deletions
@@ -0,0 +1,368 @@
#include <assert.h>
#include <stdbool.h>

/* Branch prediction hints */
#define unlikely(x) __builtin_expect((x) != 0, 0)

#ifndef atomic_fetch_add
#define atomic_fetch_add(x, a) __sync_fetch_and_add(x, a)
#endif

#ifndef atomic_thread_fence
#define memory_order_relaxed __ATOMIC_RELAXED
#define memory_order_acquire __ATOMIC_ACQUIRE
#define memory_order_release __ATOMIC_RELEASE
#define memory_order_seq_cst __ATOMIC_SEQ_CST
#define atomic_thread_fence(m) __atomic_thread_fence(m)
#endif

#ifndef atomic_store_explicit
#define atomic_store_explicit __atomic_store_n
#endif

#ifndef atomic_load_explicit
#define atomic_load_explicit __atomic_load_n
#endif

/* Exponential back-off for the spinning paths */
#define SPINLOCK_BACKOFF_MIN 4
#define SPINLOCK_BACKOFF_MAX 128
#if defined(__x86_64__) || defined(__i386__)
#define SPINLOCK_BACKOFF_HOOK __asm volatile("pause" ::: "memory")
#else
#define SPINLOCK_BACKOFF_HOOK \
    do {                      \
    } while (0)
#endif
#define SPINLOCK_BACKOFF(count)                    \
    do {                                           \
        for (int __i = (count); __i != 0; __i--) { \
            SPINLOCK_BACKOFF_HOOK;                 \
        }                                          \
        if ((count) < SPINLOCK_BACKOFF_MAX)        \
            (count) += (count);                    \
    } while (0)
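
/* Note: with SPINLOCK_BACKOFF_MIN = 4 and SPINLOCK_BACKOFF_MAX = 128, the
 * spin count doubles on every retry (4, 8, 16, 32, 64, 128) and then stays
 * at 128 "pause" iterations per attempt.
 */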

#define CACHE_LINE_SIZE 64

/* Quiescent state based reclamation (QSBR).
 *
 * Each registered thread has to periodically indicate that it is in a
 * quiescent state, i.e. a state in which it does not hold any memory
 * references to objects which may be garbage collected. A typical place to
 * call the qsbr_checkpoint function would be, e.g., right after processing
 * a single request, when any shared state is no longer referenced. The
 * longer the period between checkpoints, the coarser the reclamation
 * granularity.
 *
 * Writers, i.e. threads which are trying to garbage collect objects, should
 * ensure that the objects are no longer globally visible and then issue a
 * barrier using the qsbr_barrier function. This function returns a
 * generation number. It is safe to reclaim the said objects once qsbr_sync
 * returns true for that number.
 *
 * Note that this interface is asynchronous.
 */
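
/* Usage sketch (illustrative only; process_request(), unlink_obj() and
 * free_obj() are hypothetical application routines, not part of this file):
 *
 *   Reader thread:
 *       qsbr_register(qs);
 *       while (running) {
 *           process_request();   // may hold references to shared objects
 *           qsbr_checkpoint(qs); // quiescent: no references held here
 *       }
 *       qsbr_unregister(qs);
 *
 *   Reclaiming thread:
 *       unlink_obj(obj);                  // make it globally invisible
 *       qsbr_epoch_t e = qsbr_barrier(qs);
 *       while (!qsbr_sync(qs, e))
 *           ;                             // back off or do other work
 *       free_obj(obj);                    // grace period has elapsed
 */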

#include <errno.h>
#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/queue.h>

typedef uint64_t qsbr_epoch_t;

typedef struct qsbr_tls {
    /* The thread (local) epoch, observed at qsbr_checkpoint. Also, a pointer
     * to the TLS structure of the next thread.
     */
    qsbr_epoch_t local_epoch;
    LIST_ENTRY(qsbr_tls) entry;
} qsbr_tls_t;

typedef struct qsbr {
    /* The global epoch, TLS key with a list of the registered threads. */
    qsbr_epoch_t global_epoch;
    pthread_key_t tls_key;
    pthread_mutex_t lock;
    LIST_HEAD(priv, qsbr_tls) list;
} qsbr_t;

qsbr_t *qsbr_create(void)
{
    qsbr_t *qs;
    int ret = posix_memalign((void **) &qs, CACHE_LINE_SIZE, sizeof(qsbr_t));
    if (ret != 0) {
        errno = ret;
        return NULL;
    }
    memset(qs, 0, sizeof(qsbr_t));

    if (pthread_key_create(&qs->tls_key, free) != 0) {
        free(qs);
        return NULL;
    }
    pthread_mutex_init(&qs->lock, NULL);
    qs->global_epoch = 1;
    return qs;
}

void qsbr_destroy(qsbr_t *qs)
{
    pthread_key_delete(qs->tls_key);
    pthread_mutex_destroy(&qs->lock);
    free(qs);
}

/* qsbr_register: register the current thread for QSBR. */
int qsbr_register(qsbr_t *qs)
{
    qsbr_tls_t *t = pthread_getspecific(qs->tls_key);
    if (unlikely(!t)) {
        int ret =
            posix_memalign((void **) &t, CACHE_LINE_SIZE, sizeof(qsbr_tls_t));
        if (ret != 0) {
            errno = ret;
            return -1;
        }
        pthread_setspecific(qs->tls_key, t);
    }
    memset(t, 0, sizeof(qsbr_tls_t));

    pthread_mutex_lock(&qs->lock);
    LIST_INSERT_HEAD(&qs->list, t, entry);
    pthread_mutex_unlock(&qs->lock);
    return 0;
}

void qsbr_unregister(qsbr_t *qsbr)
{
    qsbr_tls_t *t = pthread_getspecific(qsbr->tls_key);
    if (!t)
        return;

    pthread_setspecific(qsbr->tls_key, NULL);

    pthread_mutex_lock(&qsbr->lock);
    LIST_REMOVE(t, entry);
    pthread_mutex_unlock(&qsbr->lock);
    free(t);
}

/* qsbr_checkpoint: indicate a quiescent state of the current thread. */
void qsbr_checkpoint(qsbr_t *qs)
{
    qsbr_tls_t *t = pthread_getspecific(qs->tls_key);
    assert(t);

    /* Observe the current epoch and issue a load barrier.
     *
     * Additionally, issue a store barrier before the observation, so the
     * callers may assume that qsbr_checkpoint() acts as a full barrier.
     */
    atomic_thread_fence(memory_order_seq_cst);
    t->local_epoch = qs->global_epoch;
}

qsbr_epoch_t qsbr_barrier(qsbr_t *qs)
{
    /* Note: the atomic operation will issue a store barrier. */
    return atomic_fetch_add(&qs->global_epoch, 1) + 1;
}

bool qsbr_sync(qsbr_t *qs, qsbr_epoch_t target)
{
    /* First, the calling thread should observe the epoch itself. */
    qsbr_checkpoint(qs);

    /* Have all threads observed the target epoch? */
    qsbr_tls_t *t;
    LIST_FOREACH (t, &qs->list, entry) {
        if (t->local_epoch < target) /* not ready to reclaim */
            return false;
    }

    /* Detected the grace period. */
    return true;
}
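
/* Worked example of the epoch arithmetic above: if global_epoch is 5,
 * qsbr_barrier() advances it to 6 and returns 6; qsbr_sync(qs, 6) then
 * returns true only once every registered thread has passed a checkpoint
 * and published local_epoch >= 6, i.e. a grace period has elapsed since
 * the barrier.
 */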

/* Test program starts here */

#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <unistd.h>

static unsigned nsec = 10; /* seconds */

static pthread_barrier_t barrier;
static unsigned n_workers;
static volatile bool stop;

typedef struct {
    unsigned int *ptr;
    bool visible;
    char _pad[CACHE_LINE_SIZE - 8 - 4 - 4 - 8];
} data_struct_t;

#define N_DS 4

#define MAGIC 0xDEADBEEF
static unsigned magic_val = MAGIC;

static qsbr_t *qsbr;

static data_struct_t ds[N_DS] __attribute__((__aligned__(CACHE_LINE_SIZE)));
static uint64_t destructions;

static void access_obj(data_struct_t *obj)
{
    if (atomic_load_explicit(&obj->visible, memory_order_relaxed)) {
        atomic_thread_fence(memory_order_acquire);
        if (*obj->ptr != MAGIC)
            abort();
    }
}

static void mock_insert_obj(data_struct_t *obj)
{
    obj->ptr = &magic_val;
    assert(!obj->visible);
    atomic_thread_fence(memory_order_release);
    atomic_store_explicit(&obj->visible, true, memory_order_relaxed);
}

static void mock_remove_obj(data_struct_t *obj)
{
    assert(obj->visible);
    obj->visible = false;
}

static void mock_destroy_obj(data_struct_t *obj)
{
    obj->ptr = NULL;
    destructions++;
}

/* QSBR stress test */

static void qsbr_writer(unsigned target)
{
    data_struct_t *obj = &ds[target];

    if (obj->visible) {
        /* The data structure is visible. First, ensure it is no longer
         * visible (think of the "remove" semantics).
         */
        unsigned count = SPINLOCK_BACKOFF_MIN;
        qsbr_epoch_t target_epoch;

        mock_remove_obj(obj);

        /* QSBR synchronisation barrier. */
        target_epoch = qsbr_barrier(qsbr);
        while (!qsbr_sync(qsbr, target_epoch)) {
            SPINLOCK_BACKOFF(count);
            /* Other threads might have exited, in which case the checkpoint
             * would never be passed.
             */
            if (stop)
                return;
        }

        /* It is safe to "destroy" the object now. */
        mock_destroy_obj(obj);
    } else {
        /* The data structure is not globally visible. Set the value and
         * make it visible (think of the "insert" semantics).
         */
        mock_insert_obj(obj);
    }
}

static void *qsbr_stress(void *arg)
{
    const unsigned id = (uintptr_t) arg;
    unsigned n = 0;

    qsbr_register(qsbr);

    /* There are NCPU threads concurrently reading the data and a single
     * writer thread (ID 0) modifying it. The writer sets the pointer used
     * by the readers to NULL as soon as it considers the object ready for
     * reclaim.
     */
    pthread_barrier_wait(&barrier);
    while (!stop) {
        n = (n + 1) & (N_DS - 1);
        if (id == 0) {
            qsbr_writer(n);
            continue;
        }

        /* Reader: iterate through the data structures, and if an object is
         * visible (think of the "lookup" semantics), read its value through
         * a pointer. The writer will set the pointer to NULL when it thinks
         * the object is ready to be reclaimed.
         *
         * An incorrect reclamation mechanism would lead to a crash in the
         * following pointer dereference.
         */
        access_obj(&ds[n]);
        qsbr_checkpoint(qsbr);
    }
    pthread_barrier_wait(&barrier);
    qsbr_unregister(qsbr);
    pthread_exit(NULL);
    return NULL;
}

static void leave(int sig)
{
    (void) sig;
    stop = true;
}

typedef void *(*func_t)(void *);

static void run_test(func_t func)
{
    struct sigaction sigalarm;

    n_workers = sysconf(_SC_NPROCESSORS_CONF);
    pthread_t *thr = calloc(n_workers, sizeof(pthread_t));
    pthread_barrier_init(&barrier, NULL, n_workers);
    stop = false;

    memset(&sigalarm, 0, sizeof(struct sigaction));
    sigalarm.sa_handler = leave;
    int ret = sigaction(SIGALRM, &sigalarm, NULL);
    assert(ret == 0);

    memset(&ds, 0, sizeof(ds));
    qsbr = qsbr_create();
    destructions = 0;

    alarm(nsec); /* Spin the test */

    for (unsigned i = 0; i < n_workers; i++) {
        if ((errno = pthread_create(&thr[i], NULL, func,
                                    (void *) (uintptr_t) i)) != 0) {
            exit(EXIT_FAILURE);
        }
    }
    for (unsigned i = 0; i < n_workers; i++)
        pthread_join(thr[i], NULL);
    pthread_barrier_destroy(&barrier);
    free(thr); /* release the thread-handle array */
    printf("# %" PRIu64 "\n", destructions);

    qsbr_destroy(qsbr);
}

int main(int argc, char **argv)
{
    printf("stress test...\n");
    run_test(qsbr_stress);
    printf("OK\n");
    return 0;
}
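
To build and run the stress test with the Makefile above: `make && ./main`.
The program spins for ten seconds (the `nsec` setting), then prints the number
of completed object destructions followed by `OK`:

    stress test...
    # <destructions>
    OK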
