A better MPI_IN_PLACE alltoall algorithm.
Provide optimized variant for the homogeneous case.

Signed-off-by: George Bosilca <bosilca@icl.utk.edu>
bosilca committed Sep 3, 2021
1 parent 447b289 commit da90df7
Showing 1 changed file with 87 additions and 74 deletions.
161 changes: 87 additions & 74 deletions ompi/mca/coll/basic/coll_basic_alltoallw.c
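For context, the code changed below implements the path taken when the caller passes MPI_IN_PLACE as the send buffer of MPI_Alltoallw, so the receive buffer acts as both source and destination. A minimal usage sketch follows (an assumed example, not part of the commit; which coll component ends up servicing the call depends on the build and the runtime component selection):

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* One int destined for each peer, laid out contiguously in a single buffer. */
    int *buf    = malloc(size * sizeof(int));
    int *counts = malloc(size * sizeof(int));
    int *disps  = malloc(size * sizeof(int));            /* displacements in bytes */
    MPI_Datatype *types = malloc(size * sizeof(MPI_Datatype));
    for (int i = 0; i < size; ++i) {
        buf[i]    = rank * size + i;                     /* value headed to peer i */
        counts[i] = 1;
        disps[i]  = (int)(i * sizeof(int));
        types[i]  = MPI_INT;
    }

    /* MPI_IN_PLACE as the send buffer selects the in-place algorithm; the send
     * counts/displacements/types arguments are ignored by the standard. */
    MPI_Alltoallw(MPI_IN_PLACE, NULL, NULL, NULL,
                  buf, counts, disps, types, MPI_COMM_WORLD);

    printf("rank %d now holds the values sent to it by every peer\n", rank);

    free(buf); free(counts); free(disps); free(types);
    MPI_Finalize();
    return 0;
}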
@@ -3,7 +3,7 @@
* Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
- * Copyright (c) 2004-2016 The University of Tennessee and The University
+ * Copyright (c) 2004-2021 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -36,105 +36,118 @@
#include "ompi/mca/coll/base/coll_tags.h"
#include "ompi/mca/pml/pml.h"


+/*
+ * We want to minimize the amount of temporary memory needed while allowing as many ranks
+ * as possible to exchange data simultaneously. We use a variation of the ring algorithm,
+ * where in a single step a process exchanges data with both neighbors at distance k (on
+ * the left and the right of a logical ring topology). With this approach we only need to
+ * pack the data for one of the two neighbors, as we can use the original buffer (and
+ * datatype and count) to send the data to the other.
+ */
static int
mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, const int *rdisps,
struct ompi_datatype_t * const *rdtypes,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
{
-    int i, j, size, rank, err = MPI_SUCCESS;
-    ompi_request_t *req;
-    char *save_buffer = NULL;
-    size_t max_size = 0, packed_size;
+    int i, size, rank, left, right, err = MPI_SUCCESS;
+    ompi_request_t *req = MPI_REQUEST_NULL;
+    char *tmp_buffer = NULL;
+    size_t max_size = 0, packed_size, msg_size_left, msg_size_right;
opal_convertor_t convertor;

size = ompi_comm_size(comm);
-    rank = ompi_comm_rank(comm);
-
-    /* If only one process, we're done. */
-    if (1 == size) {
+    if (1 == size) { /* If only one process, we're done. */
return MPI_SUCCESS;
}
+    rank = ompi_comm_rank(comm);

-    /* Find the largest amount of packed send/recv data */
-    for (i = 0, max_size = 0 ; i < size ; ++i) {
-        ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i);
-
-        packed_size = opal_datatype_compute_remote_size(&rdtypes[i]->super,
-                                                        ompi_proc->super.proc_convertor->master->remote_sizes);
-        packed_size *= rcounts[i];
+    /* Find the largest amount of packed send/recv data among all peers where
+     * we need to pack before the send.
+     */
+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        right = (rank + i) % size;
+#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
+        ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right);
+
+        if( OPAL_LIKELY(opal_local_arch == ompi_proc->super.proc_convertor->master->remote_arch)) {
+            opal_datatype_type_size(&rdtypes[right]->super, &packed_size);
+        } else {
+            packed_size = opal_datatype_compute_remote_size(&rdtypes[right]->super,
+                                                            ompi_proc->super.proc_convertor->master->remote_sizes);
+        }
+#else
+        opal_datatype_type_size(&rdtypes[right]->super, &packed_size);
+#endif  /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */
+        packed_size *= rcounts[right];
max_size = packed_size > max_size ? packed_size : max_size;
}

/* Allocate a temporary buffer */
-    save_buffer = calloc (max_size, 1);
-    if (NULL == save_buffer) {
+    tmp_buffer = calloc (max_size, 1);
+    if (NULL == tmp_buffer) {
return OMPI_ERR_OUT_OF_RESOURCE;
}

-    /* in-place alltoallw slow algorithm (but works) */
-    for (i = 0 ; i < size ; ++i) {
-        size_t msg_size_i;
-        ompi_datatype_type_size(rdtypes[i], &msg_size_i);
-        msg_size_i *= rcounts[i];
-        for (j = i+1 ; j < size ; ++j) {
-            size_t msg_size_j;
-            struct iovec iov = {.iov_base = save_buffer, .iov_len = max_size};
-            uint32_t iov_count = 1;
-            ompi_datatype_type_size(rdtypes[j], &msg_size_j);
-            msg_size_j *= rcounts[j];
-
-            /* Initiate all send/recv to/from others. */
-            if (i == rank && msg_size_j != 0) {
-                ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, j);
-                opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0);
-                opal_convertor_prepare_for_send(&convertor, &rdtypes[j]->super, rcounts[j],
-                                                (char *) rbuf + rdisps[j]);
-                packed_size = max_size;
-                err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
-                if (1 != err) { goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[j], rcounts[j], rdtypes[j],
-                                          j, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-
-                err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED,
-                                         j, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
-                                         comm));
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-            } else if (j == rank && msg_size_i != 0) {
-                ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i);
-                opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0);
-                opal_convertor_prepare_for_send(&convertor, &rdtypes[i]->super, rcounts[i],
-                                                (char *) rbuf + rdisps[i]);
-                packed_size = max_size;
-                err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
-                if (1 != err) { goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[i], rcounts[i], rdtypes[i],
-                                          i, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-
-                err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED,
-                                         i, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
-                                         comm));
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-            } else {
-                continue;
-            }
-
-            /* Wait for the requests to complete */
+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size};
+        uint32_t iov_count = 1;
+
+        right = (rank + i) % size;
+        left = (rank + size - i) % size;
+
+        ompi_datatype_type_size(rdtypes[right], &msg_size_right);
+        msg_size_right *= rcounts[right];
+
+        ompi_datatype_type_size(rdtypes[left], &msg_size_left);
+        msg_size_left *= rcounts[left];
+
+        if( 0 != msg_size_right ) { /* something to exchange with the peer on the right */
+            ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right);
+            opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0);
+            opal_convertor_prepare_for_send(&convertor, &rdtypes[right]->super, rcounts[right],
+                                            (char *) rbuf + rdisps[right]);
+            packed_size = max_size;
+            err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
+            if (1 != err) { goto error_hndl; }
+
+            /* Receive data from the right */
+            err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[right], rcounts[right], rdtypes[right],
+                                      right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
+
+        if( (left != right) && (0 != msg_size_left) ) {
+            /* Send data to the left */
+            err = MCA_PML_CALL(send ((char *) rbuf + rdisps[left], rcounts[left], rdtypes[left],
+                                     left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                     comm));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            /* Receive data from the left */
+            err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[left], rcounts[left], rdtypes[left],
+                                      left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
+
+        if( 0 != msg_size_right ) { /* something to exchange with the peer on the right */
+            /* Send data to the right */
+            err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED,
+                                     right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                     comm));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
}

error_hndl:
/* Free the temporary buffer */
-    free (save_buffer);
+    free (tmp_buffer);

/* All done */
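The pairing schedule that the new loop follows can be reproduced in isolation. The sketch below is illustrative only (the helper print_schedule and the standalone program are assumptions, not code from the commit); it uses the same index arithmetic as the committed loop: at step i a rank packs the data headed to the peer (rank + i) % size on its right and sends directly from the user buffer to the peer (rank + size - i) % size on its left, and when the communicator size is even the final step degenerates to a single peer.

#include <stdio.h>

/* Hypothetical helper: print which peers a given rank talks to at each step of
 * the distance-k ring schedule. The data going to the "right" peer must be
 * packed into the temporary buffer, because its slot in the user buffer is
 * about to be overwritten by the matching receive; the "left" peer is served
 * directly from the user buffer. */
static void print_schedule(int rank, int size)
{
    for (int i = 1; i <= size / 2; ++i) {
        int right = (rank + i) % size;          /* peer we pack for and receive from first */
        int left  = (rank + size - i) % size;   /* peer we send to straight from the user buffer */
        if (left == right) {
            /* Even communicator size: the last step pairs with a single peer. */
            printf("step %d: rank %d <-> rank %d (single exchange)\n", i, rank, right);
        } else {
            printf("step %d: rank %d packs for %d, sends in place to %d\n",
                   i, rank, right, left);
        }
    }
}

int main(void)
{
    print_schedule(2, 6);   /* e.g., what rank 2 does in a 6-process communicator */
    return 0;
}

Compared to the replaced pairwise loop, every rank is active in every step, and only the data going to the right-hand peer has to be packed; the transfer to the left-hand peer reuses the caller's buffer, datatype and count directly.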

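The "optimized variant for the homogeneous case" mentioned in the commit message comes down to this: when the peer runs the same architecture, the number of bytes it will send can be taken straight from the local datatype description, so the per-peer remote-size computation is only needed in heterogeneous builds. At the MPI level the same quantity is simply the type size times the count; a minimal sketch using the public API as an analogy (MPI_Type_size here stands in for the internal opal_datatype_type_size call):

#include <mpi.h>
#include <stdio.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    /* In a homogeneous run, the bytes a peer will send with a given
     * (count, datatype) pair are just the local type size times the count --
     * the quantity the commit obtains with opal_datatype_type_size() instead
     * of opal_datatype_compute_remote_size(). */
    int type_size;
    const int count = 4;
    MPI_Type_size(MPI_DOUBLE, &type_size);
    printf("packed size for %d MPI_DOUBLEs: %zu bytes\n",
           count, (size_t)type_size * (size_t)count);

    MPI_Finalize();
    return 0;
}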