Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPL: add combine method to (optionally) run DataProcessors as a single #8548

Merged
merged 1 commit into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions Framework/Core/include/Framework/WorkflowSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef FRAMEWORK_WORKFLOWSPEC_H
#define FRAMEWORK_WORKFLOWSPEC_H
#ifndef O2_FRAMEWORK_WORKFLOWSPEC_H_
#define O2_FRAMEWORK_WORKFLOWSPEC_H_

#include "Framework/DataProcessorSpec.h"
#include "Framework/AlgorithmSpec.h"
Expand All @@ -18,9 +18,7 @@
#include <functional>
#include <cstddef>

namespace o2
{
namespace framework
namespace o2::framework
{
using WorkflowSpec = std::vector<DataProcessorSpec>;

Expand Down Expand Up @@ -112,7 +110,24 @@ DataProcessorSpec timePipeline(DataProcessorSpec original,
/// Each ; delimits an InputSpec.
std::vector<InputSpec> select(char const* matcher = "");

} // namespace framework
} // namespace o2
namespace workflow
{
WorkflowSpec combine(const char* name, std::vector<DataProcessorSpec> const& specs, bool doIt);

template <typename T, typename... ARGS>
WorkflowSpec concat(T&& t, ARGS&&... args)
{
if constexpr (sizeof...(args) == 0) {
return t;
} else {
auto rest = concat(std::forward<ARGS>(args)...);
// insert rest at the end of t
t.insert(t.end(), rest.begin(), rest.end());
return t;
}
}
} // namespace workflow

} // namespace o2::framework

#endif // FRAMEWORK_WORKFLOWSPEC_H
#endif // O2_FRAMEWORK_WORKFLOWSPEC_H_
6 changes: 3 additions & 3 deletions Framework/Core/include/Framework/WorkflowSpecNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef O2_FRAMEWORK_WORKFLOWSPEC_H_
#define O2_FRAMEWORK_WORKFLOWSPEC_H_
#ifndef O2_FRAMEWORK_WORKFLOWSPECNODE_H_
#define O2_FRAMEWORK_WORKFLOWSPECNODE_H_
#include "Framework/DataProcessorSpec.h"
#include <vector>

Expand All @@ -19,4 +19,4 @@ struct WorkflowSpecNode {
std::vector<DataProcessorSpec>& specs;
};
} // namespace o2::framework
#endif // O2_FRAMEWORK_WORKFLOWSPEC_H_
#endif // O2_FRAMEWORK_WORKFLOWSPECNODE_H_
67 changes: 62 additions & 5 deletions Framework/Core/src/WorkflowSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
#include <functional>
#include <string>

