Skip to content

Commit 43b2aa9

Browse files
committed
Add a concurrent list utilizing simplified RCU
1 parent 6bbf476 commit 43b2aa9

File tree

3 files changed

+390
-0
lines changed

3 files changed

+390
-0
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
- [mpmc](mpmc/): A multiple-producer/multiple-consumer (MPMC) queue.
1414
- [channel](channel/): A Linux futex based Go channel implementation.
1515
- [redirect](redirect/): An I/O multiplexer to monitor stdin redirect using `timerfd` and `epoll`.
16+
- [rcu\_list](rcu_list/): A concurrent linked list utilizing the simplified RCU algorithm.
17+
1618

1719
## License
1820

rcu_list/Makefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Targets that do not produce a file of the same name must be .PHONY,
# otherwise a file named e.g. "clean" would silently disable them.
.PHONY: all indent clean

all: rcu_list

# Bug fix: "all" previously relinked unconditionally; depend on the
# binary so make can skip up-to-date builds. ThreadSanitizer stays on to
# surface data races in the RCU list.
rcu_list: rcu_list.c
	$(CC) -Wall -g -fsanitize=thread -o $@ $< -lpthread

indent:
	clang-format -i rcu_list.c

clean:
	rm -f rcu_list

rcu_list/rcu_list.c

Lines changed: 380 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,380 @@
1+
/* A concurrent linked list utilizing the simplified RCU algorithm */

/* Bug fix: _GNU_SOURCE must be defined before ANY system header is
 * included; it previously appeared after <stdbool.h>, where it has no
 * effect on headers already processed.
 */
#define _GNU_SOURCE

#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>

typedef struct rcu_list rcu_list_t;

/* Cursor over the live list; entry == NULL denotes an invalid iterator. */
typedef struct {
    struct list_node *entry;
} iterator_t;

/* Per-thread handle binding a reader/writer to a list and to the
 * zombie-list record created for its current critical section.
 */
typedef struct {
    struct rcu_list *list;
    struct zombie_node *zombie; /* set by rcu_read_lock(), read at unlock */
} rcu_handle_t;

typedef rcu_handle_t read_handle_t;
typedef rcu_handle_t write_handle_t;

typedef void (*deleter_func_t)(void *);
typedef bool (*finder_func_t)(void *, void *);

/* Node of the "live" doubly linked list. */
typedef struct list_node {
    bool deleted; /* tombstone flag; not consulted by the code below */
    struct list_node *next, *prev;
    void *data;
} list_node_t;

/* Record on the zombie stack: one per read-side critical section, used
 * to decide when deferred frees may run.
 */
typedef struct zombie_node {
    struct zombie_node *next;
    struct list_node *zombie; /* detached node awaiting free; may be NULL */
    rcu_handle_t *owner;      /* non-NULL while the owning reader is active */
} zombie_node_t;

struct rcu_list {
    pthread_mutex_t write_lock; /* exclusive lock acquired by writers */
    list_node_t *head, *tail;   /* head and tail of the "live" list */
    zombie_node_t *zombie_head; /* head of the zombie list */
    deleter_func_t deleter;     /* destructor invoked by list_delete() */
};

static list_node_t *make_node(void *data);

static zombie_node_t *make_zombie_node(void);

static void lock_for_write(rcu_list_t *list);
static void unlock_for_write(rcu_list_t *list);
52+
53+
/* Allocate and initialize a list that list_delete() will destroy via
 * 'deleter'. Returns NULL when 'deleter' is NULL, allocation fails, or
 * the writer mutex cannot be initialized.
 */
