Skip to content

Commit e7d8cc7

Browse files
committed
osc/ucx: implement rput and rget using ucp_worker_flush_nb
The fallback to the old method of acquiring a request from an atomic operation is preserved. Some networks might provide better performance if the request-based operations do not rely on atomic operations. Some minor fixes to opal_common_ucx_winfo_flush are also included in this commit. Signed-off-by: Joseph Schuchart <schuchart@icl.utk.edu>
1 parent 3c8b927 commit e7d8cc7

File tree

3 files changed

+83
-27
lines changed

3 files changed

+83
-27
lines changed

ompi/mca/osc/ucx/osc_ucx_comm.c

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1071,21 +1071,25 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
10711071
return ret;
10721072
}
10731073

1074-
ret = opal_common_ucx_wpmem_fence(mem);
1075-
if (ret != OMPI_SUCCESS) {
1076-
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
1077-
return OMPI_ERROR;
1078-
}
1079-
10801074
mca_osc_ucx_component.num_incomplete_req_ops++;
1081-
/* TODO: investigate whether ucp_worker_flush_nb is a better choice here */
1082-
ret = opal_common_ucx_wpmem_fetch_nb(module->state_mem, UCP_ATOMIC_FETCH_OP_FADD,
1083-
0, target, &(module->req_result),
1084-
sizeof(uint64_t), remote_addr & (~0x7),
1085-
req_completion, ucx_req);
1075+
ret = opal_common_ucx_wpmem_flush_ep_nb(mem, target, req_completion, ucx_req);
1076+
10861077
if (ret != OMPI_SUCCESS) {
1087-
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
1088-
return ret;
1078+
/* fallback to using an atomic op to acquire a request handle */
1079+
ret = opal_common_ucx_wpmem_fence(mem);
1080+
if (ret != OMPI_SUCCESS) {
1081+
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
1082+
return OMPI_ERROR;
1083+
}
1084+
1085+
ret = opal_common_ucx_wpmem_fetch_nb(mem, UCP_ATOMIC_FETCH_OP_FADD,
1086+
0, target, &(module->req_result),
1087+
sizeof(uint64_t), remote_addr & (~0x7),
1088+
req_completion, ucx_req);
1089+
if (ret != OMPI_SUCCESS) {
1090+
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
1091+
return ret;
1092+
}
10891093
}
10901094

10911095
*request = &ucx_req->super;
@@ -1120,21 +1124,25 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
11201124
return ret;
11211125
}
11221126

1123-
ret = opal_common_ucx_wpmem_fence(mem);
1124-
if (ret != OMPI_SUCCESS) {
1125-
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
1126-
return OMPI_ERROR;
1127-
}
1128-
11291127
mca_osc_ucx_component.num_incomplete_req_ops++;
1130-
/* TODO: investigate whether ucp_worker_flush_nb is a better choice here */
1131-
ret = opal_common_ucx_wpmem_fetch_nb(module->state_mem, UCP_ATOMIC_FETCH_OP_FADD,
1132-
0, target, &(module->req_result),
1133-
sizeof(uint64_t), remote_addr & (~0x7),
1134-
req_completion, ucx_req);
1128+
ret = opal_common_ucx_wpmem_flush_ep_nb(mem, target, req_completion, ucx_req);
1129+
11351130
if (ret != OMPI_SUCCESS) {
1136-
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
1137-
return ret;
1131+
/* fallback to using an atomic op to acquire a request handle */
1132+
ret = opal_common_ucx_wpmem_fence(mem);
1133+
if (ret != OMPI_SUCCESS) {
1134+
OSC_UCX_VERBOSE(1, "opal_common_ucx_mem_fence failed: %d", ret);
1135+
return OMPI_ERROR;
1136+
}
1137+
1138+
ret = opal_common_ucx_wpmem_fetch_nb(mem, UCP_ATOMIC_FETCH_OP_FADD,
1139+
0, target, &(module->req_result),
1140+
sizeof(uint64_t), remote_addr & (~0x7),
1141+
req_completion, ucx_req);
1142+
if (ret != OMPI_SUCCESS) {
1143+
OMPI_OSC_UCX_REQUEST_RETURN(ucx_req);
1144+
return ret;
1145+
}
11381146
}
11391147

