Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add XGC migration algorithm #637

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions core/src/Cabana_CommunicationPlan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,55 @@ class CommunicationPlan
return counts_and_ids.second;
}

/*!
  \brief XGC-style export creator: bin elements by destination rank and
  exchange per-rank counts so the plan knows how much each rank will
  send and receive.

  \tparam ViewType Kokkos view of destination keys, one per element.

  \param element_export_ranks Destination key of each local element. Two
  sentinel bins bracket the rank bins: bin 0 holds particles staying on
  this rank and the last bin holds particles flagged for removal, so bin
  i+1 corresponds to destination rank i.

  \return The binning data so the caller can permute elements into bin
  (destination-rank) order before communicating.
*/
template <class ViewType>
BinningData<device_type>
createFromExportsOnly_XGC( const ViewType& element_export_ranks )
{
    // Store the number of export elements.
    _num_export_element = element_export_ranks.size();

    // Get the size of this communicator.
    int comm_size = -1;
    MPI_Comm_size( comm(), &comm_size );

    // Get the MPI rank we are currently on.
    int my_rank = -1;
    MPI_Comm_rank( comm(), &my_rank );

    // Bin the elements based on the input keys. Extra initial bin for
    // particles remaining on this rank; extra final bin for particles
    // being removed.
    const int num_bin = comm_size + 2;
    auto bin_data = Cabana::binByKey( element_export_ranks, num_bin );

    // Copy the per-bin counts to the host.
    auto bin_counts_host = Kokkos::create_mirror_view_and_copy(
        Kokkos::HostSpace(), bin_data.binSize );

    // Determine exports from the bin counts. Bin i+1 holds the count
    // destined for rank i (bin 0 is the stay-on-rank bin).
    _num_export.assign( comm_size, 0 );
    for ( int i = 0; i < comm_size; ++i )
        _num_export[i] = bin_counts_host( i + 1 );

    // Initialize imports.
    _num_import.assign( comm_size, 0 );

    // Determine the number of imports via all-to-all communication.
    // NOTE(review): assumes the element type of _num_export/_num_import
    // is unsigned long — confirm against the member declarations.
    MPI_Alltoall( _num_export.data(), 1, MPI_UNSIGNED_LONG,
                  _num_import.data(), 1, MPI_UNSIGNED_LONG, comm() );

    // Compute the total number of exports. Use an unsigned long initial
    // value so the accumulation does not truncate to int.
    _total_num_export =
        std::accumulate( _num_export.begin(), _num_export.end(), 0UL );

    // Compute the total number of imports.
    _total_num_import =
        std::accumulate( _num_import.begin(), _num_import.end(), 0UL );

    // Hand the binning back so the caller can permute the data.
    return bin_data;
}