static rcu_list_t *list_new_with_deleter(deleter_func_t deleter)
{
    if (!deleter)
        return NULL;

    rcu_list_t *l = malloc(sizeof *l);
    if (!l)
        return NULL;

    if (pthread_mutex_init(&l->write_lock, NULL)) {
        free(l);
        return NULL;
    }

    l->deleter = deleter;
    l->zombie_head = NULL;
    l->tail = l->head = NULL;
    return l;
}
73+
74+
static void list_free(void *arg)
75+
{
76+
rcu_list_t *list = arg;
77+
for (list_node_t *iter = list->head; iter;) {
78+
list_node_t *tmp = iter->next;
79+
free(iter->data);
80+
free(iter);
81+
iter = tmp;
82+
}
83+
free(list);
84+
}
85+
86+
rcu_list_t *list_new(void)
87+
{
88+
return list_new_with_deleter(list_free);
89+
}
90+
91+
/* Destroy 'list' by invoking its registered deleter. Safe to call with
 * NULL or with a list lacking a deleter (both are no-ops).
 */
void list_delete(rcu_list_t *list)
{
    if (list && list->deleter)
        list->deleter(list);
}
97+
98+
/* Insert 'data' at the head of the live list. Writers are serialized by
 * the list mutex; publication uses atomic stores so lock-free readers
 * never observe a half-linked node. The RCU 'handle' is accepted for API
 * symmetry but not consulted here. Silently returns on NULL list or OOM.
 */
void list_push_front(rcu_list_t *list, void *data, write_handle_t *handle)
{
    if (!list)
        return;

    list_node_t *fresh = make_node(data);
    if (!fresh)
        return;

    lock_for_write(list);

    list_node_t *first;
    __atomic_load(&list->head, &first, __ATOMIC_RELAXED);

    if (first) {
        /* non-empty: link the new node in front of the current head */
        __atomic_store(&fresh->next, &first, __ATOMIC_SEQ_CST);
        __atomic_store(&first->prev, &fresh, __ATOMIC_SEQ_CST);
        __atomic_store(&list->head, &fresh, __ATOMIC_SEQ_CST);
    } else {
        /* empty list: the node becomes both head and tail */
        __atomic_store(&list->head, &fresh, __ATOMIC_SEQ_CST);
        __atomic_store(&list->tail, &fresh, __ATOMIC_SEQ_CST);
    }

    unlock_for_write(list);
}
125+
126+
iterator_t list_find(rcu_list_t *list,
127+
void *data,
128+
finder_func_t finder,
129+
read_handle_t *handle)
130+
{
131+
iterator_t iter = {.entry = NULL}; /* initialize an invalid iterator */
132+
if (!list)
133+
return iter;
134+
135+
list_node_t *current;
136+
__atomic_load(&list->head, &current, __ATOMIC_SEQ_CST);
137+
138+
while (current) {
139+
if (finder(current->data, data)) {
140+
iter.entry = current;
141+
break;
142+
}
143+
144+
__atomic_load(&current->next, &current, __ATOMIC_SEQ_CST);
145+
}
146+
return iter;
147+
}
148+
149+
/* Return an iterator positioned at the current head of the live list
 * (entry is NULL for a NULL or empty list).
 */
iterator_t list_begin(rcu_list_t *list, read_handle_t *handle)
{
    iterator_t it = {.entry = NULL};
    if (list)
        __atomic_load(&list->head, &it.entry, __ATOMIC_SEQ_CST);
    return it;
}
161+
162+
/* Payload of the element under 'iter', or NULL for a NULL/invalid
 * iterator.
 */
void *iterator_get(iterator_t *iter)
{
    if (!iter || !iter->entry)
        return NULL;
    return iter->entry->data;
}
166+
167+
/* Bind a reader handle to 'list'; no critical section is entered yet. */
read_handle_t list_register_reader(rcu_list_t *list)
{
    return (read_handle_t){.list = list, .zombie = NULL};
}
172+
173+
/* Bind a writer handle to 'list'; no critical section is entered yet. */
write_handle_t list_register_writer(rcu_list_t *list)
{
    return (write_handle_t){.list = list, .zombie = NULL};
}
178+
179+
/* Enter a read-side critical section: allocate a zombie-stack record
 * tagged with this handle as its active owner and push it onto the
 * list's zombie stack with a lock-free CAS loop.
 *
 * Bug fix: make_zombie_node() can return NULL, and the result was
 * dereferenced unconditionally. On allocation failure we now return
 * early; handle->zombie stays NULL and the caller must not proceed
 * into the critical section (and must not call rcu_read_unlock()).
 */
