Skip to content

Commit

Permalink
DPL: add combine / concat methods for topology creation
Browse files Browse the repository at this point in the history
* Use concat to join multiple WorkflowSpec.
* Use combine to (optionally) merge all the DataProcessors
  into a single one.
  • Loading branch information
ktf committed Apr 21, 2022
1 parent 143c7ff commit c89da59
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 48 deletions.
31 changes: 23 additions & 8 deletions Framework/Core/include/Framework/WorkflowSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef FRAMEWORK_WORKFLOWSPEC_H
#define FRAMEWORK_WORKFLOWSPEC_H
#ifndef O2_FRAMEWORK_WORKFLOWSPEC_H_
#define O2_FRAMEWORK_WORKFLOWSPEC_H_

#include "Framework/DataProcessorSpec.h"
#include "Framework/AlgorithmSpec.h"
Expand All @@ -18,9 +18,7 @@
#include <functional>
#include <cstddef>

namespace o2
{
namespace framework
namespace o2::framework
{
using WorkflowSpec = std::vector<DataProcessorSpec>;

Expand Down Expand Up @@ -112,7 +110,24 @@ DataProcessorSpec timePipeline(DataProcessorSpec original,
/// Each ; delimits an InputSpec.
std::vector<InputSpec> select(char const* matcher = "");

} // namespace framework
} // namespace o2
namespace workflow
{
WorkflowSpec combine(const char* name, std::vector<DataProcessorSpec> const& specs, bool doIt);

template <typename T, typename... ARGS>
WorkflowSpec concat(T&& t, ARGS&&... args)
{
if constexpr (sizeof...(args) == 0) {
return t;
} else {
auto rest = concat(std::forward<ARGS>(args)...);
// insert rest at the end of t
t.insert(t.end(), rest.begin(), rest.end());
return t;
}
}
} // namespace workflow

} // namespace o2::framework

#endif // FRAMEWORK_WORKFLOWSPEC_H
#endif // O2_FRAMEWORK_WORKFLOWSPEC_H_
6 changes: 3 additions & 3 deletions Framework/Core/include/Framework/WorkflowSpecNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef O2_FRAMEWORK_WORKFLOWSPEC_H_
#define O2_FRAMEWORK_WORKFLOWSPEC_H_
#ifndef O2_FRAMEWORK_WORKFLOWSPECNODE_H_
#define O2_FRAMEWORK_WORKFLOWSPECNODE_H_
#include "Framework/DataProcessorSpec.h"
#include <vector>

Expand All @@ -19,4 +19,4 @@ struct WorkflowSpecNode {
std::vector<DataProcessorSpec>& specs;
};
} // namespace o2::framework
#endif // O2_FRAMEWORK_WORKFLOWSPEC_H_
#endif // O2_FRAMEWORK_WORKFLOWSPECNODE_H_
67 changes: 62 additions & 5 deletions Framework/Core/src/WorkflowSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
#include <functional>
#include <string>