11401148
*request = &ucx_req->super;

opal/mca/common/ucx/common_ucx_wpool.c

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,7 @@ OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, in
761761
((opal_common_ucx_request_t *) req)->winfo = winfo;
762762
}
763763

764-
if (OPAL_COMMON_UCX_FLUSH_B) {
764+
if (OPAL_COMMON_UCX_FLUSH_B == type) {
765765
rc = opal_common_ucx_wait_request_mt(req, "ucp_ep_flush_nb");
766766
} else {
767767
*req_ptr = req;
@@ -818,13 +818,57 @@ OPAL_DECLSPEC int opal_common_ucx_ctx_flush(opal_common_ucx_ctx_t *ctx,
818818
if (rc != OPAL_SUCCESS) {
819819
MCA_COMMON_UCX_ERROR("opal_common_ucx_flush failed: %d", rc);
820820
rc = OPAL_ERROR;
821+
break;
821822
}
822823
}
823824
opal_mutex_unlock(&ctx->mutex);
824825

825826
return rc;
826827
}
827828

829+
830+
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush_ep_nb(opal_common_ucx_wpmem_t *mem,
831+
int target,
832+
opal_common_ucx_user_req_handler_t user_req_cb,
833+
void *user_req_ptr)
834+
{
835+
#if HAVE_DECL_UCP_EP_FLUSH_NB
836+
int rc = OPAL_SUCCESS;
837+
ucp_ep_h ep = NULL;
838+
ucp_rkey_h rkey = NULL;
839+
opal_common_ucx_winfo_t *winfo = NULL;
840+
841+
if (NULL == mem) {
842+
return OPAL_SUCCESS;
843+
}
844+
845+
rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
846+
if (OPAL_UNLIKELY(OPAL_SUCCESS != rc)) {
847+
MCA_COMMON_UCX_ERROR("tlocal_fetch failed: %d", rc);
848+
return rc;
849+
}
850+
851+
opal_mutex_lock(&winfo->mutex);
852+
opal_common_ucx_request_t *req;
853+
req = ucp_worker_flush_nb(winfo->worker, 0, opal_common_ucx_req_completion);
854+
if (UCS_PTR_IS_PTR(req)) {
855+
req->ext_req = user_req_ptr;
856+
req->ext_cb = user_req_cb;
857+
req->winfo = winfo;
858+
} else {
859+
if (user_req_cb != NULL) {
860+
(*user_req_cb)(user_req_ptr);
861+
}
862+
}
863+
opal_mutex_unlock(&winfo->mutex);
864+
return rc;
865+
#else
866+
return OPAL_ERR_NOT_SUPPORTED;
867+
#endif // HAVE_DECL_UCP_EP_FLUSH_NB
868+
869+
}
870+
871+
828872
OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem)
829873
{
830874
ucs_status_t status = UCS_OK;

opal/mca/common/ucx/common_ucx_wpool.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,10 @@ OPAL_DECLSPEC void opal_common_ucx_wpmem_free(opal_common_ucx_wpmem_t *mem);
247247

248248
OPAL_DECLSPEC int opal_common_ucx_ctx_flush(opal_common_ucx_ctx_t *ctx,
249249
opal_common_ucx_flush_scope_t scope, int target);
250+
OPAL_DECLSPEC int opal_common_ucx_wpmem_flush_ep_nb(opal_common_ucx_wpmem_t *mem,
251+
int target,
252+
opal_common_ucx_user_req_handler_t user_req_cb,
253+
void *user_req_ptr);
250254
OPAL_DECLSPEC int opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem);
251255

252256
OPAL_DECLSPEC int opal_common_ucx_winfo_flush(opal_common_ucx_winfo_t *winfo, int target,

0 commit comments

Comments
 (0)