
Commit d335963

Add a lightweight thread pool
1 parent cb3e7e1 commit d335963

File tree

README.md
tpool/Makefile
tpool/README.md
tpool/tpool.c

4 files changed, +374 -0 lines changed

README.md

Lines changed: 1 addition & 0 deletions
@@ -1,6 +1,7 @@
 # Complementary Programs for course "Linux Kernel Internals"

 ## Project Listing
+- [tpool](tpool/): A lightweight thread pool.
 - [picosh](picosh/): A minimalist UNIX shell.
 - [httpd](httpd/): A multi-threaded web server.
 - [ringbuffer](ringbuffer/): A lock-less ring buffer.

tpool/Makefile

Lines changed: 8 additions & 0 deletions
all:
	$(CC) -Wall -o tpool tpool.c -lpthread -lm

clean:
	rm -f tpool

check: all
	./tpool

tpool/README.md

Lines changed: 5 additions & 0 deletions
# tpool

`tpool` is a lightweight, POSIX-compliant thread pool implementation.
- Timed wait for an asynchronous execution result
- Flexible cancellation of pending tasks
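
A minimal usage sketch of the timed-wait feature follows. It is only an illustration: the `square` task, the pool size, and the one-second timeout are invented for the example, and it assumes the declarations from tpool/tpool.c below are visible to the caller (e.g. split out into a header).

#include <stdio.h>
#include <stdlib.h>

/* Hypothetical task: square an int and return a heap-allocated result. */
static void *square(void *arg)
{
    int *out = malloc(sizeof(int));
    if (out)
        *out = (*(int *) arg) * (*(int *) arg);
    return out;
}

int main(void)
{
    int n = 7;
    tpool_t pool = tpool_create(2);
    tpool_future_t fut = tpool_apply(pool, square, &n);

    /* Wait at most one second; NULL means the timeout expired. */
    int *result = tpool_future_get(fut, 1);
    if (result) {
        printf("%d squared is %d\n", n, *result);
        free(result);
    } else {
        printf("timed out waiting for the result\n");
    }
    tpool_future_destroy(fut);
    tpool_join(pool);
    return 0;
}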

tpool/tpool.c

Lines changed: 360 additions & 0 deletions
#include <stddef.h>

typedef struct __tpool_future *tpool_future_t;
typedef struct __threadpool *tpool_t;

/**
 * Create a thread pool containing the specified number of threads.
 * If successful, the thread pool is returned. Otherwise, it
 * returns NULL.
 */
tpool_t tpool_create(size_t count);

/**
 * Schedule the specified function to be executed.
 * If successful, a future object representing the execution of
 * the task is returned. Otherwise, it returns NULL.
 */
tpool_future_t tpool_apply(tpool_t pool, void *(*func)(void *), void *arg);

/**
 * Wait for all pending tasks to complete before destroying the thread pool.
 */
int tpool_join(tpool_t pool);

/**
 * Return the result when it becomes available.
 * If @seconds is non-zero and the result does not arrive within the specified
 * time, NULL is returned. Each tpool_future_get() resets the timeout status
 * on @future.
 */
void *tpool_future_get(tpool_future_t future, unsigned int seconds);

/**
 * Destroy the future object and free its resources once it is no longer used.
 * It is an error to refer to a destroyed future object. Note that destroying
 * a future object does not prevent a pending task from being executed.
 */
int tpool_future_destroy(tpool_future_t future);

#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <time.h>

enum __future_flags {
    __FUTURE_RUNNING = 01,
    __FUTURE_FINISHED = 02,
    __FUTURE_TIMEOUT = 04,
    __FUTURE_CANCELLED = 010,
    __FUTURE_DESTROYED = 020,
};

typedef struct __threadtask {
    void *(*func)(void *);
    void *arg;
    struct __tpool_future *future;
    struct __threadtask *next;
} threadtask_t;

typedef struct __jobqueue {
    threadtask_t *head, *tail;
    pthread_cond_t cond_nonempty;
    pthread_mutex_t rwlock;
} jobqueue_t;

struct __tpool_future {
    int flag;
    void *result;
    pthread_mutex_t mutex;
    pthread_cond_t cond_finished;
};

struct __threadpool {
    size_t count;
    pthread_t *workers;
    jobqueue_t *jobqueue;
};
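
/*
 * How the pieces fit together: tpool_apply() pushes a threadtask_t at the
 * head of the jobqueue, while worker threads in jobqueue_fetch() dequeue
 * from the tail, so tasks execute in FIFO order. Each task carries a
 * __tpool_future whose flag field tracks its lifecycle (RUNNING, FINISHED,
 * TIMEOUT, CANCELLED, DESTROYED) under the future's own mutex.
 */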
static struct __tpool_future *tpool_future_create(void)
{
    struct __tpool_future *future = malloc(sizeof(struct __tpool_future));
    if (future) {
        future->flag = 0;
        future->result = NULL;
        pthread_mutex_init(&future->mutex, NULL);
        pthread_condattr_t attr;
        pthread_condattr_init(&attr);
        /* tpool_future_get() waits against CLOCK_MONOTONIC, so the condition
         * variable must be created with the same clock. */
        pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
        pthread_cond_init(&future->cond_finished, &attr);
        pthread_condattr_destroy(&attr);
    }
    return future;
}

int tpool_future_destroy(struct __tpool_future *future)
{
    if (future) {
        pthread_mutex_lock(&future->mutex);
        if (future->flag & __FUTURE_FINISHED ||
            future->flag & __FUTURE_CANCELLED) {
            pthread_mutex_unlock(&future->mutex);
            pthread_mutex_destroy(&future->mutex);
            pthread_cond_destroy(&future->cond_finished);
            free(future);
        } else {
            future->flag |= __FUTURE_DESTROYED;
            pthread_mutex_unlock(&future->mutex);
        }
    }
    return 0;
}

void *tpool_future_get(struct __tpool_future *future, unsigned int seconds)
{
    pthread_mutex_lock(&future->mutex);
    /* turn off the timeout bit set previously */
    future->flag &= ~__FUTURE_TIMEOUT;
    while ((future->flag & __FUTURE_FINISHED) == 0) {
        if (seconds) {
            struct timespec expire_time;
            clock_gettime(CLOCK_MONOTONIC, &expire_time);
            expire_time.tv_sec += seconds;
            int status = pthread_cond_timedwait(&future->cond_finished,
                                                &future->mutex, &expire_time);
            if (status == ETIMEDOUT) {
                future->flag |= __FUTURE_TIMEOUT;
                pthread_mutex_unlock(&future->mutex);
                return NULL;
            }
        } else
            pthread_cond_wait(&future->cond_finished, &future->mutex);
    }

    pthread_mutex_unlock(&future->mutex);
    return future->result;
}
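
/*
 * Illustrative caller pattern (a sketch, not part of the API above): because
 * each tpool_future_get() call clears __FUTURE_TIMEOUT, a caller may simply
 * retry after a timeout:
 *
 *     void *r;
 *     while (!(r = tpool_future_get(future, 1)))
 *         ;   // one second elapsed without a result; do other work and retry
 */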
static jobqueue_t *jobqueue_create(void)
{
    jobqueue_t *jobqueue = malloc(sizeof(jobqueue_t));
    if (jobqueue) {
        jobqueue->head = jobqueue->tail = NULL;
        pthread_cond_init(&jobqueue->cond_nonempty, NULL);
        pthread_mutex_init(&jobqueue->rwlock, NULL);
    }
    return jobqueue;
}

static void jobqueue_destroy(jobqueue_t *jobqueue)
{
    threadtask_t *tmp = jobqueue->head;
    while (tmp) {
        jobqueue->head = jobqueue->head->next;
        pthread_mutex_lock(&tmp->future->mutex);
        if (tmp->future->flag & __FUTURE_DESTROYED) {
            pthread_mutex_unlock(&tmp->future->mutex);
            pthread_mutex_destroy(&tmp->future->mutex);
            pthread_cond_destroy(&tmp->future->cond_finished);
            free(tmp->future);
        } else {
            tmp->future->flag |= __FUTURE_CANCELLED;
            pthread_mutex_unlock(&tmp->future->mutex);
        }
        free(tmp);
        tmp = jobqueue->head;
    }

    pthread_mutex_destroy(&jobqueue->rwlock);
    pthread_cond_destroy(&jobqueue->cond_nonempty);
    free(jobqueue);
}
static void __jobqueue_fetch_cleanup(void *arg)
{
    pthread_mutex_t *mutex = (pthread_mutex_t *) arg;
    pthread_mutex_unlock(mutex);
}

static void *jobqueue_fetch(void *queue)
{
    jobqueue_t *jobqueue = (jobqueue_t *) queue;
    threadtask_t *task;
    int old_state;

    pthread_cleanup_push(__jobqueue_fetch_cleanup, (void *) &jobqueue->rwlock);

    while (1) {
        pthread_mutex_lock(&jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &old_state);
        pthread_testcancel();

        while (!jobqueue->tail)
            pthread_cond_wait(&jobqueue->cond_nonempty, &jobqueue->rwlock);
        pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_state);
        if (jobqueue->head == jobqueue->tail) {
            task = jobqueue->tail;
            jobqueue->head = jobqueue->tail = NULL;
        } else {
            threadtask_t *tmp;
            for (tmp = jobqueue->head; tmp->next != jobqueue->tail;
                 tmp = tmp->next)
                ;
            task = tmp->next;
            tmp->next = NULL;
            jobqueue->tail = tmp;
        }
        pthread_mutex_unlock(&jobqueue->rwlock);

        if (task->func) {
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_CANCELLED) {
                pthread_mutex_unlock(&task->future->mutex);
                free(task);
                continue;
            } else {
                task->future->flag |= __FUTURE_RUNNING;
                pthread_mutex_unlock(&task->future->mutex);
            }

            void *ret_value = task->func(task->arg);
            pthread_mutex_lock(&task->future->mutex);
            if (task->future->flag & __FUTURE_DESTROYED) {
                pthread_mutex_unlock(&task->future->mutex);
                pthread_mutex_destroy(&task->future->mutex);
                pthread_cond_destroy(&task->future->cond_finished);
                free(task->future);
            } else {
                task->future->flag |= __FUTURE_FINISHED;
                task->future->result = ret_value;
                pthread_cond_broadcast(&task->future->cond_finished);
                pthread_mutex_unlock(&task->future->mutex);
            }
            free(task);
        } else {
            pthread_mutex_destroy(&task->future->mutex);
            pthread_cond_destroy(&task->future->cond_finished);
            free(task->future);
            free(task);
            break;
        }
    }

    pthread_cleanup_pop(0);
    pthread_exit(NULL);
}
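
/*
 * A task whose func pointer is NULL acts as a shutdown sentinel: the worker
 * that dequeues it frees the task and its future, breaks out of the loop,
 * and exits. tpool_join() below submits one such sentinel per worker thread.
 */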
struct __threadpool *tpool_create(size_t count)
{
    jobqueue_t *jobqueue = jobqueue_create();
    struct __threadpool *pool = malloc(sizeof(struct __threadpool));
    if (!jobqueue || !pool) {
        if (jobqueue)
            jobqueue_destroy(jobqueue);
        free(pool);
        return NULL;
    }

    pool->count = count, pool->jobqueue = jobqueue;
    if ((pool->workers = malloc(count * sizeof(pthread_t)))) {
        for (int i = 0; i < count; i++) {
            if (pthread_create(&pool->workers[i], NULL, jobqueue_fetch,
                               (void *) jobqueue)) {
                for (int j = 0; j < i; j++)
                    pthread_cancel(pool->workers[j]);
                for (int j = 0; j < i; j++)
                    pthread_join(pool->workers[j], NULL);
                free(pool->workers);
                jobqueue_destroy(jobqueue);
                free(pool);
                return NULL;
            }
        }
        return pool;
    }

    jobqueue_destroy(jobqueue);
    free(pool);
    return NULL;
}
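
/*
 * Note on the failure path above: if any pthread_create() call fails, the
 * workers already started are cancelled and then joined. They are cancellable
 * while blocked in pthread_testcancel()/pthread_cond_wait() inside
 * jobqueue_fetch(), and the cleanup handler releases the queue lock.
 */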
struct __tpool_future *tpool_apply(struct __threadpool *pool,
                                   void *(*func)(void *),
                                   void *arg)
{
    jobqueue_t *jobqueue = pool->jobqueue;
    threadtask_t *new_head = malloc(sizeof(threadtask_t));
    struct __tpool_future *future = tpool_future_create();
    if (new_head && future) {
        new_head->func = func, new_head->arg = arg, new_head->future = future;
        pthread_mutex_lock(&jobqueue->rwlock);
        if (jobqueue->head) {
            new_head->next = jobqueue->head;
            jobqueue->head = new_head;
        } else {
            jobqueue->head = jobqueue->tail = new_head;
            pthread_cond_broadcast(&jobqueue->cond_nonempty);
        }
        pthread_mutex_unlock(&jobqueue->rwlock);
    } else if (new_head) {
        free(new_head);
        return NULL;
    } else if (future) {
        tpool_future_destroy(future);
        return NULL;
    }
    return future;
}
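
/*
 * Design note: tpool_apply() broadcasts cond_nonempty only when the queue
 * goes from empty to non-empty. Workers wait on cond_nonempty only while
 * jobqueue->tail is NULL, so no wakeup is needed when a task is added to an
 * already non-empty queue.
 */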
int tpool_join(struct __threadpool *pool)
{
    size_t num_threads = pool->count;
    for (int i = 0; i < num_threads; i++)
        tpool_apply(pool, NULL, NULL);
    for (int i = 0; i < num_threads; i++)
        pthread_join(pool->workers[i], NULL);
    free(pool->workers);
    jobqueue_destroy(pool->jobqueue);
    free(pool);
    return 0;
}
#include <math.h>
#include <stdio.h>

#define PRECISION 100 /* upper bound of the BBP sum */

/* Use the Bailey–Borwein–Plouffe formula to approximate pi:
 *   pi = sum_{k=0}^{inf} 1/16^k *
 *        (4/(8k+1) - 2/(8k+4) - 1/(8k+5) - 1/(8k+6))
 */
static void *bpp(void *arg)
{
    int k = *(int *) arg;
    double sum = (4.0 / (8 * k + 1)) - (2.0 / (8 * k + 4)) -
                 (1.0 / (8 * k + 5)) - (1.0 / (8 * k + 6));
    double *product = malloc(sizeof(double));
    if (product)
        *product = 1 / pow(16, k) * sum;
    return (void *) product;
}

int main()
{
    int bpp_args[PRECISION + 1];
    double bpp_sum = 0;
    tpool_t pool = tpool_create(4);
    tpool_future_t futures[PRECISION + 1];

    for (int i = 0; i <= PRECISION; i++) {
        bpp_args[i] = i;
        futures[i] = tpool_apply(pool, bpp, (void *) &bpp_args[i]);
    }

    for (int i = 0; i <= PRECISION; i++) {
        double *result = tpool_future_get(futures[i], 0 /* blocking wait */);
        bpp_sum += *result;
        tpool_future_destroy(futures[i]);
        free(result);
    }

    tpool_join(pool);
    printf("PI calculated with %d terms: %.15f\n", PRECISION + 1, bpp_sum);
    return 0;
}
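
The `main()` function above doubles as the self-test: the `check` target in the accompanying Makefile builds the program and runs it as `./tpool`.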
