Skip to content

Commit

Permalink
Merge pull request #44841 from makortel/alpakaReducedEventSynchronize
Browse files Browse the repository at this point in the history
Reduce Alpaka event synchronization calls via EDMetadata
  • Loading branch information
cmsbuild authored Apr 29, 2024
2 parents 3e31782 + c237b97 commit e6283d3
Show file tree
Hide file tree
Showing 12 changed files with 173 additions and 10 deletions.
2 changes: 1 addition & 1 deletion HeterogeneousCore/AlpakaCore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ Also note that the `fillDescription()` function must have the same content for a
* All Event data products in the host memory space are guaranteed to be accessible for all operations (after the data product has been obtained from the `edm::Event` or `device::Event`).
* All EventSetup data products in the device memory space are guaranteed to be accessible only for operations enqueued in the `Queue` given by `device::Event::queue()` when accessed via the `device::EventSetup` (ED modules), or by `device::Record<TRecord>::queue()` when accessed via the `device::Record<TRecord>` (ESProducers).
* The EDM Stream does not proceed to the next Event until after all asynchronous work of the current Event has finished.
* **Note**: currently this guarantee does not hold if the job has any EDModule that launches asynchronous work but does not explicitly synchronize or produce any device-side data products.
* **Note**: this implies that if an EDProducer, in its `produce()` function, uses the `Event::queue()` or gets a device-side data product, but does not produce any device-side data products, the `produce()` call will be synchronous (i.e. it will block the CPU thread until the asynchronous work finishes).

## Examples

Expand Down
4 changes: 4 additions & 0 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/EDMetadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
Queue& queue() const { return *queue_; }

void recordEvent() {}
void discardEvent() {}

private:
std::shared_ptr<Queue> queue_;
Expand All @@ -73,6 +74,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
void enqueueCallback(edm::WaitingTaskWithArenaHolder holder);

// Enqueues event_ into queue_.
void recordEvent() {
  alpaka::enqueue(*queue_, *event_);
}
// Resets event_, leaving this EDMetadata without an event to
// record or wait on.
void discardEvent() {
  event_.reset();
}

