Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

1006 #2484

Merged
merged 7 commits into from
Oct 9, 2020
Merged

1006 #2484

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions examples/hello/hdf5Writer/helloHDF5Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ int main(int argc, char *argv[])
* Parameters, Transports, and Execution: Engines */
adios2::IO hdf5IO = adios.DeclareIO("HDFFileIO");
hdf5IO.SetEngine("HDF5");
hdf5IO.SetParameter("IdleH5Writer",
"true"); // set this if not all ranks are writting

/** global array : name, { shape (total) }, { start (local) }, { count
* (local) }, all are constant dimensions */
Expand All @@ -53,12 +55,25 @@ int main(int argc, char *argv[])
/** Engine derived class, spawned to start IO operations */
adios2::Engine hdf5Writer =
hdf5IO.Open("myVector.h5", adios2::Mode::Write);

#ifdef ALL_RANKS_WRITE
// all Ranks must call Put
/** Write variable for buffering */
hdf5Writer.Put<float>(h5Floats, myFloats.data());
hdf5Writer.Put(h5Ints, myInts.data());
hdf5Writer.Put(h5ScalarDouble, &myScalar);

#else
// Use the collective BeginStep()/EndStep() calls to run the collective
// HDF5 operations. Ranks that have no data can then skip writing.
hdf5Writer.BeginStep();
if (rank == 0)
{
hdf5Writer.Put<float>(h5Floats, myFloats.data());
hdf5Writer.Put(h5Ints, myInts.data());
hdf5Writer.Put(h5ScalarDouble, &myScalar);
}
hdf5Writer.EndStep();
#endif
std::vector<int64_t> m_globalDims = {10, 20, 30, 40};
hdf5IO.DefineAttribute<std::string>(
"adios2_schema/version_major",
Expand All @@ -79,13 +94,7 @@ int main(int argc, char *argv[])
hdf5IO.DefineAttribute<std::int64_t>("adios2_schema/mesh/dimension-num",
m_globalDims.size());

#ifdef NEVER
/** Create h5 file, engine becomes unreachable after this*/
hdf5Writer.Close();
#else
hdf5Writer.Flush();
hdf5Writer.Flush();
#endif
}
catch (std::invalid_argument &e)
{
Expand Down
41 changes: 7 additions & 34 deletions source/adios2/engine/hdf5/HDF5WriterP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,18 @@ HDF5WriterP::~HDF5WriterP() { DoClose(); }
/**
 * Starts a step on the HDF5 writer engine.
 *
 * Clears m_IO.m_ReadStreaming and, when RELAY_DEFINE_TO_HDF5 is defined,
 * asks the HDF5 file wrapper to create the variables currently defined in
 * m_IO. Neither `mode` nor `timeoutSeconds` is read by this function.
 *
 * @param mode unused here
 * @param timeoutSeconds unused here
 * @return always StepStatus::OK
 */
StepStatus HDF5WriterP::BeginStep(StepMode mode, const float timeoutSeconds)
{
m_IO.m_ReadStreaming = false;
// NOTE(review): the #ifndef branch below is empty, so CreateVarsFromIO is
// compiled in only when RELAY_DEFINE_TO_HDF5 is defined -- confirm this is
// the intended guard.
#ifndef RELAY_DEFINE_TO_HDF5 // RELAY_DEFINE_TO_HDF5 = variables in io are
// created at begin_step
#else

// defines variables at this collective call.
// this will ensure all vars are defined in hdf5 for all processors
// (collective requirement) writing a variable is not a collective call
m_H5File.CreateVarsFromIO(m_IO);
#endif

return StepStatus::OK;
}

/**
 * Ends the current step on the underlying HDF5 file wrapper.
 *
 * The three calls run in a fixed order: clean up never-written datasets,
 * advance, then write attributes.
 */
void HDF5WriterP::EndStep()
{
// remove datasets that were created for this step but never written
// (CleanUpNullVars is a no-op unless writing with exactly one rank)
m_H5File.CleanUpNullVars(m_IO);
m_H5File.Advance();
// presumably stores the attributes defined in m_IO -- verify in HDF5Common
m_H5File.WriteAttrFromIO(m_IO);
}
Expand All @@ -60,8 +62,6 @@ void HDF5WriterP::Init()
", in call to ADIOS Open or HDF5Writer constructor\n");
}

