From c89da599c1ab4e5d0c36f8a64b2cfd472278306a Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Mon, 11 Apr 2022 14:31:32 +0200 Subject: [PATCH] DPL: add combine / concat methods for topology creation * Use concat to join multiple WorkflowSpec. * Use combine to (optionally) merge all the DataProcessors into a single one. --- .../Core/include/Framework/WorkflowSpec.h | 31 ++++++--- .../Core/include/Framework/WorkflowSpecNode.h | 6 +- Framework/Core/src/WorkflowSpec.cxx | 67 +++++++++++++++++-- .../TestWorkflows/src/o2DiamondWorkflow.cxx | 67 ++++++++++--------- 4 files changed, 123 insertions(+), 48 deletions(-) diff --git a/Framework/Core/include/Framework/WorkflowSpec.h b/Framework/Core/include/Framework/WorkflowSpec.h index 49a7c5bf11820..b41f1d18d658c 100644 --- a/Framework/Core/include/Framework/WorkflowSpec.h +++ b/Framework/Core/include/Framework/WorkflowSpec.h @@ -8,8 +8,8 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef FRAMEWORK_WORKFLOWSPEC_H -#define FRAMEWORK_WORKFLOWSPEC_H +#ifndef O2_FRAMEWORK_WORKFLOWSPEC_H_ +#define O2_FRAMEWORK_WORKFLOWSPEC_H_ #include "Framework/DataProcessorSpec.h" #include "Framework/AlgorithmSpec.h" @@ -18,9 +18,7 @@ #include #include -namespace o2 -{ -namespace framework +namespace o2::framework { using WorkflowSpec = std::vector; @@ -112,7 +110,24 @@ DataProcessorSpec timePipeline(DataProcessorSpec original, /// Each ; delimits an InputSpec. std::vector select(char const* matcher = ""); -} // namespace framework -} // namespace o2 +namespace workflow +{ +WorkflowSpec combine(const char* name, std::vector const& specs, bool doIt); + +template +WorkflowSpec concat(T&& t, ARGS&&... args) +{ + if constexpr (sizeof...(args) == 0) { + return t; + } else { + auto rest = concat(std::forward(args)...); + // insert rest at the end of t + t.insert(t.end(), rest.begin(), rest.end()); + return t; + } +} +} // namespace workflow + +} // namespace o2::framework -#endif // FRAMEWORK_WORKFLOWSPEC_H +#endif // O2_FRAMEWORK_WORKFLOWSPEC_H_ diff --git a/Framework/Core/include/Framework/WorkflowSpecNode.h b/Framework/Core/include/Framework/WorkflowSpecNode.h index 5eb4210920f58..db21626fbaafc 100644 --- a/Framework/Core/include/Framework/WorkflowSpecNode.h +++ b/Framework/Core/include/Framework/WorkflowSpecNode.h @@ -8,8 +8,8 @@ // In applying this license CERN does not waive the privileges and immunities // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -#ifndef O2_FRAMEWORK_WORKFLOWSPEC_H_ -#define O2_FRAMEWORK_WORKFLOWSPEC_H_ +#ifndef O2_FRAMEWORK_WORKFLOWSPECNODE_H_ +#define O2_FRAMEWORK_WORKFLOWSPECNODE_H_ #include "Framework/DataProcessorSpec.h" #include @@ -19,4 +19,4 @@ struct WorkflowSpecNode { std::vector& specs; }; } // namespace o2::framework -#endif // O2_FRAMEWORK_WORKFLOWSPEC_H_ +#endif // O2_FRAMEWORK_WORKFLOWSPECNODE_H_ diff --git a/Framework/Core/src/WorkflowSpec.cxx b/Framework/Core/src/WorkflowSpec.cxx index 55ccd2e1e441b..5ac38a5638fe6 100644 --- a/Framework/Core/src/WorkflowSpec.cxx +++ b/Framework/Core/src/WorkflowSpec.cxx @@ -17,9 +17,7 @@ #include #include -namespace o2 -{ -namespace framework +namespace o2::framework { WorkflowSpec parallel(DataProcessorSpec original, @@ -156,5 +154,64 @@ std::vector select(const char* matcher) return DataDescriptorQueryBuilder::parse(matcher); } -} // namespace framework -} // namespace o2 +namespace workflow +{ +WorkflowSpec combine(char const* name, std::vector const& specs, bool doIt) +{ + if (!doIt) { + return specs; + } + + DataProcessorSpec combined; + combined.name = name; + // add all the inputs to combined + for (auto& spec : specs) { + for (auto& input : spec.inputs) { + combined.inputs.push_back(input); + } + for (auto& output : spec.outputs) { + combined.outputs.push_back(output); + } + for (auto& option : spec.options) { + combined.options.push_back(option); + } + for (auto& label : spec.labels) { + combined.labels.push_back(label); + } + for (auto& service : spec.requiredServices) { + // Insert in the final list of services + // only if a spec with the same name is not there + // already. + bool found = false; + for (auto& existing : combined.requiredServices) { + if (existing.name == service.name) { + found = true; + break; + } + } + if (!found) { + combined.requiredServices.push_back(service); + } + } + } + + combined.algorithm = AlgorithmSpec{[specs](InitContext& ctx) { + std::vector callbacks; + for (auto& spec : specs) { + if (spec.algorithm.onInit) { + callbacks.push_back(spec.algorithm.onInit(ctx)); + } else if (spec.algorithm.onProcess) { + callbacks.push_back(spec.algorithm.onProcess); + } + } + return [callbacks](ProcessingContext& context) { + for (auto& callback : callbacks) { + callback(context); + } + }; + }}; + return {combined}; +} +} // namespace workflow + +} // namespace o2::framework diff --git a/Framework/TestWorkflows/src/o2DiamondWorkflow.cxx b/Framework/TestWorkflows/src/o2DiamondWorkflow.cxx index 6326e8e388c9b..6603c0f07d97b 100644 --- a/Framework/TestWorkflows/src/o2DiamondWorkflow.cxx +++ b/Framework/TestWorkflows/src/o2DiamondWorkflow.cxx @@ -62,6 +62,7 @@ AlgorithmSpec simplePipe(std::string const& what, int minDelay) srand(getpid()); LOG(info) << "There are " << runningWorkflow.devices.size() << " devices in the workflow"; return adaptStateless([what, minDelay](DataAllocator& outputs, RawDeviceService& device) { + LOGP(info, "Invoked {}", what); device.device()->WaitFor(std::chrono::milliseconds(minDelay)); auto& bData = outputs.make(OutputRef{what}, 1); }); @@ -71,36 +72,38 @@ AlgorithmSpec simplePipe(std::string const& what, int minDelay) // This is how you can define your processing in a declarative way WorkflowSpec defineDataProcessing(ConfigContext const& specs) { - return WorkflowSpec{ - {"A", - Inputs{}, - {OutputSpec{{"a1"}, "TST", "A1"}, - OutputSpec{{"a2"}, "TST", "A2"}}, - AlgorithmSpec{adaptStateless( - [](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context) { - device.device()->WaitFor(std::chrono::seconds(rand() % 2)); - auto& aData = outputs.make(OutputRef{"a1"}, 1); - auto& bData = outputs.make(OutputRef{"a2"}, 1); - })}, - {ConfigParamSpec{"some-device-param", VariantType::Int, 1, {"Some device parameter"}}}}, - {"B", - {InputSpec{"x", "TST", "A1", Lifetime::Timeframe, {ConfigParamSpec{"somestring", VariantType::String, "", {"Some input param"}}}}}, - {OutputSpec{{"b1"}, "TST", "B1"}}, - simplePipe("b1", 5000)}, - {"C", - Inputs{InputSpec{"x", "TST", "A2"}}, - Outputs{OutputSpec{{"c1"}, "TST", "C1"}}, - simplePipe("c1", 5000)}, - {"D", - Inputs{ - InputSpec{"a", "TST", "A1"}, - InputSpec{"b", "TST", "B1"}, - InputSpec{"c", "TST", "C1"}, - }, - Outputs{}, - AlgorithmSpec{adaptStateless([](InputRecord& inputs) { - auto ref = inputs.get("b"); - auto header = o2::header::get(ref.header); - LOG(debug) << "Start time: " << header->startTime; - })}}}; + DataProcessorSpec a{ + .name = "A", + .outputs = {OutputSpec{{"a1"}, "TST", "A1"}, + OutputSpec{{"a2"}, "TST", "A2"}}, + .algorithm = AlgorithmSpec{adaptStateless( + [](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context) { + device.device()->WaitFor(std::chrono::seconds(rand() % 2)); + auto& aData = outputs.make(OutputRef{"a1"}, 1); + auto& bData = outputs.make(OutputRef{"a2"}, 1); + })}, + .options = {ConfigParamSpec{"some-device-param", VariantType::Int, 1, {"Some device parameter"}}}}; + DataProcessorSpec b{ + .name = "B", + .inputs = {InputSpec{"x", "TST", "A1", Lifetime::Timeframe, {ConfigParamSpec{"somestring", VariantType::String, "", {"Some input param"}}}}}, + .outputs = {OutputSpec{{"b1"}, "TST", "B1"}}, + .algorithm = simplePipe("b1", 1000)}; + DataProcessorSpec c{.name = "C", + .inputs = {InputSpec{"x", "TST", "A2"}}, + .outputs = {OutputSpec{{"c1"}, "TST", "C1"}}, + .algorithm = simplePipe("c1", 2000)}; + DataProcessorSpec d{.name = "D", + .inputs = {InputSpec{"a", "TST", "A1"}, + InputSpec{"b", "TST", "B1"}, + InputSpec{"c", "TST", "C1"}}, + .algorithm = AlgorithmSpec{adaptStateless( + [](InputRecord& inputs) { + auto ref = inputs.get("b"); + auto header = o2::header::get(ref.header); + LOG(info) << "Start time: " << header->startTime; + })}}; + + return workflow::concat(WorkflowSpec{a}, + workflow::combine("combined", {b, c}, false), + WorkflowSpec{d}); }