/**
* Synchronizes 'consumer' metadata wrt. 'this' in the event product
Expand All @@ -92,6 +94,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
// consumer or not. The goal is to have a "chain" of modules to
// queue their work to the same queue.
mutable std::atomic<bool> mayReuseQueue_ = true;
// Cache to potentially reduce alpaka::wait() calls
mutable std::atomic<bool> eventComplete_ = false;
};
#endif
} // namespace ALPAKA_ACCELERATOR_NAMESPACE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {

std::shared_ptr<EDMetadata> metadata() { return metadata_; }

void finish();
// true if asynchronous work was (possibly) launched
void finish(bool launchedAsyncWork);

private:
std::shared_ptr<EDMetadata> metadata_;
Expand Down
4 changes: 4 additions & 0 deletions HeterogeneousCore/AlpakaCore/interface/alpaka/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,17 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE::device {
}
}

// implementation details
// Returns the current value of the queueUsed_ flag.
bool wasQueueUsed() const {
  return queueUsed_;
}

private:
// Having both const and non-const here in order to serve the
// clients with one device::Event class
edm::Event const& constEvent_;
edm::Event* event_ = nullptr;

std::shared_ptr<EDMetadata> metadata_;

// device::Event is not supposed to be const-thread-safe, so no
// additional protection is needed.
mutable bool queueUsed_ = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
device::EventSetup const es(iSetup, ev.device());
produce(sid, ev, es);
this->putBackend(iEvent);
sentry.finish();
sentry.finish(ev.wasQueueUsed());
}

virtual void produce(edm::StreamID sid, device::Event& iEvent, device::EventSetup const& iSetup) const = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
device::EventSetup const es(iSetup, ev.device());
produce(ev, es);
this->putBackend(iEvent);
sentry.finish();
sentry.finish(ev.wasQueueUsed());
}

virtual void produce(device::Event& iEvent, device::EventSetup const& iSetup) = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
device::EventSetup const es(iSetup, ev.device());
produce(ev, es);
this->putBackend(iEvent);
sentry.finish();
sentry.finish(ev.wasQueueUsed());
}

virtual void acquire(device::Event const& iEvent, device::EventSetup const& iSetup) = 0;
Expand Down
24 changes: 22 additions & 2 deletions HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,14 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
// TODO: a callback notifying a WaitingTaskHolder (or similar)
// would avoid blocking the CPU, but would also require more work.

if (event_) {
// If event_ is null, the EDMetadata was either
// default-constructed, or fully synchronized before leaving the
// produce() call, so no synchronization is needed.
// If the queue was re-used, then some other EDMetadata object in
// the same edm::Event records the event_ (in the same queue) and
// calls alpaka::wait(), and therefore this wait() call can be
// skipped.
if (event_ and not eventComplete_ and mayReuseQueue_) {
// Must not throw in a destructor, and if there were an
// exception could not really propagate it anyway.
CMS_SA_ALLOW try { alpaka::wait(*event_); } catch (...) {
Expand Down Expand Up @@ -42,12 +49,25 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
}
}

if (eventComplete_) {
return;
}

// TODO: how necessary is this check?
if (alpaka::getDev(*queue_) != alpaka::getDev(*consumer.queue_)) {
throw edm::Exception(edm::errors::LogicError) << "Handling data from multiple devices is not yet supported";
}

if (not alpaka::isComplete(*event_)) {
// If the event has been discarded, the produce() function that
// constructed this EDMetadata object did not launch any
// asynchronous work.
if (not event_) {
return;
}

if (alpaka::isComplete(*event_)) {
eventComplete_ = true;
} else {
// Event not yet occurred, so need to add synchronization
// here. Synchronization is done by making the queue wait
// for an event, so all subsequent work in the queue will run
Expand Down
11 changes: 10 additions & 1 deletion HeterogeneousCore/AlpakaCore/src/alpaka/EDMetadataSentry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ namespace ALPAKA_ACCELERATOR_NAMESPACE {
#endif
}

void EDMetadataSentry::finish() { metadata_->recordEvent(); }
// Finalizes the EDMetadata held by this sentry.
// launchedAsyncWork: true if asynchronous work was (possibly) launched.
void EDMetadataSentry::finish(bool launchedAsyncWork) {
  if (not launchedAsyncWork) {
    // We are certain no asynchronous work was launched (i.e. the
    // Queue was not used in any way), so there is nothing to
    // synchronize with and the Event can be discarded.
    metadata_->discardEvent();
    return;
  }

  // Asynchronous work may have been launched, so record the Event.
  metadata_->recordEvent();
}
} // namespace detail
} // namespace ALPAKA_ACCELERATOR_NAMESPACE
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include "DataFormats/PortableTestObjects/interface/alpaka/TestDeviceCollection.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Utilities/interface/InputTag.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/global/EDProducer.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDPutToken.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/ESGetToken.h"
#include "HeterogeneousCore/AlpakaInterface/interface/config.h"

namespace ALPAKA_ACCELERATOR_NAMESPACE {
/**
 * Test-only EDProducer that consumes a device EDProduct and does
 * not put anything into the event. Do not use it as an example.
 */
class TestAlpakaGlobalProducerNoOutput : public global::EDProducer<> {
public:
  TestAlpakaGlobalProducerNoOutput(edm::ParameterSet const& config)
      : getToken_(consumes(config.getParameter<edm::InputTag>("source"))) {}

  // Declares the single "source" parameter, defaulting to an empty InputTag.
  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
    const edm::InputTag emptyTag{};

    edm::ParameterSetDescription desc;
    desc.add("source", emptyTag);
    descriptions.addWithDefaultLabel(desc);
  }

  // Only fetches the input product; the result is deliberately unused.
  void produce(edm::StreamID, device::Event& iEvent, device::EventSetup const& iSetup) const override {
    static_cast<void>(iEvent.get(getToken_));
  }