namespace o2
{
namespace framework
namespace o2::framework
{

WorkflowSpec parallel(DataProcessorSpec original,
Expand Down Expand Up @@ -156,5 +154,64 @@ std::vector<InputSpec> select(const char* matcher)
return DataDescriptorQueryBuilder::parse(matcher);
}

} // namespace framework
} // namespace o2
namespace workflow
{
WorkflowSpec combine(char const* name, std::vector<DataProcessorSpec> const& specs, bool doIt)
{
if (!doIt) {
return specs;
}

DataProcessorSpec combined;
combined.name = name;
// add all the inputs to combined
for (auto& spec : specs) {
for (auto& input : spec.inputs) {
combined.inputs.push_back(input);
}
for (auto& output : spec.outputs) {
combined.outputs.push_back(output);
}
for (auto& option : spec.options) {
combined.options.push_back(option);
}
for (auto& label : spec.labels) {
combined.labels.push_back(label);
}
for (auto& service : spec.requiredServices) {
// Insert in the final list of services
// only if a spec with the same name is not there
// already.
bool found = false;
for (auto& existing : combined.requiredServices) {
if (existing.name == service.name) {
found = true;
break;
}
}
if (!found) {
combined.requiredServices.push_back(service);
}
}
}

combined.algorithm = AlgorithmSpec{[specs](InitContext& ctx) {
std::vector<AlgorithmSpec::ProcessCallback> callbacks;
for (auto& spec : specs) {
if (spec.algorithm.onInit) {
callbacks.push_back(spec.algorithm.onInit(ctx));
} else if (spec.algorithm.onProcess) {
callbacks.push_back(spec.algorithm.onProcess);
}
}
return [callbacks](ProcessingContext& context) {
for (auto& callback : callbacks) {
callback(context);
}
};
}};
return {combined};
}
} // namespace workflow

} // namespace o2::framework
67 changes: 35 additions & 32 deletions Framework/TestWorkflows/src/o2DiamondWorkflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ AlgorithmSpec simplePipe(std::string const& what, int minDelay)
srand(getpid());
LOG(info) << "There are " << runningWorkflow.devices.size() << " devices in the workflow";
return adaptStateless([what, minDelay](DataAllocator& outputs, RawDeviceService& device) {
LOGP(info, "Invoked {}", what);
device.device()->WaitFor(std::chrono::milliseconds(minDelay));
auto& bData = outputs.make<int>(OutputRef{what}, 1);
});
Expand All @@ -71,36 +72,38 @@ AlgorithmSpec simplePipe(std::string const& what, int minDelay)
// This is how you can define your processing in a declarative way
WorkflowSpec defineDataProcessing(ConfigContext const& specs)
{
return WorkflowSpec{
{"A",
Inputs{},
{OutputSpec{{"a1"}, "TST", "A1"},
OutputSpec{{"a2"}, "TST", "A2"}},
AlgorithmSpec{adaptStateless(
[](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context) {
device.device()->WaitFor(std::chrono::seconds(rand() % 2));
auto& aData = outputs.make<int>(OutputRef{"a1"}, 1);
auto& bData = outputs.make<int>(OutputRef{"a2"}, 1);
})},
{ConfigParamSpec{"some-device-param", VariantType::Int, 1, {"Some device parameter"}}}},
{"B",
{InputSpec{"x", "TST", "A1", Lifetime::Timeframe, {ConfigParamSpec{"somestring", VariantType::String, "", {"Some input param"}}}}},
{OutputSpec{{"b1"}, "TST", "B1"}},
simplePipe("b1", 5000)},
{"C",
Inputs{InputSpec{"x", "TST", "A2"}},
Outputs{OutputSpec{{"c1"}, "TST", "C1"}},
simplePipe("c1", 5000)},
{"D",
Inputs{
InputSpec{"a", "TST", "A1"},
InputSpec{"b", "TST", "B1"},
InputSpec{"c", "TST", "C1"},
},
Outputs{},
AlgorithmSpec{adaptStateless([](InputRecord& inputs) {
auto ref = inputs.get("b");
auto header = o2::header::get<const DataProcessingHeader*>(ref.header);
LOG(debug) << "Start time: " << header->startTime;
})}}};
DataProcessorSpec a{
.name = "A",
.outputs = {OutputSpec{{"a1"}, "TST", "A1"},
OutputSpec{{"a2"}, "TST", "A2"}},
.algorithm = AlgorithmSpec{adaptStateless(
[](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context) {
device.device()->WaitFor(std::chrono::seconds(rand() % 2));
auto& aData = outputs.make<int>(OutputRef{"a1"}, 1);
auto& bData = outputs.make<int>(OutputRef{"a2"}, 1);
})},
.options = {ConfigParamSpec{"some-device-param", VariantType::Int, 1, {"Some device parameter"}}}};
DataProcessorSpec b{
.name = "B",
.inputs = {InputSpec{"x", "TST", "A1", Lifetime::Timeframe, {ConfigParamSpec{"somestring", VariantType::String, "", {"Some input param"}}}}},
.outputs = {OutputSpec{{"b1"}, "TST", "B1"}},
.algorithm = simplePipe("b1", 1000)};
DataProcessorSpec c{.name = "C",
.inputs = {InputSpec{"x", "TST", "A2"}},
.outputs = {OutputSpec{{"c1"}, "TST", "C1"}},
.algorithm = simplePipe("c1", 2000)};
DataProcessorSpec d{.name = "D",
.inputs = {InputSpec{"a", "TST", "A1"},
InputSpec{"b", "TST", "B1"},
InputSpec{"c", "TST", "C1"}},
.algorithm = AlgorithmSpec{adaptStateless(
[](InputRecord& inputs) {
auto ref = inputs.get("b");
auto header = o2::header::get<const DataProcessingHeader*>(ref.header);
LOG(info) << "Start time: " << header->startTime;
})}};

return workflow::concat(WorkflowSpec{a},
workflow::combine("combined", {b, c}, false),
WorkflowSpec{d});
}