Skip to content

Commit fc26d9c

Browse files
committed
Merge pull request #1734 from hjelmn/progress_threading
opal/progress: make progress function registration mt safe
2 parents b001184 + 2fad3b9 commit fc26d9c

File tree

1 file changed

+111
-75
lines changed

1 file changed

+111
-75
lines changed

opal/runtime/opal_progress.c

Lines changed: 111 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,11 @@ int opal_progress_spin_count = 10000;
5555
static opal_atomic_lock_t progress_lock;
5656

5757
/* callbacks to progress */
58-
static opal_progress_callback_t *callbacks = NULL;
58+
static volatile opal_progress_callback_t *callbacks = NULL;
5959
static size_t callbacks_len = 0;
6060
static size_t callbacks_size = 0;
6161

62-
static opal_progress_callback_t *callbacks_lp = NULL;
62+
static volatile opal_progress_callback_t *callbacks_lp = NULL;
6363
static size_t callbacks_lp_len = 0;
6464
static size_t callbacks_lp_size = 0;
6565

@@ -93,6 +93,9 @@ static int debug_output = -1;
9393
*/
9494
static int fake_cb(void) { return 0; }
9595

96+
static int _opal_progress_unregister (opal_progress_callback_t cb, opal_progress_callback_t *callback_array,
97+
size_t *callback_array_len);
98+
9699
/* init the progress engine - called from orte_init */
97100
int
98101
opal_progress_init(void)
@@ -109,6 +112,27 @@ opal_progress_init(void)
109112
}
110113
#endif
111114

