Skip to content

Commit 59e3a83

Browse files
committed
Thread safety of partition flags.
Partition flags weren't thread safe, as they could be modified from `MPI_Pready` by another thread while the progress engine was running in another. The new `part_ready` array prevents the progress engine from using flags which may modified in `MPI_Pready`, and vice versa. This fix only adds one array, and doesn't add any atomic operations: performance shouldn't be impacted. The `done_count` value was moved outside `mca_part_persist_request_t` to fully prevent the edge-case were it would go above the number of partitions in the request. Signed-off-by: Keluaa <34173752+Keluaa@users.noreply.github.com>
1 parent efdb534 commit 59e3a83

File tree

2 files changed

+68
-43
lines changed

2 files changed

+68
-43
lines changed

ompi/mca/part/persist/part_persist.h

Lines changed: 60 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ mca_part_persist_free_req(struct mca_part_persist_request_t* req)
105105
if( MCA_PART_PERSIST_REQUEST_PRECV == req->req_type ) {
106106
MCA_PART_PERSIST_PRECV_REQUEST_RETURN(req);
107107
} else {
108+
free(req->part_ready);
108109
MCA_PART_PERSIST_PSEND_REQUEST_RETURN(req);
109110
}
110111
return err;
@@ -261,8 +262,11 @@ mca_part_persist_progress(void)
261262
uint32_t bytes = req->real_count * dt_size;
262263