private:
  const device::EDGetToken<portabletest::TestDeviceCollection> getToken_;
};
} // namespace ALPAKA_ACCELERATOR_NAMESPACE

#include "HeterogeneousCore/AlpakaCore/interface/alpaka/MakerMacros.h"
DEFINE_FWK_ALPAKA_MODULE(TestAlpakaGlobalProducerNoOutput);
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#include "DataFormats/PortableTestObjects/interface/alpaka/TestDeviceCollection.h"
#include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h"
#include "FWCore/ParameterSet/interface/ParameterSet.h"
#include "FWCore/ParameterSet/interface/ParameterSetDescription.h"
#include "FWCore/Utilities/interface/InputTag.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/stream/SynchronizingEDProducer.h"
#include "HeterogeneousCore/AlpakaCore/interface/alpaka/EDPutToken.h"
#include "HeterogeneousCore/AlpakaInterface/interface/config.h"

#include "TestAlgo.h"

namespace ALPAKA_ACCELERATOR_NAMESPACE {
/**
 * This class demonstrates a stream EDProducer that
 * - produces a device EDProduct (that can get transferred to host automatically)
 * - synchronizes in a non-blocking way with the ExternalWork module
 *   ability (via the SynchronizingEDProducer base class)
 */
class TestAlpakaStreamSynchronizingProducerToDevice : public stream::SynchronizingEDProducer<> {
public:
  // Reads the collection size from the "size" PSet, using the entry
  // named after the stringized ALPAKA_ACCELERATOR_NAMESPACE.
  TestAlpakaStreamSynchronizingProducerToDevice(edm::ParameterSet const& iConfig)
      : putToken_{produces()},
        size_{iConfig.getParameter<edm::ParameterSet>("size").getParameter<int32_t>(
            EDM_STRINGIZE(ALPAKA_ACCELERATOR_NAMESPACE))} {}

  static void fillDescriptions(edm::ConfigurationDescriptions& descriptions) {
    // nested PSet with one size entry per backend name
    edm::ParameterSetDescription psetSize;
    psetSize.add<int32_t>("alpaka_serial_sync");
    psetSize.add<int32_t>("alpaka_cuda_async");
    psetSize.add<int32_t>("alpaka_rocm_async");

    edm::ParameterSetDescription desc;
    desc.add("size", psetSize);
    descriptions.addWithDefaultLabel(desc);
  }

  // Allocates the device product and runs the algorithm on the
  // event's queue; the product is kept until produce().
  void acquire(device::Event const& iEvent, device::EventSetup const& iSetup) override {
    auto&& queue = iEvent.queue();
    deviceProduct_ = std::make_unique<portabletest::TestDeviceCollection>(size_, queue);

    // run the algorithm, potentially asynchronously
    algo_.fill(queue, *deviceProduct_);
  }

  // Hands the product filled in acquire() over to the event.
  void produce(device::Event& iEvent, device::EventSetup const& iSetup) override {
    iEvent.put(putToken_, std::move(deviceProduct_));
  }

private:
  const device::EDPutToken<portabletest::TestDeviceCollection> putToken_;
  const int32_t size_;

  // implementation of the algorithm
  TestAlgo algo_;

  // product created in acquire() and moved into the event in produce()
  std::unique_ptr<portabletest::TestDeviceCollection> deviceProduct_;
};

} // namespace ALPAKA_ACCELERATOR_NAMESPACE

#include "HeterogeneousCore/AlpakaCore/interface/alpaka/MakerMacros.h"
DEFINE_FWK_ALPAKA_MODULE(TestAlpakaStreamSynchronizingProducerToDevice);
29 changes: 27 additions & 2 deletions HeterogeneousCore/AlpakaTest/test/testAlpakaModules_cfg.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,22 @@
intSource = cms.InputTag("intProduct"),
expectedInt = cms.int32(84) # sum of intProduct and esProducerA
)
process.alpakaStreamSynchronizingProducerToDevice = cms.EDProducer("TestAlpakaStreamSynchronizingProducerToDevice@alpaka",
size = cms.PSet(
alpaka_serial_sync = cms.int32(1),
alpaka_cuda_async = cms.int32(2),
alpaka_rocm_async = cms.int32(3),
)
)