/*!
\brief Export rank creator. Use this when you don't know who you will
be receiving from - only who you are sending to. This is less efficient
Expand Down
181 changes: 181 additions & 0 deletions core/src/Cabana_Distributor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,131 @@ void distributeData(
Kokkos::Profiling::popRegion();
}


// Synchronously move data between a source and destination AoSoA by executing
// the forward communication plan.
template <class Distributor_t, class AoSoA_t>
void distributeData_XGC(
const Distributor_t& distributor, AoSoA_t& aosoa,
typename std::enable_if<( is_distributor<Distributor_t>::value &&
is_aosoa<AoSoA_t>::value ),
int>::type* = 0 ){
Kokkos::Profiling::pushRegion( "Cabana::migrate" );

// Get the size of this communicator.
int n_ranks = -1;
MPI_Comm_size( distributor.comm(), &n_ranks );

// Get the MPI rank we are currently on.
int my_rank = -1;
MPI_Comm_rank( distributor.comm(), &my_rank );


// Create custom data type. Using MPI_BYTE may risk int overflow and MPI doesnt support long long int.
MPI_Datatype XGC_MPI_PTL;
MPI_Type_contiguous(sizeof( Cabana::Tuple<AoSoA_t::data_type>), MPI_BYTE, &XGC_MPI_PTL);
MPI_Type_commit(&XGC_MPI_PTL);

// Transpose (in place) all vectors particles where at least one particle in the vector is leaving the rank
int transpose_offset = n_staying/AoSoA_t::vector_length; // Mark first vector to transpose
int nvecs_to_transpose = divide_and_round_up(n_staying + n_leaving,AoSoA_t::vector_length) - transpose_offset; // How many vectors to transpose
transpose_particles_from_AoSoA_to_AoS(aosoa, transpose_offset, nvecs_to_transpose);

#ifdef USE_GPU_AWARE_MPI
using MPIDeviceType = AoSoA_t::device_type;
#else
using MPIDeviceType = HostType;
#endif
typename data_type = AoSoA_t::data_type;

// Allocate and fill send buffer
Kokkos::View<data_type*,MPIDeviceType> sendbuf(Kokkos::ViewAllocateWithoutInitializing("sendbuf"),n_leaving);
{
Kokkos::View<data_type*, AoSoA_t::device_type, Kokkos::MemoryTraits<Kokkos::Unmanaged>>
optl((data_type*)aosoa.data() + n_staying, sendbuf.size());
Kokkos::deep_copy(sendbuf, optl);
}

// Declare unmanaged recvbuffer within the AoSoA, starting after the staying particles
Kokkos::View<data_type*,AoSoA_t::device_type, Kokkos::MemoryTraits<Kokkos::Unmanaged>> recvbuf((data_type*)aosoa.data() + n_staying, n_arriving);

// Pass MPI pointer to the buffer
data_type* sendbuf_ptr = sendbuf.data();

#ifdef USE_GPU_AWARE_MPI
// Pass MPI device pointers
data_type* recvbuf_ptr = recvbuf.data();
#else
// Need to use a host mirror of recv buffer for MPI
typename Kokkos::View<data_type*>::HostMirror recvbuf_s = Kokkos::create_mirror_view(recvbuf);

// Pass MPI host pointers
data_type* recvbuf_ptr = recvbuf_s.data();
#endif

// Choose communication ordering
std::vector<int> ordered_pids(n_ranks);
bool use_pairwise_ordering=true;
if(use_pairwise_ordering){
// Match up unique pairs to try to keep communication flowing
int p=1; while ( p < n_ranks ) p*=2; // get next power of 2 above n_ranks
int step = 0;
for(int i=0; i<p; i++){
int pair = i ^ my_rank;
if(pair<n_ranks){
ordered_pids[step] = pair;
step++;
}
}
}else{
// Simply put them in order
for(int i=0; i<n_ranks; i++) ordered_pids[i] = i;
}

// Calculate displacements
std::vector<int> dispExport(n_ranks);
std::vector<int> dispImport(n_ranks);
dispExport[0] = 0;
dispImport[0] = 0;
for(int i = 1; i<n_ranks; i++){
dispExport[i] = dispExport[i-1] + distributor.numExport[i-1];
dispImport[i] = dispImport[i-1] + distributor.numImport[i-1];
}

// Transfer particles
std::vector<MPI_Request> rrequests(n_ranks);
for (int istep=0; istep<n_ranks; istep++){
int i = ordered_pids[istep];
if(distributor.numImport(i)>0){
MPI_Irecv(recvbuf_ptr+dispImport(i), distributor.numImport(i), XGC_MPI_PTL, i, i, distributor.comm(), &(rrequests[i]));
}
}
for (int istep=0; istep<n_ranks; istep++){
int i = ordered_pids[istep];
if(distributor.numExport(i)>0){
MPI_Send(sendbuf_ptr+dispExport(i), distributor.numExport(i), XGC_MPI_PTL, i, my_rank, distributor.comm());
}
}
for (int istep=0; istep<n_ranks; istep++){
int i = ordered_pids[istep];
if(distributor.numImport(i)>0){
MPI_Wait(&(rrequests[i]),MPI_STATUS_IGNORE);
}
}


#ifndef USE_GPU_AWARE_MPI
// Copy back to device
Kokkos::deep_copy(recvbuf, recvbuf_s);
#endif

// Transpose back to AoSoA
nvecs_to_transpose = divide_and_round_up(n_staying + n_arriving,AoSoA_t::vector_length) - transpose_offset; // How many vectors to transpose
transpose_particles_from_AoS_to_AoSoA(local_particles, transpose_offset, nvecs_to_transpose);

Kokkos::Profiling::popRegion();
}

//---------------------------------------------------------------------------//
//! \endcond
} // end namespace Impl
Expand Down Expand Up @@ -398,6 +523,62 @@ void migrate( const Distributor_t& distributor, AoSoA_t& aosoa,
aosoa.resize( distributor.totalNumImport() );
}

/*!
  \brief Synchronously migrate data between two different decompositions
  using an XGC-style key binning. Single AoSoA version that will resize
  in-place. Note that resizing does not necessarily allocate more memory.
  The AoSoA memory will only increase if not enough has already been
  reserved/allocated for the needed number of elements.

  Migrate moves all data to a new distribution that is uniquely owned -
  each element will only have a single destination rank.

  \tparam Distributor_t Distributor type - must be a distributor. Not
  deducible from the arguments, so callers supply it explicitly.

  \tparam ViewType Kokkos view of destination keys, one per element.

  \tparam AoSoA_t AoSoA type - must be an AoSoA.

  \param comm The communicator over which to migrate.

  \param keys The destination key of each element, used to build the
  communication plan.

  \param aosoa The AoSoA containing the data to be migrated. At output, it
  will be the same size as the number of import elements on this rank
  provided by the distributor. Before using this function, consider
  reserving enough memory in the data structure so reallocating is not
  necessary.
*/
template <class Distributor_t, class ViewType, class AoSoA_t>
void migrate_XGC( const MPI_Comm& comm, const ViewType& keys, AoSoA_t& aosoa,
                  typename std::enable_if<
                      ( is_distributor<Distributor_t>::value &&
                        is_aosoa<AoSoA_t>::value ),
                      int>::type* = 0 )
{
    Distributor_t distributor( comm );

    // Bin data based on keys and set up the communication plan.
    auto bin_data = distributor.createFromExportsOnly_XGC( keys );

    // Determine if the source or destination decomposition has more data on
    // this rank.
    bool dst_is_bigger =
        ( distributor.totalNumImport() > distributor.exportSize() );

    // If the destination decomposition is bigger than the source
    // decomposition resize now so we have enough space to do the operation.
    if ( dst_is_bigger )
        aosoa.resize( distributor.totalNumImport() );

    // Permute the data into bin (destination-rank) order.
    Cabana::permute( bin_data, aosoa );

    // Move the data.
    Impl::distributeData_XGC( distributor, aosoa );

    // If the destination decomposition is smaller than the source
    // decomposition resize after we have moved the data.
    if ( !dst_is_bigger )
        aosoa.resize( distributor.totalNumImport() );
}