namespace o2
{
namespace framework
namespace o2::framework
{

WorkflowSpec parallel(DataProcessorSpec original,
Expand Down Expand Up @@ -156,5 +154,64 @@ std::vector<InputSpec> select(const char* matcher)
return DataDescriptorQueryBuilder::parse(matcher);
}

} // namespace framework
} // namespace o2
namespace workflow
{
WorkflowSpec combine(char const* name, std::vector<DataProcessorSpec> const& specs, bool doIt)
{
if (!doIt) {
return specs;
}

DataProcessorSpec combined;
combined.name = name;
// add all the inputs to combined
for (auto& spec : specs) {
for (auto& input : spec.inputs) {
combined.inputs.push_back(input);
}
for (auto& output : spec.outputs) {
combined.outputs.push_back(output);
}
for (auto& option : spec.options) {
combined.options.push_back(option);
}
for (auto& label : spec.labels) {
combined.labels.push_back(label);
}
for (auto& service : spec.requiredServices) {
// Insert in the final list of services
// only if a spec with the same name is not there
// already.
bool found = false;
for (auto& existing : combined.requiredServices) {
if (existing.name == service.name) {
found = true;
break;
}
}
if (!found) {
combined.requiredServices.push_back(service);
}
}
}

combined.algorithm = AlgorithmSpec{[specs](InitContext& ctx) {
std::vector<AlgorithmSpec::ProcessCallback> callbacks;
for (auto& spec : specs) {
if (spec.algorithm.onInit) {
callbacks.push_back(spec.algorithm.onInit(ctx));
} else if (spec.algorithm.onProcess) {
callbacks.push_back(spec.algorithm.onProcess);
}
}
return [callbacks](ProcessingContext& context) {
for (auto& callback : callbacks) {
callback(context);
}
};
}};
return {combined};
}
} // namespace workflow

} // namespace o2::framework
67 changes: 35 additions & 32 deletions Framework/TestWorkflows/src/o2DiamondWorkflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ AlgorithmSpec simplePipe(std::string const& what, int minDelay)
srand(getpid());
LOG(info) << "There are " << runningWorkflow.devices.size() << " devices in the workflow";
return adaptStateless([what, minDelay](DataAllocator& outputs, RawDeviceService& device) {
LOGP(info, "Invoked {}", what);
device.device()->WaitFor(std::chrono::milliseconds(minDelay));
auto& bData = outputs.make<int>(OutputRef{what}, 1);
});
Expand All @@ -71,36 +72,38 @@ AlgorithmSpec simplePipe(std::string const& what, int minDelay)
// This is how you can define your processing in a declarative way
WorkflowSpec defineDataProcessing(ConfigContext const& specs)
{
return WorkflowSpec{
{"A",
Inputs{},
{OutputSpec{{"a1"}, "TST", "A1"},
OutputSpec{{"a2"}, "TST", "A2"}},
AlgorithmSpec{adaptStateless(
[](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context) {
device.device()->WaitFor(std::chrono::seconds(rand() % 2));
auto& aData = outputs.make<int>(OutputRef{"a1"}, 1);
auto& bData = outputs.make<int>(OutputRef{"a2"}, 1);
})},
{ConfigParamSpec{"some-device-param", VariantType::Int, 1, {"Some device parameter"}}}},
{"B",
{InputSpec{"x", "TST", "A1", Lifetime::Timeframe, {ConfigParamSpec{"somestring", VariantType::String, "", {"Some input param"}}}}},
{OutputSpec{{"b1"}, "TST", "B1"}},
simplePipe("b1", 5000)},
{"C",
Inputs{InputSpec{"x", "TST", "A2"}},
Outputs{OutputSpec{{"c1"}, "TST", "C1"}},
simplePipe("c1", 5000)},
{"D",
Inputs{
InputSpec{"a", "TST", "A1"},
InputSpec{"b", "TST", "B1"},
InputSpec{"c", "TST", "C1"},
},
Outputs{},
AlgorithmSpec{adaptStateless([](InputRecord& inputs) {
auto ref = inputs.get("b");
auto header = o2::header::get<const DataProcessingHeader*>(ref.header);
LOG(debug) << "Start time: " << header->startTime;
})}}};
DataProcessorSpec a{
.name = "A",
.outputs = {OutputSpec{{"a1"}, "TST", "A1"},
OutputSpec{{"a2"}, "TST", "A2"}},
.algorithm = AlgorithmSpec{adaptStateless(
[](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context) {
device.device()->WaitFor(std::chrono::seconds(rand() % 2));
auto& aData = outputs.make<int>(OutputRef{"a1"}, 1);
auto& bData = outputs.make<int>(OutputRef{"a2"}, 1);
})},
.options = {ConfigParamSpec{"some-device-param", VariantType::Int, 1, {"Some device parameter"}}}};
DataProcessorSpec b{
.name = "B",
.inputs = {InputSpec{"x", "TST", "A1", Lifetime::Timeframe, {ConfigParamSpec{"somestring", VariantType::String, "", {"Some input param"}}}}},
.outputs = {OutputSpec{{"b1"}, "TST", "B1"}},
.algorithm = simplePipe("b1", 1000)};
DataProcessorSpec c{.name = "C",
.inputs = {InputSpec{"x", "TST", "A2"}},
.outputs = {OutputSpec{{"c1"}, "TST", "C1"}},
.algorithm = simplePipe("c1", 2000)};
DataProcessorSpec d{.name = "D",
.inputs = {InputSpec{"a", "TST", "A1"},
InputSpec{"b", "TST", "B1"},
InputSpec{"c", "TST", "C1"}},
.algorithm = AlgorithmSpec{adaptStateless(
[](InputRecord& inputs) {
auto ref = inputs.get("b");
auto header = o2::header::get<const DataProcessingHeader*>(ref.header);
LOG(info) << "Start time: " << header->startTime;
})}};

return workflow::concat(WorkflowSpec{a},
workflow::combine("combined", {b, c}, false),
WorkflowSpec{d});
}

0 comments on commit c89da59

Please sign in to comment.