Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#410: Dependent Epochs rewritten #2204

Open
wants to merge 20 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ccd5afd
#410: epoch: change unused InsertEpoch to DependentEpoch
lifflander Jun 17, 2019
b393469
#410: epoch: add function to bit-combine epoch category bits
lifflander Jun 19, 2019
9a1efe8
#410: termination: add isDep check
lifflander Sep 28, 2023
72fbfec
#410: term: implement dependent epochs
lifflander Jun 20, 2019
5cd8599
#410: test: add release dependent epoch test
lifflander Jun 20, 2019
10147f4
#410: reduce: fix warning
lifflander Oct 12, 2023
1db25fa
#410: epoch: add test, move pending epochs to scheduler
lifflander Oct 12, 2023
a9816c6
#410: epoch: rework deps, objgroup dep epochs, scheduler buffers
lifflander Oct 16, 2023
4de8a74
#410: objgroup: implement objgroup proxy functions for dependent epochs
lifflander Oct 17, 2023
bfdc3f4
#410: collection: add dependent epochs to collections, system message…
lifflander Oct 18, 2023
3c1ac7f
#410: test: add new test for dep epochs and collections
lifflander Aug 16, 2019
cdf69ab
#410: collection: add missing header include
lifflander Oct 18, 2023
9764708
#410: tests: cleanup tests, fix name collison
lifflander Oct 18, 2023
a974aba
#410: tests: fix license
lifflander Oct 18, 2023
3e76af5
#410: tests: fix some small compilation errors
lifflander Oct 18, 2023
3d2b117
#410: collection: switch broadcast after system broadcast to user msg
lifflander Oct 18, 2023
f746269
#410: collection: fix missing system message designation
lifflander Oct 19, 2023
cb2b519
#410: tests: rewrite dep epoch test to fix logic error
lifflander Oct 19, 2023
4b3a9c6
#410: termination: remove unneeded code, cleanup scheduler
lifflander Oct 31, 2023
48f7a8c
#410: termination: cleanup more code---remove unecessary condition
lifflander Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion src/vt/epoch/epoch_manip.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,29 @@ EpochWindow* EpochManip::getTerminatedWindow(EpochType epoch) {
return BitPackerType::boolGetField<field,size,ImplType>(*epoch);
}

/*static*/ bool EpochManip::isDS(EpochType epoch) {
  // Non-rooted epochs are never reported as DS.
  if (not isRooted(epoch)) {
    return false;
  }

  using UnderlyingType =
    typename std::underlying_type<epoch::eEpochCategory>::type;

  // Test the Dijkstra-Scholten bit inside the epoch's category field.
  auto const category_bits = static_cast<UnderlyingType>(category(epoch));
  auto const ds_mask = static_cast<UnderlyingType>(
    epoch::eEpochCategory::DijkstraScholtenEpoch
  );
  return (category_bits & ds_mask) != 0;
}

/*static*/ bool EpochManip::isDep(EpochType epoch) {
  // no_epoch and any_epoch_sentinel are never treated as dependent.
  bool const is_sentinel =
    epoch == no_epoch or epoch == term::any_epoch_sentinel;
  if (is_sentinel) {
    return false;
  }

  using UnderlyingType =
    typename std::underlying_type<epoch::eEpochCategory>::type;

  // Test the dependent bit inside the epoch's category field.
  auto const category_bits = static_cast<UnderlyingType>(
    epoch::EpochManip::category(epoch)
  );
  auto const dep_mask = static_cast<UnderlyingType>(
    epoch::eEpochCategory::DependentEpoch
  );
  return (category_bits & dep_mask) != 0;
}