//---------------------------------------------------------------------------//
/*!
\brief Synchronously migrate data between two different decompositions using
Expand Down
94 changes: 94 additions & 0 deletions core/src/Cabana_TransposeInPlace.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#ifndef CABANA_TRANSPOSE_IN_PLACE_HPP
#define CABANA_TRANSPOSE_IN_PLACE_HPP

#include <Cabana_AoSoA.hpp>
#include <Kokkos_Core.hpp>

namespace Cabana
{

template<class AoSoA_t>
void transpose_particles_from_AoS_to_AoSoA(AoSoA_t& aosoa, int ioffset, int n_vec){
const std::size_t PTL_N_DBL = sizeof( Cabana::Tuple<AoSoA_t::data_type>)/8;

// Pointer to local particles
VecParticlesSimple<PTL_N_DBL>* vptl = (VecParticlesSimple<PTL_N_DBL>*)aosoa.data() + ioffset;

#ifdef USE_GPU
int team_size = AoSoA_t::vector_length;
#else
int team_size = 1;
#endif
int league_size = n_vec;

typedef Kokkos::View<double[AoSoA_t::vector_length*PTL_N_DBL],ExSpace::scratch_memory_space, Kokkos::MemoryTraits<Kokkos::Unmanaged>> PtlVec;
size_t shmem_size = PtlVec::shmem_size(); // Could halve this, but may worsen memory access
Kokkos::parallel_for("transpose_particles_from_AoS_to_AoSoA",
Kokkos::TeamPolicy<ExSpace> (league_size, team_size).set_scratch_size(KOKKOS_TEAM_SCRATCH_OPT,Kokkos::PerTeam(shmem_size)),
KOKKOS_LAMBDA (Kokkos::TeamPolicy<ExSpace>::member_type team_member){
// Locally shared: global index in population
PtlVec shmem_ptl_vec(team_member.team_scratch(KOKKOS_TEAM_SCRATCH_OPT));

int ivec = team_member.league_rank(); // vector assigned to this team
int ith = team_member.team_rank(); // This thread's rank in the team

// Copy vector to shared memory
for(int idbl = ith; idbl<AoSoA_t::vector_length*PTL_N_DBL; idbl+=team_size){ // Loop through doubles
shmem_ptl_vec(idbl) = vptl[ivec].data[idbl];
}

// Write transposed particles back to particle array
for(int iptl = ith; iptl<AoSoA_t::vector_length; iptl+=team_size){ // Loop through particles
for(int iprop = 0; iprop<PTL_N_DBL; iprop++){ // Loop through properties
vptl[ivec].data[iprop*AoSoA_t::vector_length + iptl] = shmem_ptl_vec(iptl*PTL_N_DBL + iprop);
}
}
});

Kokkos::fence();
}

template<class AoSoA_t>
void transpose_particles_from_AoSoA_to_AoS(AoSoA_t& aosoa, int ioffset, int n_vec){
const std::size_t PTL_N_DBL = sizeof( Cabana::Tuple<AoSoA_t::data_type>)/8;

// Pointer to local particles
VecParticlesSimple<PTL_N_DBL>* vptl = (VecParticlesSimple<PTL_N_DBL>*)aosoa.data() + ioffset;

#ifdef USE_GPU
int team_size = AoSoA_t::vector_length;
#else
int team_size = 1;
#endif
int league_size = n_vec;

typedef Kokkos::View<double[AoSoA_t::vector_length*PTL_N_DBL],ExSpace::scratch_memory_space, Kokkos::MemoryTraits<Kokkos::Unmanaged>> PtlVec;
size_t shmem_size = PtlVec::shmem_size(); // Could halve this, but may worsen memory access
Kokkos::parallel_for("transpose_particles_from_AoS_to_AoSoA",
Kokkos::TeamPolicy<ExSpace> (league_size, team_size).set_scratch_size(KOKKOS_TEAM_SCRATCH_OPT,Kokkos::PerTeam(shmem_size)),
KOKKOS_LAMBDA (Kokkos::TeamPolicy<ExSpace>::member_type team_member){
// Locally shared: global index in population
PtlVec shmem_ptl_vec(team_member.team_scratch(KOKKOS_TEAM_SCRATCH_OPT));

int ivec = team_member.league_rank(); // vector assigned to this team
int ith = team_member.team_rank(); // This thread's rank in the team

// Transpose into shared memory
for(int iptl = ith; iptl<AoSoA_t::vector_length; iptl+=team_size){ // Loop through particles
for(int iprop = 0; iprop<PTL_N_DBL; iprop++){ // Loop through properties
shmem_ptl_vec(iptl*PTL_N_DBL + iprop) = vptl[ivec].data[iprop*AoSoA_t::vector_length + iptl];
}
}

// Copy transposed particles back to particle array
for(int idbl = ith; idbl<AoSoA_t::vector_length*PTL_N_DBL; idbl+=team_size){ // Loop through doubles
vptl[ivec].data[idbl] = shmem_ptl_vec(idbl);
}
});

Kokkos::fence();
}

}

#endif