263264
/* Set up persistent receives */
264-
req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*(req->real_parts));
265-
req->flags = (int*) calloc(req->real_parts,sizeof(int));
265+
req->persist_reqs = (ompi_request_t**) malloc(sizeof(ompi_request_t*)*req->real_parts);
266+
req->flags = (mca_part_persist_partition_state_t*) malloc(sizeof(mca_part_persist_partition_state_t)*req->real_parts);
267+
for(i = 0; i < req->real_parts; i++) {
268+
req->flags[i] = MCA_PART_PERSIST_PARTITION_STARTED;
269+
}
266270
if(req->real_dt_size == dt_size) {
267271
for(i = 0; i < req->real_parts; i++) {
268272
void *buf = ((void*) (((char*)req->req_addr) + (bytes * i)));
@@ -290,30 +294,37 @@ mca_part_persist_progress(void)
290294
}
291295
} else {
292296
if(false == req->req_part_complete && REQUEST_COMPLETED != req->req_ompi.req_complete && OMPI_REQUEST_ACTIVE == req->req_ompi.req_state) {
297+
int part_done;
298+
size_t done_count = 0;
293299
for(i = 0; i < req->real_parts; i++) {
300+
/* Letting 'MPI_Pready' change 'req->flags' directly would lead to concurrency
301+
* issues, therefore 'req->part_ready' acts as a proxy for thread safety.
302+
* 'req->part_ready' is only relevant for send requests. */
303+
if(NULL != req->part_ready && 0 == req->part_ready[i]) continue;
304+
294305
/* Check to see if partition is queued for being started. Only applicable to sends. */
295-
if(-2 == req->flags[i]) {
296-
err = req->persist_reqs[i]->req_start(1, (&(req->persist_reqs[i])));
306+
if(MCA_PART_PERSIST_PARTITION_QUEUED == req->flags[i]) {
307+
err = req->persist_reqs[i]->req_start(1, &req->persist_reqs[i]);
297308
if(OMPI_SUCCESS != err) goto end_part_progress;
298-
req->flags[i] = 0;
309+
req->flags[i] = MCA_PART_PERSIST_PARTITION_STARTED;
299310
}
300311

301-
if(0 == req->flags[i] && OMPI_REQUEST_ACTIVE == req->persist_reqs[i]->req_state) {
302-
err = ompi_request_test(&(req->persist_reqs[i]), &(req->flags[i]), MPI_STATUS_IGNORE);
312+
if(MCA_PART_PERSIST_PARTITION_STARTED == req->flags[i] && OMPI_REQUEST_ACTIVE == req->persist_reqs[i]->req_state) {
313+
err = ompi_request_test(&(req->persist_reqs[i]), &part_done, MPI_STATUS_IGNORE);
303314
if(OMPI_SUCCESS != err) goto end_part_progress;
304-
if(0 != req->flags[i]) {
305-
req->done_count++;
315+
if(0 != part_done) {
316+
req->flags[i] = MCA_PART_PERSIST_PARTITION_COMPLETE;
306317
}
307318
}
319+
320+
done_count += MCA_PART_PERSIST_PARTITION_COMPLETE == req->flags[i];
308321
}
309322

310323
/* Check for completion and complete the requests */
311-
if(req->done_count == req->real_parts) {
324+
if(done_count == req->real_parts) {
312325
req->first_send = false;
313326
mca_part_persist_complete(req);
314327
completed++;
315-
} else if(req->done_count > req->real_parts) {
316-
ompi_rte_abort(OMPI_ERR_FATAL, "internal part request done count is %d > %d", req->done_count, req->real_parts);
317328
}
318329
}
319330

@@ -375,6 +386,7 @@ mca_part_persist_precv_init(void *buf,
375386
req->first_send = true;
376387
req->flag_post_setup_recv = false;
377388
req->flags = NULL;
389+
req->part_ready = NULL;
378390
/* Non-blocking receive on setup info */
379391
err = MCA_PML_CALL(irecv(&req->setup_info[1], sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, src, tag, comm, &req->setup_req[1]));
380392
if(OMPI_SUCCESS != err) return OMPI_ERROR;
@@ -416,7 +428,7 @@ mca_part_persist_psend_init(const void* buf,
416428
ompi_request_t** request)
417429
{
418430
int err = OMPI_SUCCESS;
419-
size_t dt_size_;
431+
size_t dt_size_, i;
420432
uint32_t dt_size;
421433
mca_part_persist_list_t* new_progress_elem = NULL;
422434
mca_part_persist_psend_request_t *sendreq;
@@ -456,7 +468,12 @@ mca_part_persist_psend_init(const void* buf,
456468
req->real_count = count;
457469
req->setup_info[0].dt_size = dt_size;
458470

459-
req->flags = (int*) calloc(req->real_parts, sizeof(int));
471+
req->flags = (mca_part_persist_partition_state_t*) malloc(sizeof(mca_part_persist_partition_state_t) * req->real_parts);
472+
req->part_ready = (int32_t*) malloc(sizeof(int32_t) * req->real_parts);
473+
for (i = 0; i < req->real_parts; i++) {
474+
req->flags[i] = MCA_PART_PERSIST_PARTITION_STARTED;
475+
req->part_ready[i] = 0;
476+
}
460477

461478
err = MCA_PML_CALL(isend(&(req->setup_info[0]), sizeof(struct ompi_mca_persist_setup_t), MPI_BYTE, dst, tag, MCA_PML_BASE_SEND_STANDARD, comm, &req->setup_req[0]));
462479
if(OMPI_SUCCESS != err) return OMPI_ERROR;
@@ -502,21 +519,20 @@ mca_part_persist_start(size_t count, ompi_request_t** requests)
502519
if(false == req->first_send)
503520
{
504521
if(MCA_PART_PERSIST_REQUEST_PSEND == req->req_type) {
505-
req->done_count = 0;
506-
memset((void*)req->flags,0,sizeof(uint32_t)*req->real_parts);
522+
for(i = 0; i < req->real_parts; i++) {
523+
req->flags[i] = MCA_PART_PERSIST_PARTITION_STARTED;
524+
req->part_ready[i] = 0;
525+
}
507526
} else {
508-
req->done_count = 0;
509527
err = req->persist_reqs[0]->req_start(req->real_parts, req->persist_reqs);
510-
memset((void*)req->flags,0,sizeof(uint32_t)*req->real_parts);
511-
}
512-
} else {
513-
if(MCA_PART_PERSIST_REQUEST_PSEND == req->req_type) {
514-
req->done_count = 0;
515-
for(i = 0; i < req->real_parts && OMPI_SUCCESS == err; i++) {
516-
req->flags[i] = -1;
528+
for(i = 0; i < req->real_parts; i++) {
529+
req->flags[i] = MCA_PART_PERSIST_PARTITION_STARTED;
517530
}
518-
} else {
519-
req->done_count = 0;
531+
}
532+
} else if(MCA_PART_PERSIST_REQUEST_PSEND == req->req_type) {
533+
for(i = 0; i < req->real_parts; i++) {
534+
req->flags[i] = MCA_PART_PERSIST_PARTITION_QUEUED;
535+
req->part_ready[i] = 0;
520536
}
521537
}
522538
req->req_ompi.req_state = OMPI_REQUEST_ACTIVE;
@@ -540,17 +556,17 @@ mca_part_persist_pready(size_t min_part,
540556
size_t i;
541557

542558
mca_part_persist_request_t *req = (mca_part_persist_request_t *)(request);
543-
if(true == req->initialized)
544-
{
545-
err = req->persist_reqs[min_part]->req_start(max_part-min_part+1, (&(req->persist_reqs[min_part])));
546-
for(i = min_part; i <= max_part && OMPI_SUCCESS == err; i++) {
547-
req->flags[i] = 0; /* Mark partition as ready for testing */
559+
if(MCA_PART_PERSIST_REQUEST_PSEND != req->req_type) {
560+
err = OMPI_ERROR;
561+
} else {
562+
if(true == req->initialized) {
563+
err = req->persist_reqs[min_part]->req_start(max_part-min_part+1, &req->persist_reqs[min_part]);
564+
for(i = min_part; i <= max_part; i++) {
565+
req->flags[i] = MCA_PART_PERSIST_PARTITION_STARTED;
566+
}
548567
}
549-
}
550-
else
551-
{
552-
for(i = min_part; i <= max_part && OMPI_SUCCESS == err; i++) {
553-
req->flags[i] = -2; /* Mark partition as queued */
568+
for(i = min_part; i <= max_part; i++) {
569+
req->part_ready[i] = 1;
554570
}
555571
}
556572
return err;
@@ -567,23 +583,27 @@ mca_part_persist_parrived(size_t min_part,
567583
int _flag = false;
568584
mca_part_persist_request_t *req = (mca_part_persist_request_t *)request;
569585

570-
if(0 != req->flags) {
586+
if(MCA_PART_PERSIST_REQUEST_PRECV != req->req_type) {
587+
err = OMPI_ERROR;
588+
} else if(OMPI_REQUEST_INACTIVE == req->req_ompi.req_state) {
589+
_flag = 1;
590+
} else if(0 != req->flags) {
571591
_flag = 1;
572592
if(req->req_parts == req->real_parts) {
573593
for(i = min_part; i <= max_part; i++) {
574-
_flag = _flag && req->flags[i];
594+
_flag &= MCA_PART_PERSIST_PARTITION_COMPLETE == req->flags[i];
575595
}
576596
} else {
577597
float convert = ((float)req->real_parts) / ((float)req->req_parts);
578598
size_t _min = floor(convert * min_part);
579599
size_t _max = ceil(convert * max_part);
580600
for(i = _min; i <= _max; i++) {
581-
_flag = _flag && req->flags[i];
601+
_flag &= MCA_PART_PERSIST_PARTITION_COMPLETE == req->flags[i];
582602
}
583603
}
584604
}
585605

586-
if(!_flag) {
606+
if(!_flag && OMPI_SUCCESS == err) {
587607
opal_progress();
588608
}
589609
*flag = _flag;

ompi/mca/part/persist/part_persist_request.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ struct ompi_mca_persist_setup_t {
4444
size_t count;
4545
};
4646

47+
typedef enum {
48+
MCA_PART_PERSIST_PARTITION_STARTED = 0, /* partition request started. The default after initialization, or after MPI_Start for receive requests */
49+
MCA_PART_PERSIST_PARTITION_COMPLETE = 1, /* partition request complete */
50+
MCA_PART_PERSIST_PARTITION_QUEUED = 2 /* next progress loop will start the partition request (send only) */
51+
} mca_part_persist_partition_state_t;
4752

4853
/**
4954
* Base type for PART PERSIST requests
@@ -89,10 +94,10 @@ struct mca_part_persist_request_t {
8994

9095
int32_t initialized; /**< flag for initialized state */
9196
int32_t first_send; /**< flag for whether the first send has happened */
92-
int32_t flag_post_setup_recv;
93-
size_t done_count; /**< counter for the number of partitions marked ready */
97+
int32_t flag_post_setup_recv;
9498

95-
int32_t *flags; /**< array of flags to determine whether a partition has arrived */
99+
mca_part_persist_partition_state_t* flags; /**< the state of each partition */
100+
int32_t *part_ready; /**< readiness flag of each partition, NULL for receive requests */
96101

97102
struct ompi_mca_persist_setup_t setup_info[2]; /**< Setup info to send during initialization. */
98103

0 commit comments

Comments
 (0)