process.alpakaGlobalConsumer = cms.EDAnalyzer("TestAlpakaAnalyzer",
source = cms.InputTag("alpakaGlobalProducer"),
expectSize = cms.int32(10),
expectBackend = cms.string("SerialSync")
)
process.alpakaGlobalDeviceConsumer = cms.EDProducer("TestAlpakaGlobalProducerNoOutput@alpaka",
source = cms.InputTag("alpakaGlobalProducer")
)
process.alpakaGlobalConsumerE = process.alpakaGlobalConsumer.clone(
source = "alpakaGlobalProducerE",
expectXvalues = cms.vdouble([(i%2)*10+1 + abs(27)+i*2 for i in range(0,5)] + [0]*5)
Expand All @@ -112,6 +122,9 @@
expectSize = cms.int32(5),
expectBackend = cms.string("SerialSync")
)
process.alpakaStreamDeviceConsumer = process.alpakaGlobalDeviceConsumer.clone(
source = "alpakaStreamProducer"
)
process.alpakaStreamInstanceConsumer = cms.EDAnalyzer("TestAlpakaAnalyzer",
source = cms.InputTag("alpakaStreamInstanceProducer", "testInstance"),
expectSize = cms.int32(6),
Expand All @@ -122,6 +135,10 @@
expectSize = cms.int32(10),
expectBackend = cms.string("SerialSync")
)
process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer1 = process.alpakaGlobalDeviceConsumer.clone(
source = "alpakaStreamSynchronizingProducerToDevice"
)
process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer2 = process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer1.clone()
process.alpakaNullESConsumer = cms.EDProducer("TestAlpakaGlobalProducerNullES@alpaka",
eventSetupSource = cms.ESInputTag("", "null")
)
Expand All @@ -132,7 +149,10 @@
for name in ["ESProducerA", "ESProducerB", "ESProducerC", "ESProducerD", "ESProducerE",
"ESProducerNull",
"GlobalProducer", "GlobalProducerE",
"StreamProducer", "StreamInstanceProducer", "StreamSynchronizingProducer",
"StreamProducer", "StreamInstanceProducer",
"StreamSynchronizingProducer", "StreamSynchronizingProducerToDevice",
"GlobalDeviceConsumer", "StreamDeviceConsumer",
"StreamSynchronizingProducerToDeviceDeviceConsumer1", "StreamSynchronizingProducerToDeviceDeviceConsumer2",
"NullESConsumer"]:
mod = getattr(process, "alpaka"+name)
mod.alpaka = cms.untracked.PSet(backend = cms.untracked.string(args.moduleBackend))
Expand Down Expand Up @@ -173,14 +193,19 @@ def setExpect(m, size):
process.alpakaGlobalProducerE,
process.alpakaStreamProducer,
process.alpakaStreamInstanceProducer,
process.alpakaStreamSynchronizingProducer
process.alpakaStreamSynchronizingProducer,
process.alpakaStreamSynchronizingProducerToDevice
)
process.p = cms.Path(
process.alpakaGlobalConsumer+
process.alpakaGlobalDeviceConsumer+
process.alpakaGlobalConsumerE+
process.alpakaStreamConsumer+
process.alpakaStreamDeviceConsumer+
process.alpakaStreamInstanceConsumer+
process.alpakaStreamSynchronizingConsumer+
process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer1+
process.alpakaStreamSynchronizingProducerToDeviceDeviceConsumer2+
process.alpakaNullESConsumer,
process.t
)
Expand Down

0 comments on commit e6283d3

Please sign in to comment.