/*static*/ eEpochCategory EpochManip::category(EpochType const& epoch) {
return BitPackerType::getField<
eEpochRoot::rEpochCategory, epoch_category_num_bits, eEpochCategory
Expand Down Expand Up @@ -190,7 +213,7 @@ void EpochManip::setCategory(EpochType& epoch, eEpochCategory const cat) {
>(*epoch,cat);
}

/*static*/ inline eEpochCategory EpochManip::makeCat(
/*static*/ eEpochCategory EpochManip::makeCat(
eEpochCategory c1, eEpochCategory c2
) {
using T = typename std::underlying_type<eEpochCategory>::type;
Expand Down
18 changes: 18 additions & 0 deletions src/vt/epoch/epoch_manip.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,24 @@ struct EpochManip : runtime::component::Component<EpochManip> {
*/
static bool isRooted(EpochType const& epoch);

/**
* \brief Gets whether an epoch is DS or not
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo.

*
* \param[in] epoch the epoch
*
* \return whether it is DS
*/
static bool isDS(EpochType epoch);

/**
* \brief Gets whether an epoch is dependent or not
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo.

*
* \param[in] epoch the epoch
*
* \return whether it is dependent
*/
static bool isDep(EpochType epoch);

/**
* \brief Gets the \c eEpochCategory of a given epoch
*
Expand Down
29 changes: 24 additions & 5 deletions src/vt/messaging/active.cc
Original file line number Diff line number Diff line change
Expand Up @@ -934,12 +934,13 @@ void ActiveMessenger::prepareActiveMsgToRun(
using MsgType = ShortMessage;
auto msg = base.to<MsgType>().get();

auto const is_term = envelopeIsTerm(msg->env);
auto const is_bcast = envelopeIsBcast(msg->env);
auto const dest = envelopeGetDest(msg->env);
auto const handler = envelopeGetHandler(msg->env);
auto const epoch = envelopeIsEpochType(msg->env) ?
auto const is_term = envelopeIsTerm(msg->env);
auto const is_bcast = envelopeIsBcast(msg->env);
auto const dest = envelopeGetDest(msg->env);
auto const handler = envelopeGetHandler(msg->env);
auto const epoch = envelopeIsEpochType(msg->env) ?
envelopeGetEpoch(msg->env) : term::any_epoch_sentinel;

auto const from_node = is_bcast ? dest : in_from_node;

if (!is_term || vt_check_enabled(print_term_msgs)) {
Expand All @@ -963,6 +964,13 @@ void ActiveMessenger::prepareActiveMsgToRun(
if (is_obj) {
objgroup::dispatchObjGroup(base, handler, from_node, cont);
} else {
if (epoch != term::any_epoch_sentinel and epoch::EpochManip::isDep(epoch)) {
PhilMiller marked this conversation as resolved.
Show resolved Hide resolved
if (not theTerm()->epochReleased(epoch)) {
pending_epoch_msgs_[epoch].emplace_back(base, from_node);
lifflander marked this conversation as resolved.
Show resolved Hide resolved
return;
}
}

runnable::makeRunnable(base, not is_term, handler, from_node)
.withContinuation(cont)
.withTDEpochFromMsg(is_term)
Expand All @@ -981,6 +989,17 @@ void ActiveMessenger::prepareActiveMsgToRun(
}
}

void ActiveMessenger::releaseEpochMsgs(EpochType epoch) {
  auto pending_iter = pending_epoch_msgs_.find(epoch);
  if (pending_iter == pending_epoch_msgs_.end()) {
    // Nothing was buffered for this epoch.
    return;
  }

  // Move the buffered messages out and erase the map entry before
  // dispatching, so the loop below iterates over a local container rather
  // than storage owned by pending_epoch_msgs_.
  auto released_msgs = std::move(pending_iter->second);
  pending_epoch_msgs_.erase(pending_iter);

  for (auto& buffered : released_msgs) {
    prepareActiveMsgToRun(
      buffered.buffered_msg, buffered.from_node, true, buffered.cont
    );
  }
}

bool ActiveMessenger::tryProcessIncomingActiveMsg() {
CountType num_probe_bytes;
MPI_Status stat;
Expand Down
12 changes: 11 additions & 1 deletion src/vt/messaging/active.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ struct BufferedActiveMsg {

BufferedActiveMsg(
MessageType const& in_buffered_msg, NodeType const& in_from_node,
ActionType in_cont
ActionType in_cont = nullptr
) : buffered_msg(in_buffered_msg), from_node(in_from_node), cont(in_cont)
{ }

Expand Down Expand Up @@ -327,6 +327,8 @@ struct ActiveMessenger : runtime::component::PollableComponent<ActiveMessenger>
using SendFnType = std::function<SendInfo(PtrLenPairType,NodeType,TagType)>;
using UserSendFnType = std::function<void(SendFnType)>;
using ContainerPendingType = std::unordered_map<TagType,PendingRecvType>;
using MsgContType = std::list<BufferedMsgType>;
PhilMiller marked this conversation as resolved.
Show resolved Hide resolved
using EpochWaitType = std::unordered_map<EpochType, MsgContType>;
using ReadyHanTagType = std::tuple<HandlerType, TagType>;
using HandlerManagerType = HandlerManager;
using PendingSendType = PendingSend;
Expand Down Expand Up @@ -1650,6 +1652,13 @@ struct ActiveMessenger : runtime::component::PollableComponent<ActiveMessenger>
# endif
}

/**
* \brief Deliver messages that are now released with a dependent epoch
*
* \param[in] epoch the epoch to release
*/
void releaseEpochMsgs(EpochType epoch);

private:
/**
* \internal \brief Allocate a new, unused tag.
Expand Down Expand Up @@ -1764,6 +1773,7 @@ struct ActiveMessenger : runtime::component::PollableComponent<ActiveMessenger>
elm::ElementIDStruct bare_handler_dummy_elm_id_for_lb_data_ = {};
elm::ElementLBData bare_handler_lb_data_;
MPI_Comm comm_ = MPI_COMM_NULL;
EpochWaitType pending_epoch_msgs_ = {};
};

}} // end namespace vt::messaging
Expand Down
158 changes: 140 additions & 18 deletions src/vt/termination/termination.cc
Original file line number Diff line number Diff line change
Expand Up @@ -931,10 +931,12 @@ void TerminationDetector::finishedEpoch(EpochType const& epoch) {
}

EpochType TerminationDetector::makeEpochRootedWave(
ParentEpochCapture successor, std::string const& label
ParentEpochCapture successor, std::string const& label, bool is_dep
) {
auto const no_cat = epoch::eEpochCategory::NoCategoryEpoch;
auto const epoch = theEpoch()->getNextRootedEpoch(no_cat);
auto const cat = is_dep ?
epoch::eEpochCategory::DependentEpoch :
epoch::eEpochCategory::NoCategoryEpoch;
auto const epoch = theEpoch()->getNextRootedEpoch(cat);
initializeRootedWaveEpoch(epoch, successor, label);
return epoch;

Expand Down Expand Up @@ -970,10 +972,16 @@ void TerminationDetector::initializeRootedWaveEpoch(
}

EpochType TerminationDetector::makeEpochRootedDS(
ParentEpochCapture successor, std::string const& label
ParentEpochCapture successor, std::string const& label, bool is_dep
) {
auto const ds_cat = epoch::eEpochCategory::DijkstraScholtenEpoch;
auto const epoch = theEpoch()->getNextRootedEpoch(ds_cat);
auto cat = epoch::eEpochCategory::DijkstraScholtenEpoch;
if (is_dep) {
cat = theEpoch()->makeCat(
epoch::eEpochCategory::DependentEpoch,
epoch::eEpochCategory::DijkstraScholtenEpoch
);
}
auto const epoch = theEpoch()->getNextRootedEpoch(cat);
initializeRootedDSEpoch(epoch, successor, label);
return epoch;
}
Expand Down Expand Up @@ -1002,13 +1010,14 @@ void TerminationDetector::initializeRootedDSEpoch(
}

EpochType TerminationDetector::makeEpochRooted(
UseDS use_ds, ParentEpochCapture successor
UseDS use_ds, ParentEpochCapture successor, bool is_dep
) {
return makeEpochRooted("", use_ds, successor);
return makeEpochRooted("", use_ds, successor, is_dep);
}

EpochType TerminationDetector::makeEpochRooted(
std::string const& label, UseDS use_ds, ParentEpochCapture successor
std::string const& label, UseDS use_ds, ParentEpochCapture successor,
bool is_dep
) {
/*
* This method should only be called by the root node for the rooted epoch
Expand All @@ -1029,9 +1038,9 @@ EpochType TerminationDetector::makeEpochRooted(
vtAssertExpr(not (force_use_ds and force_use_wave));

if ((use_ds or force_use_ds) and not force_use_wave) {
return makeEpochRootedDS(successor, label);
return makeEpochRootedDS(successor, label, is_dep);
} else {
return makeEpochRootedWave(successor, label);
return makeEpochRootedWave(successor, label, is_dep);
}
}

Expand All @@ -1047,20 +1056,23 @@ void TerminationDetector::initializeRootedEpoch(
}

EpochType TerminationDetector::makeEpochCollective(
ParentEpochCapture successor
ParentEpochCapture successor, bool is_dep
) {
vt_debug_print(
normal, term,
"makeEpochCollective: no label\n"
);

return makeEpochCollective("", successor);
return makeEpochCollective("", successor, is_dep);
}

EpochType TerminationDetector::makeEpochCollective(
std::string const& label, ParentEpochCapture successor
std::string const& label, ParentEpochCapture successor, bool is_dep
) {
auto const epoch = theEpoch()->getNextCollectiveEpoch();
auto const cat = is_dep ?
epoch::eEpochCategory::DependentEpoch :
epoch::eEpochCategory::NoCategoryEpoch;
auto const epoch = theEpoch()->getNextCollectiveEpoch(cat);
initializeCollectiveEpoch(epoch, label, successor);
return epoch;
}
Expand Down Expand Up @@ -1105,11 +1117,121 @@ void TerminationDetector::initializeCollectiveEpoch(

EpochType TerminationDetector::makeEpoch(
std::string const& label, bool is_coll, UseDS use_ds,
ParentEpochCapture successor
ParentEpochCapture successor, bool is_dep
) {
return is_coll ?
makeEpochCollective(label, successor) :
makeEpochRooted(label, use_ds, successor);
makeEpochCollective(label, successor, is_dep) :
makeEpochRooted(label, use_ds, successor, is_dep);
}

void TerminationDetector::releaseEpoch(EpochType epoch) {
bool const is_dep = isDep(epoch);

if (is_dep) {
// Put the epoch in the released set, which is not conclusive due to
// dependencies, which affect the status. An epoch is *released* iff the
lifflander marked this conversation as resolved.
Show resolved Hide resolved
// epoch is in the released set and all successors are *released* (or there
lifflander marked this conversation as resolved.
Show resolved Hide resolved
// are no successors). The epoch any_epoch_sentinel does not count as a
// successor.
epoch_released_.insert(epoch);

bool const is_released = epochReleased(epoch);
if (is_released) {
runReleaseEpochActions(epoch);
} else {
// Enqueue continuations to potentially release this epoch since the
// successor graph is not inverted (one-way knowledge)
auto const& successors = getEpochDep(epoch)->getSuccessors();
vtAssert(successors.size() > 0, "Must have unreleased successors in this case");
for (auto&& suc : successors) {
if (not epochReleased(suc)) {
onReleaseEpoch(suc, [epoch]{ theTerm()->releaseEpoch(epoch); });
}
}
}
} else {
// The user might have made a mistake if they are trying to release an epoch
// that is released-by-default (not dependent)
vtWarn("Trying to release non-dependent epoch");
}
}

void TerminationDetector::runReleaseEpochActions(EpochType epoch) {
auto iter = epoch_release_action_.find(epoch);
if (iter != epoch_release_action_.end()) {
auto actions = std::move(iter->second);
epoch_release_action_.erase(iter);
for (auto&& fn : actions) {
fn();
}
}
lifflander marked this conversation as resolved.
Show resolved Hide resolved
theMsg()->releaseEpochMsgs(epoch);
}

void TerminationDetector::onReleaseEpoch(EpochType epoch, ActionType action) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the same approach as term actions after #2196

// Run an action if an epoch has been released
bool const is_dep = isDep(epoch);
if (not is_dep or (is_dep and epochReleased(epoch))) {
action();
} else {
epoch_release_action_[epoch].push_back(action);
}
}

bool TerminationDetector::epochSuccessorsReleased(EpochType epoch) {
  // Test whether all successors of a given epoch are released. An epoch with
  // no successors trivially passes. Every successor is queried; the loop does
  // not stop at the first unreleased one.
  auto const& successors = getEpochDep(epoch)->getSuccessors();

  bool all_released = true;
  for (auto&& successor : successors) {
    if (not epochReleased(successor)) {
      all_released = false;
    }
  }
  return all_released;
}

bool TerminationDetector::epochReleased(EpochType epoch) {
// Because of case (2), ignore dep <- no-dep because this should not be called
// unless dep is released
bool const is_dep = isDep(epoch);
if (not is_dep) {
return true;
}

// Terminated epochs are always released
bool const is_term = theEpoch()->getTerminatedWindow(epoch)->isTerminated(
epoch
);
if (is_term) {
return true;
}
Comment on lines +1149 to +1155
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this should be true, or an empty dependent epoch could terminate before being released, which seems counter-intuitive.

auto epoch = vt::theTerm()->makeEpochCollective(term::ParentEpochCapture{}, true);
vt::theTerm()->finishedEpoch(epoch);
vt::theTerm()->addAction(epoch, {...});

Your added action could run before releasing in the above, or in cases where a set of functions may or may not actually send messages. We should produce once on dependent epoch creation and consume on release, to align the no-messages behavior with typical behavior.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lifflander What are your thoughts on Matthew's concern?


// All successors must be released for an epoch to be released even if it's in
lifflander marked this conversation as resolved.
Show resolved Hide resolved
// the release set. Epochs are put in the release set early so as to reduce
// tracking of epoch "release chains"
bool const is_successors_released = epochSuccessorsReleased(epoch);
if (not is_successors_released) {
return false;
}

// Check the release set
auto iter = epoch_released_.find(epoch);
return iter != epoch_released_.end();
}

void TerminationDetector::cleanupReleasedEpoch(EpochType epoch) {
bool const is_dep = isDep(epoch);
if (is_dep) {
bool const is_term = theEpoch()->getTerminatedWindow(epoch)->isTerminated(
epoch
);
if (is_term) {
auto iter = epoch_released_.find(epoch);
if (iter != epoch_released_.end()) {
epoch_released_.erase(iter);
}
lifflander marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

void TerminationDetector::activateEpoch(EpochType const& epoch) {
Expand Down
Loading