void rcu_read_lock(read_handle_t *handle)
{
    zombie_node_t *z_node = make_zombie_node();
    if (!z_node)
        return; /* OOM: cannot track this reader */

    z_node->owner = handle;
    handle->zombie = z_node;

    rcu_list_t *list = handle->list;

    zombie_node_t *old_head;
    __atomic_load(&list->zombie_head, &old_head, __ATOMIC_SEQ_CST);

    /* On CAS failure 'old_head' is refreshed with the current head, so
     * re-link our record and retry until the push succeeds.
     */
    do {
        __atomic_store(&z_node->next, &old_head, __ATOMIC_SEQ_CST);

    } while (!__atomic_compare_exchange(&list->zombie_head, &old_head, &z_node,
                                        true, __ATOMIC_SEQ_CST,
                                        __ATOMIC_SEQ_CST));
}
198+
199+
/* Leave a read-side critical section. If every record after ours on the
 * zombie stack belongs to an already-inactive reader, this thread treats
 * itself as the last one out and reclaims that suffix (the deferred
 * frees). Finally our own record's owner is cleared, marking this reader
 * inactive.
 *
 * NOTE(review): two readers unlocking concurrently can both observe
 * "last == true" and race on the suffix; the simplified algorithm
 * appears to assume this cannot happen -- confirm before relying on it.
 */
void rcu_read_unlock(read_handle_t *handle)
{
    zombie_node_t *z_node = handle->zombie;

    zombie_node_t *cached_next;
    __atomic_load(&z_node->next, &cached_next, __ATOMIC_SEQ_CST);

    bool last = true;

    /* walk through the zombie list to determine if this is the last active
     * reader in the list.
     */
    zombie_node_t *n = cached_next;
    while (n) {
        /* Bug fix: 'owner' was declared list_node_t *, mismatching the
         * rcu_handle_t * field it is atomically loaded from.
         */
        rcu_handle_t *owner;
        __atomic_load(&n->owner, &owner, __ATOMIC_SEQ_CST);

        if (owner) {
            last = false; /* this is not the last active reader */
            break;
        }

        __atomic_load(&n->next, &n, __ATOMIC_SEQ_CST);
    }

    n = cached_next;

    if (last) {
        /* reclaim every record (and its detached list node) after ours */
        while (n) {
            list_node_t *dead_node = n->zombie;
            free(dead_node);

            zombie_node_t *old_node = n;
            __atomic_load(&n->next, &n, __ATOMIC_SEQ_CST);
            free(old_node);
        }

        /* n is NULL here: detach the freed suffix from our record */
        __atomic_store(&z_node->next, &n, __ATOMIC_SEQ_CST);
    }

    /* Bug fix: the owner was cleared through a 'void *', mismatching the
     * rcu_handle_t * field for __atomic_store; use the correct type.
     */
    rcu_handle_t *null = NULL;
    __atomic_store(&z_node->owner, &null, __ATOMIC_SEQ_CST);
}
242+
243+
/* Writers participate in the same grace-period tracking as readers, so
 * entering a write-side critical section delegates to the read path.
 */
void rcu_write_lock(write_handle_t *handle)
{
    rcu_read_lock(handle);
}
247+
248+
/* Leave a write-side critical section; mirrors rcu_write_lock() by
 * delegating to the read path.
 */
