Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce Alpaka event synchronization calls via EDMetadata #44841

Merged
merged 3 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion HeterogeneousCore/AlpakaCore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ Also note that the `fillDescription()` function must have the same content for a
* All Event data products in the host memory space are guaranteed to be accessible for all operations (after the data product has been obtained from the `edm::Event` or `device::Event`).
* All EventSetup data products in the device memory space are guaranteed to be accessible only for operations enqueued in the `Queue` given by `device::Event::queue()` when accessed via the `device::EventSetup` (ED modules), or by `device::Record<TRecord>::queue()` when accessed via the `device::Record<TRecord>` (ESProducers).
* The EDM Stream does not proceed to the next Event until after all asynchronous work of the current Event has finished.
* **Note**: currently this guarantee does not hold if the job has any EDModule that launches asynchronous work but does not explicitly synchronize or produce any device-side data products.
* **Note**: this implies if an EDProducer in its `produce()` function uses the `Event::queue()` or gets a device-side data product, and does not produce any device-side data products, the `produce()` call will be synchronous (i.e. will block the CPU thread until the asynchronous work finishes)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This update in the README actually holds (and is stronger) even without this PR, I just hadn't realized it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My brain hurts, but I trust you on this :-)


## Examples

Expand Down
4 changes: 4 additions & 0 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
Queue& queue() const { return *queue_; }

void recordEvent() {}
void discardEvent() {}

private:
std::shared_ptr<Queue> queue_;
Expand All @@ -73,6 +74,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
void enqueueCallback(edm::WaitingTaskWithArenaHolder holder);

void recordEvent() { alpaka::enqueue(*queue_, *event_); }
void discardEvent() { event_.reset(); }

/**
* Synchronizes 'consumer' metadata wrt. 'this' in the event product
Expand All @@ -92,6 +94,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
// consumer or not. The goal is to have a "chain" of modules to
// queue their work to the same queue.
mutable std::atomic<bool> mayReuseQueue_ = true;
// Cache to potentially reduce alpaka::wait() calls
mutable std::atomic<bool> eventComplete_ = false;
};
#endif
} // namespace ALPAKA_ACCELERATOR_NAMESPACE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {

std::shared_ptr<EDMetadata> metadata() { return metadata_; }

void finish();
// true if asynchronous work was (possibly) launched
void finish(bool launchedAsyncWork);

private:
std::shared_ptr<EDMetadata> metadata_;
Expand Down
4 changes: 4 additions & 0 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,17 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device {
}
}

// implementation details
bool wasQueueUsed() const { return queueUsed_; }

private:
// Having both const and non-const here in order to serve the
// clients with one device::Event class
edm::Event const& constEvent_;
edm::Event* event_ = nullptr;

std::shared_ptr<EDMetadata> metadata_;

// device::Event is not supposed to be const-thread-safe, so no
// additional protection is needed.
mutable bool queueUsed_ = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
device::EventSetup const es(iSetup, ev.device());
produce(sid, ev, es);
this->putBackend(iEvent);
sentry.finish();
sentry.finish(ev.wasQueueUsed());
}

virtual void produce(edm::StreamID sid, device::Event& iEvent, device::EventSetup const& iSetup) const = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
device::EventSetup const es(iSetup, ev.device());
produce(ev, es);
this->putBackend(iEvent);
sentry.finish();
sentry.finish(ev.wasQueueUsed());
}

virtual void produce(device::Event& iEvent, device::EventSetup const& iSetup) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
device::EventSetup const es(iSetup, ev.device());
produce(ev, es);
this->putBackend(iEvent);
sentry.finish();
sentry.finish(ev.wasQueueUsed());
}

virtual void acquire(device::Event const& iEvent, device::EventSetup const& iSetup) = 0;
Expand Down
24 changes: 22 additions & 2 deletions HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
// TODO: a callback notifying a WaitingTaskHolder (or similar)
// would avoid blocking the CPU, but would also require more work.

if (event_) {
// If event_ is null, the EDMetadata was either
// default-constructed, or fully synchronized before leaving the
// produce() call, so no synchronization is needed.
// If the queue was re-used, then some other EDMetadata object in
// the same edm::Event records the event_ (in the same queue) and
// calls alpaka::wait(), and therefore this wait() call can be
// skipped).
if (event_ and not eventComplete_ and mayReuseQueue_) {
// Must not throw in a destructor, and if there were an
// exception could not really propagate it anyway.
CMS_SA_ALLOW try { alpaka::wait(*event_); } catch (...) {
Expand Down Expand Up @@ -42,12 +49,25 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
}
}

if (eventComplete_) {
return;
}

// TODO: how necessary this check is?
if (alpaka::getDev(*queue_) != alpaka::getDev(*consumer.queue_)) {
throw edm::Exception(edm::errors::LogicError) << "Handling data from multiple devices is not yet supported";
}

if (not alpaka::isComplete(*event_)) {
// If the event has been discarded, the produce() function that
// constructed this EDMetadata object did not launch any
// asynchronous work.
if (not event_) {
return;
}

if (alpaka::isComplete(*event_)) {
eventComplete_ = true;
} else {
// Event not yet occurred, so need to add synchronization
// here. Sychronization is done by making the queue to wait
// for an event, so all subsequent work in the queue will run
Expand Down
11 changes: 10 additions & 1 deletion HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadataSentry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
#endif
}

void EDMetadataSentry::finish() { metadata_->recordEvent(); }
void EDMetadataSentry::finish(bool launchedAsyncWork) {
if (launchedAsyncWork) {
metadata_->recordEvent();
} else {
// If we are certain no asynchronous work was launched (i.e.
// the Queue was not used in any way), there is no need to
// synchronize, and the Event can be discarded.
metadata_->discardEvent();
}
}
} // namespace detail
} // namespace ALPAKA_ACCELERATOR_NAMESPACE
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include "DataFormats/PortableTestObjects/interface/alpaka/TestDeviceCollection.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Utilities/interface/InputTag.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/global/EDProducer.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDPutToken.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESGetToken.h"
#include "HeterogeneousCore/AlpakaInterface/interface/config.h"

namespace ALPAKA_ACCELERATOR_NAMESPACE {
/**
* This EDProducer only consumes a device EDProduct, and is intended
* only for testing purposes. Do not use it as an example.
*/
class TestAlpakaGlobalProducerNoOutput : public global::EDProducer<> {
public:
TestAlpakaGlobalProducerNoOutput(edm::ParameterSet const& config)
: getToken_(consumes(config.getParameter<edm::InputTag>("source"))) {}

void produce(edm::StreamID, device::Event& iEvent, device::EventSetup const& iSetup) const override {
[[maybe_unused]] auto const& input = iEvent.get(getToken_);
}

static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
edm::ParameterSetDescription desc;
desc.add("source", edm::InputTag{});

descriptions.addWithDefaultLabel(desc);
}