m_H5File.ParseParameters(m_IO);

if (m_OpenMode == Mode::Append)
{
m_H5File.Append(m_Name, m_Comm);
Expand All @@ -71,6 +71,7 @@ void HDF5WriterP::Init()
else
m_H5File.Init(m_Name, m_Comm, true);

m_H5File.ParseParameters(m_IO); // has to follow m_H5File Init/Append/
/*
// enforce .h5 ending
std::string suffix = ".h5";
Expand Down Expand Up @@ -120,34 +121,6 @@ ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
/**
 * Writes `values` for `variable` through the HDF5 file wrapper.
 *
 * When the host language is not row-major and the variable has more than one
 * dimension, a temporary Variable with reversed shape/start/count is built
 * and written instead of `variable`. Otherwise the data pointer is attached
 * to `variable` and it is written as-is.
 *
 * @param variable variable describing the data selection to write
 * @param values pointer to the data passed on to m_H5File.Write
 */
template <class T>
void HDF5WriterP::DoPutSyncCommon(Variable<T> &variable, const T *values)
{

bool isOrderC = helper::IsRowMajor(m_IO.m_HostLanguage);

if (!isOrderC)
{
// NOTE(review): ndims is the larger of the shape and count sizes, yet
// m_Shape, m_Start and m_Count are all indexed up to ndims - 1 below; if
// those vectors differ in length this reads out of range -- confirm.
int ndims = std::max(variable.m_Shape.size(), variable.m_Count.size());

if (ndims > 1)
{
// reverse the dimension order of shape, start and count
Dims c_shape(ndims), c_offset(ndims), c_count(ndims);
for (int i = 0; i < ndims; i++)
{
c_shape[i] = variable.m_Shape[ndims - i - 1];
c_offset[i] = variable.m_Start[ndims - i - 1];
c_count[i] = variable.m_Count[ndims - i - 1];
}

Variable<T> dup = Variable<T>(variable.m_Name, c_shape, c_offset,
c_count, variable.IsConstantDims());

/*
 * Write through a duplicate that carries the reversed dimensions;
 * the original variable is left untouched on this path.
 */
dup.SetData(values);
m_H5File.Write(dup, values);
return;
}
}
// row-major host language, or at most one dimension: write directly
variable.SetData(values);
m_H5File.Write(variable, values);
}
Expand Down
148 changes: 142 additions & 6 deletions source/adios2/toolkit/interop/hdf5/HDF5Common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const std::string HDF5Common::PREFIX_STAT = "ADIOS_STAT_";
const std::string HDF5Common::PARAMETER_COLLECTIVE = "H5CollectiveMPIO";
const std::string HDF5Common::PARAMETER_CHUNK_FLAG = "H5ChunkDim";
const std::string HDF5Common::PARAMETER_CHUNK_VARS = "H5ChunkVars";
const std::string HDF5Common::PARAMETER_HAS_IDLE_WRITER_RANK = "IdleH5Writer";

/*
//need to know ndim before defining this.
Expand Down Expand Up @@ -90,12 +91,21 @@ void HDF5Common::ParseParameters(core::IO &io)
{
if (m_MPI)
{
m_MPI->set_dxpl_mpio(m_PropertyTxfID,
H5FD_MPIO_INDEPENDENT); // explicit
auto itKey = io.m_Parameters.find(PARAMETER_COLLECTIVE);
if (itKey != io.m_Parameters.end())
{
if (itKey->second == "yes" || itKey->second == "true")
m_MPI->set_dxpl_mpio(m_PropertyTxfID, H5FD_MPIO_COLLECTIVE);
}

itKey = io.m_Parameters.find(PARAMETER_HAS_IDLE_WRITER_RANK);
if (itKey != io.m_Parameters.end())
{
if (itKey->second == "yes" || itKey->second == "true")
m_IdleWriterOn = true;
}
}

m_ChunkVarNames.clear();
Expand Down Expand Up @@ -133,6 +143,8 @@ void HDF5Common::ParseParameters(core::IO &io)
m_ChunkVarNames.insert(token);
}
}

m_OrderByC = helper::IsRowMajor(io.m_HostLanguage);
}

void HDF5Common::Append(const std::string &name, helper::Comm const &comm)
Expand Down Expand Up @@ -694,6 +706,46 @@ void HDF5Common::SetAdiosStep(int step)
m_CurrentAdiosStep = step;
}

//
// This function is intended to pair with CreateVarsFromIO().
//
// Because of the collective call requirement, we create
// all variables through CreateVarsFromIO() at BeginStep().
// At EndStep(), this function is called to remove unwritten variables,
// to comply with the ADIOS convention of defining all variables, using
// only a few per step, and seeing only those few at that step.
//
// Note that this currently works with 1 rank only.
// We need to find a general way in parallel HDF5 to
// detect whether H5Dwrite has been called on a dataset.
//
/**
 * Removes the datasets of every variable defined in `io` that ended up with
 * no stored data, by calling RemoveEmptyDataset() for each variable name.
 *
 * Does nothing unless the file is in write mode and exactly one rank is
 * in use.
 *
 * @param io IO object whose variable map is walked
 */
void HDF5Common::CleanUpNullVars(core::IO &io)
{
    if (!m_WriteMode)
        return;

    // With multiple processors H5Dget_storage_size is already > 0 right
    // after H5Dcreate, so an unwritten dataset cannot be told apart.
    if (m_CommSize != 1)
        return;

    const core::VarMap &variables = io.GetVariables();
    for (const auto &vpair : variables)
    {
        const std::string &varName = vpair.first;
        const DataType varType = vpair.second->m_Type;
// A failed lookup skips only this variable (continue), so the remaining
// variables are still cleaned up; once the type matched there is no need
// to test the other types either.
#define declare_template_instantiation(T)                                      \
    if (varType == helper::GetDataType<T>())                                   \
    {                                                                          \
        if (io.InquireVariable<T>(varName) != nullptr)                         \
            RemoveEmptyDataset(varName);                                       \
        continue;                                                              \
    }
        ADIOS2_FOREACH_STDTYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation
    }
}

void HDF5Common::Advance()
{
if (m_WriteMode)
Expand Down Expand Up @@ -847,6 +899,9 @@ void HDF5Common::CreateDataset(const std::string &varName, hid_t h5Type,
dsetID = H5Dopen(topId, list.back().c_str(), H5P_DEFAULT);

datasetChain.push_back(dsetID);

hid_t dspace = H5Dget_space(dsetID);
const int ndims = H5Sget_simple_extent_ndims(dspace);
// return dsetID;
}

Expand Down Expand Up @@ -915,9 +970,17 @@ bool HDF5Common::OpenDataset(const std::string &varName,

if (list.size() == 1)
{
hid_t dsetID = H5Dopen(m_GroupId, list[0].c_str(), H5P_DEFAULT);
datasetChain.push_back(dsetID);
return true;
if (H5Lexists(m_GroupId, list[0].c_str(), H5P_DEFAULT) == 0)
{
datasetChain.push_back(-1);
return false;
}
else
{
hid_t dsetID = H5Dopen(m_GroupId, list[0].c_str(), H5P_DEFAULT);
datasetChain.push_back(dsetID);
return true;
}
}

hid_t topId = m_GroupId;
Expand Down Expand Up @@ -945,6 +1008,74 @@ bool HDF5Common::OpenDataset(const std::string &varName,
return true;
}

//
// We use H5Dget_storage_size to see whether H5Dwrite has been called.
// It looks like H5Dcreate already causes storage allocation when there
// are multiple processors, so we limit this function to the case where
// there is only one rank.
//
void HDF5Common::RemoveEmptyDataset(const std::string &varName)
{
if (m_CommSize > 1)
return;

std::vector<std::string> list;
char delimiter = '/';
int delimiterLength = 1;
std::string s = std::string(varName);
size_t pos = 0;
std::string token;
while ((pos = s.find(delimiter)) != std::string::npos)
{
if (pos > 0)
{ // "///a/b/c" == "a/b/c"
token = s.substr(0, pos);
list.push_back(token);
}
s.erase(0, pos + delimiterLength);
}
list.push_back(s);

if (list.size() == 1)
{
if (H5Lexists(m_GroupId, list[0].c_str(), H5P_DEFAULT) != 0)
{
hid_t dsetID = H5Dopen(m_GroupId, list[0].c_str(), H5P_DEFAULT);
HDF5TypeGuard d(dsetID, E_H5_DATASET);

H5D_space_status_t status;
herr_t s1 = H5Dget_space_status(dsetID, &status);

if (0 == H5Dget_storage_size(dsetID)) /*nothing is written */
H5Ldelete(m_GroupId, list[0].c_str(), H5P_DEFAULT);
}
return;
}

hid_t topId = m_GroupId;
std::vector<hid_t> datasetChain;

for (int i = 0; i < list.size() - 1; i++)
{
if (H5Lexists(topId, list[i].c_str(), H5P_DEFAULT) == 0)
break;
else
topId = H5Gopen(topId, list[i].c_str(), H5P_DEFAULT);

datasetChain.push_back(topId);
}
hid_t dsetID = H5Dopen(topId, list.back().c_str(), H5P_DEFAULT);
datasetChain.push_back(dsetID);

HDF5DatasetGuard g(datasetChain);

if (H5Lexists(topId, list.back().c_str(), H5P_DEFAULT) != 0)
{
if (0 == H5Dget_storage_size(dsetID)) // nothing is written
H5Ldelete(topId, list.back().c_str(), H5P_DEFAULT);
}
}

// trim from right
inline std::string &rtrim(std::string &s, const char *t = " \t\n\r\f\v")
{
Expand Down Expand Up @@ -1246,7 +1377,14 @@ void HDF5Common::LocateAttrParent(const std::string &attrName,
//
void HDF5Common::CreateVarsFromIO(core::IO &io)
{
CheckWriteGroup();
if (!m_WriteMode)
return;

CheckWriteGroup(); // making sure all processors are creating new step

if (!m_IdleWriterOn)
return;

const core::VarMap &variables = io.GetVariables();
for (const auto &vpair : variables)
{
Expand Down Expand Up @@ -1335,8 +1473,6 @@ void HDF5Common::WriteAttrFromIO(core::IO &io)
ADIOS2_FOREACH_ATTRIBUTE_STDTYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation
}

// std::string attrType = attributesInfo[attrName]["Type"];
}

//
Expand Down
9 changes: 8 additions & 1 deletion source/adios2/toolkit/interop/hdf5/HDF5Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ class HDF5Common
static const std::string PARAMETER_COLLECTIVE;
static const std::string PARAMETER_CHUNK_FLAG;
static const std::string PARAMETER_CHUNK_VARS;
static const std::string PARAMETER_HAS_IDLE_WRITER_RANK;

void ParseParameters(core::IO &io);
void Init(const std::string &name, helper::Comm const &comm, bool toWrite);
Expand All @@ -144,7 +145,7 @@ class HDF5Common
void CreateDataset(const std::string &varName, hid_t h5Type,
hid_t filespaceID, std::vector<hid_t> &chain);
bool OpenDataset(const std::string &varName, std::vector<hid_t> &chain);

void RemoveEmptyDataset(const std::string &varName);
void StoreADIOSName(const std::string adiosName, hid_t dsetID);
void ReadADIOSName(hid_t dsetID, std::string &adiosName);

Expand All @@ -163,6 +164,7 @@ class HDF5Common
* required by HDF5
*/
void CreateVarsFromIO(core::IO &io);
void CleanUpNullVars(core::IO &io);

void WriteAttrFromIO(core::IO &io);
void ReadAttrToIO(core::IO &io);
Expand Down Expand Up @@ -257,6 +259,11 @@ class HDF5Common
hid_t m_ChunkPID;
int m_ChunkDim;
std::set<std::string> m_ChunkVarNames;
bool m_OrderByC = true; // C or fortran

// Some writer ranks can be idle. This conflicts with the HDF5 collective
// requirement in some functions. Guard against this by loading vars in BeginStep
bool m_IdleWriterOn = false;
};

// Explicit declaration of the public template methods
Expand Down
12 changes: 12 additions & 0 deletions source/adios2/toolkit/interop/hdf5/HDF5Common.tcc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@ void HDF5Common::GetHDF5SpaceSpec(const core::Variable<T> &variable,
offset.push_back(0);
}
}

if (dimSize <= 1)
return;
if (m_OrderByC)
return;

for (int i = 0; i < dimSize / 2; ++i)
{
std::swap(dimsf[i], dimsf[dimSize - 1 - i]);
std::swap(count[i], count[dimSize - 1 - i]);
std::swap(offset[i], offset[dimSize - 1 - i]);
}
}

template <class T>
Expand Down