void rcu_write_unlock(write_handle_t *handle)
{
    rcu_read_unlock(handle);
}
252+
253+
static list_node_t *make_node(void *data)
254+
{
255+
list_node_t *node = malloc(sizeof(list_node_t));
256+
if (!node)
257+
return NULL;
258+
259+
node->data = data;
260+
node->next = node->prev = NULL;
261+
node->deleted = false;
262+
263+
return node;
264+
}
265+
266+
static zombie_node_t *make_zombie_node(void)
267+
{
268+
zombie_node_t *z_node = malloc(sizeof(zombie_node_t));
269+
if (!z_node)
270+
return NULL;
271+
272+
z_node->zombie = NULL;
273+
z_node->owner = NULL;
274+
z_node->next = NULL;
275+
276+
return z_node;
277+
}
278+
279+
/* Acquire the writer-exclusion mutex for 'list'. */
static void lock_for_write(rcu_list_t *list)
{
    pthread_mutex_lock(&list->write_lock);
}
283+
284+
/* Release the writer-exclusion mutex for 'list'. */
static void unlock_for_write(rcu_list_t *list)
{
    pthread_mutex_unlock(&list->write_lock);
}
288+
289+
/* test program starts here */
290+
291+
#include <assert.h>
292+
#include <stdio.h>
293+
#include <stdlib.h>
294+
295+
/* Test payload: a pair of ints used to exercise the list. */
typedef struct dummy {
    int a, b;
} dummy_t;

/* Heap-allocate a dummy initialized to {a, b}; caller owns the result.
 * Bug fix: the malloc result was dereferenced without a check; return
 * NULL on allocation failure instead.
 */
static dummy_t *make_dummy(int a, int b)
{
    dummy_t *dummy = malloc(sizeof *dummy);
    if (!dummy)
        return NULL;
    dummy->a = a;
    dummy->b = b;
    return dummy;
}
305+
306+
static bool finder(void *x, void *y)
307+
{
308+
dummy_t *dx = x, *dy = y;
309+
return (dx->a == dy->a) && (dx->b == dy->b);
310+
}
311+
312+
/* Reader thread: enters an RCU read-side critical section and verifies
 * that the writer's insertions {1,1} and {2,2} are visible, with {2,2}
 * (pushed last) at the head of the list.
 *
 * NOTE(review): these asserts assume writer_thread has already completed
 * its two pushes. main() starts readers concurrently with the writer, so
 * a reader scheduled before the writer can fail these asserts -- confirm
 * the intended synchronization.
 */
static void *reader_thread(void *arg)
{
    rcu_list_t *list = arg;
    read_handle_t reader = list_register_reader(list);

    rcu_read_lock(&reader);

    /* read from list here */
    iterator_t iter = list_find(list, &(dummy_t){1, 1}, finder, &reader);
    void *data = iterator_get(&iter);
    assert(data);

    iter = list_find(list, &(dummy_t){2, 2}, finder, &reader);
    data = iterator_get(&iter);
    assert(data);

    iterator_t first = list_begin(list, &reader);
    data = iterator_get(&first);
    assert(data);

    /* {2,2} was pushed last, so it must be the head element */
    dummy_t *as_d2 = data;
    assert(as_d2->a == 2);
    assert(as_d2->b == 2);

    /* the node found for {2,2} and the head node must be the same */
    assert(iter.entry == first.entry);

    rcu_read_unlock(&reader);
    return NULL;
}
341+
342+
static void *writer_thread(void *arg)
343+
{
344+
dummy_t *d1 = make_dummy(1, 1);
345+
dummy_t *d2 = make_dummy(2, 2);
346+
347+
rcu_list_t *list = arg;
348+
write_handle_t writer = list_register_writer(list);
349+
350+
rcu_write_lock(&writer);
351+
352+
/* write to list here */
353+
list_push_front(list, d1, &writer);
354+
list_push_front(list, d2, &writer);
355+
356+
rcu_write_unlock(&writer);
357+
return NULL;
358+
}
359+
360+
#define N_READERS 10
361+
362+
int main(void)
363+
{
364+
rcu_list_t *list = list_new();
365+
dummy_t *d0 = make_dummy(0, 0);
366+
list_push_front(list, d0, NULL);
367+
368+
pthread_t t0, t_r[N_READERS];
369+
pthread_create(&t0, NULL, writer_thread, list);
370+
for (int i = 0; i < N_READERS; i++)
371+
pthread_create(&t_r[i], NULL, reader_thread, list);
372+
373+
for (int i = 0; i < N_READERS; i++)
374+
pthread_join(t_r[i], NULL);
375+
pthread_join(t0, NULL);
376+
377+
list_delete(list);
378+
379+
return EXIT_SUCCESS;
380+
}

0 commit comments

Comments
 (0)