private:
const device::EDGetToken<portabletest::TestDeviceCollection> getToken_;
};
} // namespace ALPAKA_ACCELERATOR_NAMESPACE

#include "HeterogeneousCore/AlpakaCore/interface/alpaka/MakerMacros.h"
DEFINE_FWK_ALPAKA_MODULE(TestAlpakaGlobalProducerNoOutput);
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include "DataFormats/PortableTestObjects/interface/alpaka/TestDeviceCollection.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Utilities/interface/InputTag.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/stream/SynchronizingEDProducer.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDPutToken.h"
#include "HeterogeneousCore/AlpakaInterface/interface/config.h"

#include "TestAlgo.h"

namespace ALPAKA_ACCELERATOR_NAMESPACE {
/**
* This class demonstrates a stream EDProducer that
* - produces a device EDProduct (that can get transferred to host automatically)
* - synchronizes in a non-blocking way with the ExternalWork module
* ability (via the SynchronizingEDProcucer base class)
*/
class TestAlpakaStreamSynchronizingProducerToDevice : public stream::SynchronizingEDProducer<> {
public:
TestAlpakaStreamSynchronizingProducerToDevice(edm::ParameterSet const& iConfig)
: putToken_{produces()},
size_{iConfig.getParameter<edm::ParameterSet>("size").getParameter<int32_t>(
EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE))} {}

void acquire(device::Event const& iEvent, device::EventSetup const& iSetup) override {
deviceProduct_ = std::make_unique<portabletest::TestDeviceCollection>(size_, iEvent.queue());

// run the algorithm, potentially asynchronously
algo_.fill(iEvent.queue(), *deviceProduct_);
}

void produce(device::Event& iEvent, device::EventSetup const& iSetup) override {
iEvent.put(putToken_, std::move(deviceProduct_));
}

static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
edm::ParameterSetDescription desc;

edm::ParameterSetDescription psetSize;
psetSize.add<int32_t>("alpaka_serial_sync");
psetSize.add<int32_t>("alpaka_cuda_async");
psetSize.add<int32_t>("alpaka_rocm_async");
desc.add("size", psetSize);

descriptions.addWithDefaultLabel(desc);
}

