#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#include "atomics.h"
#include "lfq.h"

#define MAX_FREE 150
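
/* A lock-free multi-producer/multi-consumer queue.
 *
 * Memory reclamation uses hazard pointers: before dereferencing the queue
 * head, a consumer publishes it in HP[tid] and re-checks head. Nodes that
 * may still be referenced by another consumer are parked on a singly linked
 * free list (fph .. fpt) and reclaimed later by free_pool(), which skips any
 * node still present in HP[].
 */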

/* Return true if some consumer currently holds a hazard pointer to node. */
static bool in_hp(struct lfq_ctx *ctx, struct lfq_node *node)
{
    for (int i = 0; i < ctx->MAX_HP_SIZE; i++) {
        if (atomic_load(&ctx->HP[i]) == node)
            return true;
    }
    return false;
}

/* Add a node to the tail of the free list. */
static void insert_pool(struct lfq_ctx *ctx, struct lfq_node *node)
{
    atomic_store(&node->free_next, NULL);
    struct lfq_node *old_tail = XCHG(&ctx->fpt, node); /* seq_cst */
    atomic_store(&old_tail->free_next, node);
}

/* Reclaim nodes from the head of the free list, stopping at the first node
 * that is still in use, is still hazard-protected, or is the last node.
 * is_freeing acts as a lock so that only one thread frees at a time.
 */
static void free_pool(struct lfq_ctx *ctx, bool freeall)
{
    bool old = false;
    if (!CAS(&ctx->is_freeing, &old, true))
        return;

    for (int i = 0; i < MAX_FREE || freeall; i++) {
        struct lfq_node *p = ctx->fph;
        if (!atomic_load(&p->can_free) || !atomic_load(&p->free_next) ||
            in_hp(ctx, (struct lfq_node *) p))
            goto final;
        ctx->fph = p->free_next;
        free(p);
    }
final:
    atomic_store(&ctx->is_freeing, false);
    smb();
}

static void safe_free(struct lfq_ctx *ctx, struct lfq_node *node)
{
    if (atomic_load(&node->can_free) && !in_hp(ctx, node)) {
        /* freeing is not thread-safe here, so is_freeing acts as a try-lock */
        bool old = false;
        if (CAS(&ctx->is_freeing, &old, true)) {
            /* poison the pointer to detect use-after-free */
            node->next = (void *) -1;
            free(node); /* we got the lock; actually free */
            atomic_store(&ctx->is_freeing, false);
            smb();
        } else /* we did not get the lock; only add to the free list */
            insert_pool(ctx, node);
    } else
        insert_pool(ctx, node);
    free_pool(ctx, false);
}

/* Claim a hazard-pointer slot for the calling thread. */
static int alloc_tid(struct lfq_ctx *ctx)
{
    for (int i = 0; i < ctx->MAX_HP_SIZE; i++) {
        if (ctx->tid_map[i] == 0) {
            int old = 0;
            if (CAS(&ctx->tid_map[i], &old, 1))
                return i;
        }
    }

    return -1;
}

static void free_tid(struct lfq_ctx *ctx, int tid)
{
    ctx->tid_map[tid] = 0;
}

int lfq_init(struct lfq_ctx *ctx, int max_consume_thread)
{
    struct lfq_node *tmp = calloc(1, sizeof(struct lfq_node));
    if (!tmp)
        return -errno;

    struct lfq_node *node = calloc(1, sizeof(struct lfq_node));
    if (!node) {
        free(tmp);
        return -errno;
    }

    tmp->can_free = node->can_free = true;
    memset(ctx, 0, sizeof(struct lfq_ctx));
    ctx->MAX_HP_SIZE = max_consume_thread;
    ctx->HP = calloc(max_consume_thread, sizeof(*ctx->HP));
    ctx->tid_map = calloc(max_consume_thread, sizeof(*ctx->tid_map));
    if (!ctx->HP || !ctx->tid_map) {
        free(ctx->HP);
        free(ctx->tid_map);
        free(tmp);
        free(node);
        return -errno;
    }
    ctx->head = ctx->tail = tmp;
    ctx->fph = ctx->fpt = node;

    return 0;
}

long lfg_count_freelist(const struct lfq_ctx *ctx)
{
    long count = 0;
    for (struct lfq_node *p = (struct lfq_node *) ctx->fph; p; p = p->free_next)
        count++;
    return count;
}

int lfq_release(struct lfq_ctx *ctx)
{
    if (ctx->tail && ctx->head) { /* the queue still has nodes (at least the dummy) */
        while ((struct lfq_node *) ctx->head) { /* drain any remaining nodes */
            struct lfq_node *tmp = (struct lfq_node *) ctx->head->next;
            safe_free(ctx, (struct lfq_node *) ctx->head);
            ctx->head = tmp;
        }
        ctx->tail = 0;
    }
    if (ctx->fph && ctx->fpt) {
        free_pool(ctx, true);
        if (ctx->fph != ctx->fpt)
            return -1;
        free(ctx->fpt); /* free the last (dummy) node of the free list */
        ctx->fph = ctx->fpt = 0;
    }
    if (ctx->fph || ctx->fpt)
        return -1;

    free(ctx->HP);
    free(ctx->tid_map);
    memset(ctx, 0, sizeof(struct lfq_ctx));

    return 0;
}

int lfq_enqueue(struct lfq_ctx *ctx, void *data)
{
    struct lfq_node *insert_node = calloc(1, sizeof(struct lfq_node));
    if (!insert_node)
        return -errno;

    insert_node->data = data;
    struct lfq_node *old_tail = XCHG(&ctx->tail, insert_node);
    /* We have claimed our spot in the insertion order by modifying tail;
     * we are the only inserting thread with a pointer to the old tail.
     *
     * Now we can make it part of the list by overwriting the NULL pointer in
     * the old tail. This is safe whether or not other threads have updated
     * ->next in our insert_node.
     */
#ifdef DEBUG
    assert(!(old_tail->next) && "old tail was not NULL");
#endif
    atomic_store(&old_tail->next, insert_node);
    /* TODO: could a consumer thread have freed the old tail? No, because
     * that would leave head == NULL.
     */

    return 0;
}

void *lfq_dequeue_tid(struct lfq_ctx *ctx, int tid)
{
    struct lfq_node *old_head, *new_head;

    /* HP[tid] is necessary for deallocation. */
    do {
    retry:
        /* We use goto retry rather than continue: continue jumps to the loop
         * condition and would attempt a CAS with an uninitialized new_head.
         */
        old_head = atomic_load(&ctx->head);

        /* seq-cst store.
         * FIXME: use xchg instead of mov + mfence on x86.
         */
        atomic_store(&ctx->HP[tid], old_head);
        mb();

        /* another thread freed it before seeing our HP[tid] store */
        if (old_head != atomic_load(&ctx->head))
            goto retry;
        new_head = atomic_load(&old_head->next);

        if (new_head == 0) {
            atomic_store(&ctx->HP[tid], 0);
            return NULL; /* never remove the last node */
        }
#ifdef DEBUG
        /* FIXME: check for already-freed nodes */
        /* assert(new_head != (void *) -1 && "read an already-freed node"); */
#endif
    } while (!CAS(&ctx->head, &old_head, new_head));

    /* We have atomically advanced head, and we are the thread that won the
     * race to claim a node. We return the data from the *new* head. The list
     * starts off with a dummy node, so the current head is always a node
     * whose data has already been consumed.
     */
    atomic_store(&ctx->HP[tid], 0);
    void *ret = new_head->data;
    atomic_store(&new_head->can_free, true);

    /* We need to avoid freeing old_head until other readers are definitely
     * not going to load its ->next in the CAS loop.
     */
    safe_free(ctx, (struct lfq_node *) old_head);

    return ret;
}

void *lfq_dequeue(struct lfq_ctx *ctx)
{
    int tid = alloc_tid(ctx);
    /* Too many threads are racing; no hazard-pointer slot is free. */
    if (tid == -1)
        return (void *) -1;

    void *ret = lfq_dequeue_tid(ctx, tid);
    free_tid(ctx, tid);
    return ret;
}
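
/* A minimal usage sketch, not part of the queue implementation: one producer
 * and one consumer exercising the API above. The LFQ_EXAMPLE guard and the
 * item count are illustrative choices, not something lfq.h provides; build
 * it only when the rest of the project (atomics.h, lfq.h) is available.
 */
#ifdef LFQ_EXAMPLE
#include <pthread.h>
#include <stdio.h>

#define N_ITEMS 1000

static void *producer(void *arg)
{
    struct lfq_ctx *ctx = arg;
    for (long i = 1; i <= N_ITEMS; i++)
        while (lfq_enqueue(ctx, (void *) i) != 0)
            ; /* retry on allocation failure */
    return NULL;
}

static void *consumer(void *arg)
{
    struct lfq_ctx *ctx = arg;
    long sum = 0, got = 0;
    while (got < N_ITEMS) {
        void *p = lfq_dequeue(ctx);
        if (!p || p == (void *) -1)
            continue; /* queue empty, or no free hazard-pointer slot */
        sum += (long) p;
        got++;
    }
    printf("consumed %ld items, sum = %ld\n", got, sum);
    return NULL;
}

int main(void)
{
    struct lfq_ctx ctx;
    if (lfq_init(&ctx, 4) != 0) /* allow up to 4 concurrent consumers */
        return 1;

    pthread_t p, c;
    pthread_create(&p, NULL, producer, &ctx);
    pthread_create(&c, NULL, consumer, &ctx);
    pthread_join(p, NULL);
    pthread_join(c, NULL);

    lfq_release(&ctx);
    return 0;
}
#endif /* LFQ_EXAMPLE */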