115+
callbacks_size = callbacks_lp_size = 8;
116+
117+
callbacks = malloc (callbacks_size * sizeof (callbacks[0]));
118+
callbacks_lp = malloc (callbacks_lp_size * sizeof (callbacks_lp[0]));
119+
120+
if (NULL == callbacks || NULL == callbacks_lp) {
121+
free (callbacks);
122+
free (callbacks_lp);
123+
callbacks_size = callbacks_lp_size = 0;
124+
callbacks = callbacks_lp = NULL;
125+
return OPAL_ERR_OUT_OF_RESOURCE;
126+
}
127+
128+
for (size_t i = 0 ; i < callbacks_size ; ++i) {
129+
callbacks[i] = fake_cb;
130+
}
131+
132+
for (size_t i = 0 ; i < callbacks_lp_size ; ++i) {
133+
callbacks_lp[i] = fake_cb;
134+
}
135+
112136
OPAL_OUTPUT((debug_output, "progress: initialized event flag to: %x",
113137
opal_progress_event_flag));
114138
OPAL_OUTPUT((debug_output, "progress: initialized yield_when_idle to: %s",
@@ -130,10 +154,13 @@ opal_progress_finalize(void)
130154

131155
callbacks_len = 0;
132156
callbacks_size = 0;
133-
if (NULL != callbacks) {
134-
free(callbacks);
135-
callbacks = NULL;
136-
}
157+
free(callbacks);
158+
callbacks = NULL;
159+
160+
callbacks_lp_len = 0;
161+
callbacks_lp_size = 0;
162+
free(callbacks_lp);
163+
callbacks_lp = NULL;
137164

138165
opal_atomic_unlock(&progress_lock);
139166

@@ -322,38 +349,73 @@ opal_progress_set_event_poll_rate(int polltime)
322349
#endif
323350
}
324351

352+
static int opal_progress_find_cb (opal_progress_callback_t cb, opal_progress_callback_t *cbs,
353+
size_t cbs_len)
354+
{
355+
for (size_t i = 0 ; i < cbs_len ; ++i) {
356+
if (cbs[i] == cb) {
357+
return (int) i;
358+
}
359+
}
325360

326-
int
327-
opal_progress_register(opal_progress_callback_t cb)
361+
return OPAL_ERR_NOT_FOUND;
362+
}
363+
364+
static int _opal_progress_register (opal_progress_callback_t cb, opal_progress_callback_t **cbs,
365+
size_t *cbs_size, size_t *cbs_len)
328366
{
329367
int ret = OPAL_SUCCESS;
330-
size_t index;
331368

332-
/* just in case there is a low-priority callback remove it */
333-
(void) opal_progress_unregister (cb);
334-
335-
opal_atomic_lock(&progress_lock);
369+
if (OPAL_ERR_NOT_FOUND != opal_progress_find_cb (cb, *cbs, *cbs_len)) {
370+
return OPAL_SUCCESS;
371+
}
336372

337373
/* see if we need to allocate more space */
338-
if (callbacks_len + 1 > callbacks_size) {
339-
opal_progress_callback_t *tmp;
340-
tmp = (opal_progress_callback_t*)realloc(callbacks, sizeof(opal_progress_callback_t) * (callbacks_size + 4));
374+
if (*cbs_len + 1 > *cbs_size) {
375+
opal_progress_callback_t *tmp, *old;
376+
377+
tmp = (opal_progress_callback_t *) malloc (sizeof (tmp[0]) * 2 * *cbs_size);
341378
if (tmp == NULL) {
342-
ret = OPAL_ERR_TEMP_OUT_OF_RESOURCE;
343-
goto cleanup;
379+
return OPAL_ERR_TEMP_OUT_OF_RESOURCE;
344380
}
345-
/* registering fake callbacks to fill callbacks[] */
346-
for( index = callbacks_len + 1 ; index < callbacks_size + 4 ; index++) {
347-
tmp[index] = &fake_cb;
381+
382+
if (*cbs) {
383+
/* copy old callbacks */
384+
memcpy (tmp, *cbs, sizeof(tmp[0]) * *cbs_size);
348385
}
349386

350-
callbacks = tmp;
351-
callbacks_size += 4;
387+
for (size_t i = *cbs_len ; i < 2 * *cbs_size ; ++i) {
388+
tmp[i] = fake_cb;
389+
}
390+
391+
opal_atomic_wmb ();
392+
393+
/* swap out callback array */
394+
old = opal_atomic_swap_ptr (cbs, tmp);
395+
396+
opal_atomic_wmb ();
397+
398+
free (old);
399+
*cbs_size *= 2;
352400
}
353401

354-
callbacks[callbacks_len++] = cb;
402+
cbs[0][*cbs_len] = cb;
403+
++*cbs_len;
404+
405+
opal_atomic_wmb ();
406+
407+
return ret;
408+
}
409+
410+
int opal_progress_register (opal_progress_callback_t cb)
411+
{
412+
int ret;
413+
414+
opal_atomic_lock(&progress_lock);
415+
416+
(void) _opal_progress_unregister (cb, callbacks_lp, &callbacks_lp_len);
355417

356-
cleanup:
418+
ret = _opal_progress_register (cb, &callbacks, &callbacks_size, &callbacks_len);
357419

358420
opal_atomic_unlock(&progress_lock);
359421

@@ -362,84 +424,58 @@ opal_progress_register(opal_progress_callback_t cb)
362424

363425
int opal_progress_register_lp (opal_progress_callback_t cb)
364426
{
365-
int ret = OPAL_SUCCESS;
366-
size_t index;
367-
368-
/* just in case there is a high-priority callback remove it */
369-
(void) opal_progress_unregister (cb);
427+
int ret;
370428

371429
opal_atomic_lock(&progress_lock);
372430

373-
/* see if we need to allocate more space */
374-
if (callbacks_lp_len + 1 > callbacks_lp_size) {
375-
opal_progress_callback_t *tmp;
376-
tmp = (opal_progress_callback_t*)realloc(callbacks_lp, sizeof(opal_progress_callback_t) * (callbacks_lp_size + 4));
377-
if (tmp == NULL) {
378-
ret = OPAL_ERR_TEMP_OUT_OF_RESOURCE;
379-
goto cleanup;
380-
}
381-
/* registering fake callbacks_lp to fill callbacks_lp[] */
382-
for( index = callbacks_lp_len + 1 ; index < callbacks_lp_size + 4 ; index++) {
383-
tmp[index] = &fake_cb;
384-
}
431+
(void) _opal_progress_unregister (cb, callbacks, &callbacks_len);
385432

386-
callbacks_lp = tmp;
387-
callbacks_lp_size += 4;
388-
}
389-
390-
callbacks_lp[callbacks_lp_len++] = cb;
391-
392-
cleanup:
433+
ret = _opal_progress_register (cb, &callbacks_lp, &callbacks_lp_size, &callbacks_lp_len);
393434

394435
opal_atomic_unlock(&progress_lock);
395436

396437
return ret;
397438
}
398439

399440
static int _opal_progress_unregister (opal_progress_callback_t cb, opal_progress_callback_t *callback_array,
400-
size_t callback_array_len)
441+
size_t *callback_array_len)
401442
{
402-
size_t i;
403-
int ret = OPAL_ERR_NOT_FOUND;
404-
405-
opal_atomic_lock(&progress_lock);
406-
407-
for (i = 0 ; i < callback_array_len ; ++i) {
408-
if (cb == callback_array[i]) {
409-
callback_array[i] = &fake_cb;
410-
ret = OPAL_SUCCESS;
411-
break;
412-
}
443+
int ret = opal_progress_find_cb (cb, callback_array, *callback_array_len);
444+
if (OPAL_ERR_NOT_FOUND == ret) {
445+
return ret;
413446
}
414447

415448
/* If we found the function we're unregistering: If callbacks_len
416449
is 0, we're not going to do anything interesting anyway, so
417450
skip. If callbacks_len is 1, it will soon be 0, so no need to
418-
do any repacking. size_t can be unsigned, so 0 - 1 is bad for
419-
a loop condition :). */
420-
if (OPAL_SUCCESS == ret) {
421-
if (i < callback_array_len - 1) {
422-
memmove (callback_array + i, callback_array + i + 1,
423-
(callback_array_len - i - 1) * sizeof (callback_array[0]));
424-
}
425-
426-
callback_array[callback_array_len - 1] = &fake_cb;
427-
callback_array_len--;
451+
do any repacking. */
452+
for (size_t i = (size_t) ret ; i < *callback_array_len - 1 ; ++i) {
453+
/* copy callbacks atomically since another thread may be in
454+
* opal_progress(). */
455+
(void) opal_atomic_swap_ptr (callback_array + i, callback_array[i+1]);
428456
}
429457

430-
opal_atomic_unlock(&progress_lock);
458+
callback_array[*callback_array_len] = fake_cb;
459+
--*callback_array_len;
431460

432-
return ret;
461+
return OPAL_SUCCESS;
433462
}
434463

435464
int opal_progress_unregister (opal_progress_callback_t cb)
436465
{
437-
int ret = _opal_progress_unregister (cb, callbacks, callbacks_len);
466+
int ret;
467+
468+
opal_atomic_lock(&progress_lock);
469+
470+
ret = _opal_progress_unregister (cb, callbacks, &callbacks_len);
471+
438472
if (OPAL_SUCCESS != ret) {
439473
/* if not in the high-priority array try to remove from the lp array.
440474
* a callback will never be in both. */
441-
return _opal_progress_unregister (cb, callbacks_lp, callbacks_lp_len);
475+
ret = _opal_progress_unregister (cb, callbacks_lp, &callbacks_lp_len);
442476
}
443477

478+
opal_atomic_unlock(&progress_lock);
479+
444480
return ret;
445481
}

0 commit comments

Comments
 (0)