private:
const device::EDPutToken<portabletest::TestDeviceCollection> putToken_;
const int32_t size_;

// implementation of the algorithm
TestAlgo algo_;

std::unique_ptr<portabletest::TestDeviceCollection> deviceProduct_;
};

} // namespace ALPAKA_ACCELERATOR_NAMESPACE

#include "HeterogeneousCore/AlpakaCore/interface/alpaka/MakerMacros.h"
DEFINE_FWK_ALPAKA_MODULE(TestAlpakaStreamSynchronizingProducerToDevice);
29 changes: 27 additions & 2 deletions HeterogeneousCore/AlpakaTest/test/testAlpakaModules_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,22 @@
intSource = cms.InputTag("intProduct"),
expectedInt = cms.int32(84) # sum of intProduct and esProducerA
)
process.alpakaStreamSynchronizingProducerToDevice = cms.EDProducer("TestAlpakaStreamSynchronizingProducerToDevice@alpaka",
size = cms.PSet(
alpaka_serial_sync = cms.int32(1),
alpaka_cuda_async = cms.int32(2),
alpaka_rocm_async = cms.int32(3),
)
)

process.alpakaGlobalConsumer = cms.EDAnalyzer("TestAlpakaAnalyzer",
source = cms.InputTag("alpakaGlobalProducer"),
expectSize = cms.int32(10),
expectBackend = cms.string("SerialSync")
)
process.alpakaGlobalDeviceConsumer = cms.EDProducer("TestAlpakaGlobalProducerNoOutput@alpaka",
source = cms.InputTag("alpakaGlobalProducer")
)
process.alpakaGlobalConsumerE = process.alpakaGlobalConsumer.clone(
source = "alpakaGlobalProducerE",
expectXvalues = cms.vdouble([(i%2)*10+1 + abs(27)+i*2 for i in range(0,5)] + [0]*5)
Expand All @@ -112,6 +122,9 @@
expectSize = cms.int32(5),
expectBackend = cms.string("SerialSync")
)
process.alpakaStreamDeviceConsumer = process.alpakaGlobalDeviceConsumer.clone(
source = "alpakaStreamProducer"
)
process.alpakaStreamInstanceConsumer = cms.EDAnalyzer("TestAlpakaAnalyzer",
source = cms.InputTag("alpakaStreamInstanceProducer", "testInstance"),
expectSize = cms.int32(6),
Expand All @@ -122,6 +135,10 @@
expectSize = cms.int32(10),
expectBackend = cms.string("SerialSync")
)
process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer1 = process.alpakaGlobalDeviceConsumer.clone(
source = "alpakaStreamSynchronizingProducerToDevice"
)
process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer2 = process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer1.clone()
process.alpakaNullESConsumer = cms.EDProducer("TestAlpakaGlobalProducerNullES@alpaka",
eventSetupSource = cms.ESInputTag("", "null")
)
Expand All @@ -132,7 +149,10 @@
for name in ["ESProducerA", "ESProducerB", "ESProducerC", "ESProducerD", "ESProducerE",
"ESProducerNull",
"GlobalProducer", "GlobalProducerE",
"StreamProducer", "StreamInstanceProducer", "StreamSynchronizingProducer",
"StreamProducer", "StreamInstanceProducer",
"StreamSynchronizingProducer", "StreamSynchronizingProducerToDevice",
"GlobalDeviceConsumer", "StreamDeviceConsumer",
"StreamSynchronizingProducerToDeviceDeviceConsumer1", "StreamSynchronizingProducerToDeviceDeviceConsumer2",
"NullESConsumer"]:
mod = getattr(process, "alpaka"+name)
mod.alpaka = cms.untracked.PSet(backend = cms.untracked.string(args.moduleBackend))
Expand Down Expand Up @@ -173,14 +193,19 @@ def setExpect(m, size):
process.alpakaGlobalProducerE,
process.alpakaStreamProducer,
process.alpakaStreamInstanceProducer,
process.alpakaStreamSynchronizingProducer
process.alpakaStreamSynchronizingProducer,
process.alpakaStreamSynchronizingProducerToDevice
)
process.p = cms.Path(
process.alpakaGlobalConsumer+
process.alpakaGlobalDeviceConsumer+
process.alpakaGlobalConsumerE+
process.alpakaStreamConsumer+
process.alpakaStreamDeviceConsumer+
process.alpakaStreamInstanceConsumer+
process.alpakaStreamSynchronizingConsumer+
process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer1+
process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer2+
process.alpakaNullESConsumer,
process.t
)
Expand Down