Skip to content

Commit

Permalink
DPL: introduce a --data-processing-timeout
Browse files Browse the repository at this point in the history
Used to force stop the data processing and start the calibration.
In case it is larger than the exit-transition-timeout, the two will be coalesced
into a single transition.

For the time being the only side effect is to set the DeviceState::allowedProcessing and
to terminate the source devices earlier than the data processing ones.

Next PR will make sure that only calibrations are treated when DeviceState::allowedProcessing is set to ProcessingType::Calibration.
  • Loading branch information
ktf committed Sep 27, 2024
1 parent c5fbdd2 commit e76d45b
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 13 deletions.
2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/DeviceContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ struct ComputingQuotaStats;
struct DeviceContext {
ComputingQuotaStats* quotaStats = nullptr;
uv_timer_t* gracePeriodTimer = nullptr;
uv_timer_t* dataProcessingGracePeriodTimer = nullptr;
uv_signal_t* sigusr1Handle = nullptr;
int expectedRegionCallbacks = 0;
int exitTransitionTimeout = 0;
int dataProcessingTimeout = 0;
};

} // namespace o2::framework
Expand Down
10 changes: 10 additions & 0 deletions Framework/Core/include/Framework/DeviceState.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,18 @@ struct DeviceState {
STREAM_CONTEXT_LOG = 1 << 4, // Log for the StreamContext callbacks
};

enum ProcessingType : int {
Any, // Any kind of processing is allowed
CalibrationOnly, // Only calibrations are allowed to be processed / produced
};

std::vector<InputChannelInfo> inputChannelInfos;
StreamingState streaming = StreamingState::Streaming;
// What kind of processing is allowed. By default we allow any.
// If we are past the data processing timeout, this will be
// CalibrationOnly. We need to reset it at every start.
ProcessingType allowedProcessing = ProcessingType::Any;

bool quitRequested = false;
std::atomic<int64_t> cleanupCount = -1;

Expand Down
43 changes: 42 additions & 1 deletion Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,28 @@ void on_transition_requested_expired(uv_timer_t* handle)
if (hasOnlyGenerated(spec)) {
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for source expired. Exiting.");
} else {
O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "Grace period for data / calibration expired. Exiting.");
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for %{public}s expired. Exiting.",
state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration");
}
state.transitionHandling = TransitionHandlingState::Expired;
}

void on_data_processing_expired(uv_timer_t* handle)
{
auto* ref = (ServiceRegistryRef*)handle->data;
auto& state = ref->get<DeviceState>();
state.loopReason |= DeviceState::TIMER_EXPIRED;

// Check if this is a source device
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);

// Source devices should never end up in this callback, since the exitTransitionTimeout should
// be reset to the dataProcessingTimeout and the timers cohalesced.
assert(hasOnlyGenerated(ref->get<DeviceSpec const>()) == false);
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards.");
state.allowedProcessing = DeviceState::CalibrationOnly;
}

void on_communication_requested(uv_async_t* s)
{
auto* state = (DeviceState*)s->data;
Expand Down Expand Up @@ -949,6 +966,10 @@ void DataProcessingDevice::startPollers()
deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
uv_timer_init(state.loop, deviceContext.gracePeriodTimer);

deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
}

void DataProcessingDevice::stopPollers()
Expand Down Expand Up @@ -980,6 +1001,11 @@ void DataProcessingDevice::stopPollers()
delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
free(deviceContext.gracePeriodTimer);
deviceContext.gracePeriodTimer = nullptr;

uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data;
free(deviceContext.dataProcessingGracePeriodTimer);
deviceContext.dataProcessingGracePeriodTimer = nullptr;
}

void DataProcessingDevice::InitTask()
Expand Down Expand Up @@ -1015,6 +1041,7 @@ void DataProcessingDevice::InitTask()

deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));
deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));

for (auto& channel : GetChannels()) {
channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext,
Expand Down Expand Up @@ -1209,6 +1236,7 @@ void DataProcessingDevice::PreRun()
O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback.");
state.quitRequested = false;
state.streaming = StreamingState::Streaming;
state.allowedProcessing = DeviceState::Any;
for (auto& info : state.inputChannelInfos) {
if (info.state != InputChannelState::Pull) {
info.state = InputChannelState::Running;
Expand Down Expand Up @@ -1339,6 +1367,19 @@ void DataProcessingDevice::Run()
state.streaming = StreamingState::EndOfStreaming;
}

// If this is a source device, dataTransitionTimeout and dataProcessingTimeout are effectively
// the same (because source devices are not allowed to produce any calibration).
// should be the same.
if (hasOnlyGenerated(spec) && deviceContext.dataProcessingTimeout > 0) {
deviceContext.exitTransitionTimeout = deviceContext.dataProcessingTimeout;
}

// We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout
if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) {
uv_update_time(state.loop);
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0);
}
if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
state.transitionHandling = TransitionHandlingState::Requested;
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1537,6 +1537,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
realOdesc.add_options()("child-driver", bpo::value<std::string>());
realOdesc.add_options()("rate", bpo::value<std::string>());
realOdesc.add_options()("exit-transition-timeout", bpo::value<std::string>());
realOdesc.add_options()("data-processing-timeout", bpo::value<std::string>());
realOdesc.add_options()("expected-region-callbacks", bpo::value<std::string>());
realOdesc.add_options()("timeframes-rate-limit", bpo::value<std::string>());
realOdesc.add_options()("environment", bpo::value<std::string>());
Expand Down Expand Up @@ -1723,6 +1724,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
("control-port", bpo::value<std::string>(), "Utility port to be used by O2 Control") //
("rate", bpo::value<std::string>(), "rate for a data source device (Hz)") //
("exit-transition-timeout", bpo::value<std::string>(), "timeout before switching to READY state") //
("data-processing-timeout", bpo::value<std::string>(), "timeout after which only calibration can happen") //
("expected-region-callbacks", bpo::value<std::string>(), "region callbacks to expect before starting") //
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframes can be in fly") //
("shm-monitor", bpo::value<std::string>(), "whether to use the shared memory monitor") //
Expand Down
9 changes: 8 additions & 1 deletion Framework/Core/src/O2ControlHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ void dumpCommand(std::ostream& dumpOut, const DeviceExecution& execution, std::s
dumpOut << indLevel << indScheme << "- \"-b\"\n";
dumpOut << indLevel << indScheme << "- \"--exit-transition-timeout\"\n";
dumpOut << indLevel << indScheme << "- \"'{{ exit_transition_timeout }}'\"\n";
dumpOut << indLevel << indScheme << "- \"--data-processing-timeout\"\n";
dumpOut << indLevel << indScheme << "- \"'{{ data_processing_timeout }}'\"\n";
dumpOut << indLevel << indScheme << "- \"--monitoring-backend\"\n";
dumpOut << indLevel << indScheme << "- \"'{{ monitoring_dpl_url }}'\"\n";
dumpOut << indLevel << indScheme << "- \"--session\"\n";
Expand Down Expand Up @@ -393,15 +395,20 @@ void dumpTask(std::ostream& dumpOut, const DeviceSpec& spec, const DeviceExecuti
dumpOut << indLevel << "defaults:\n";
dumpOut << indLevel << indScheme << "log_task_stdout: none\n";
dumpOut << indLevel << indScheme << "log_task_stderr: none\n";
std::string exitTransitionTimeout = "15";
std::string exitTransitionTimeout = "15"; // Allow 15 seconds to finish processing and calibrations
std::string dataProcessingTimeout = "10"; // Allow only ten seconds to finish processing
if (execution.args.size() > 2) {
for (size_t i = 0; i < execution.args.size() - 1; ++i) {
if (strcmp(execution.args[i], "--exit-transition-timeout") == 0) {
exitTransitionTimeout = execution.args[i + 1];
}
if (strcmp(execution.args[i], "--data-processing-timeout") == 0) {
dataProcessingTimeout = execution.args[i + 1];
}
}
}
dumpOut << indLevel << indScheme << "exit_transition_timeout: " << exitTransitionTimeout << "\n";
dumpOut << indLevel << indScheme << "data_processing_timeout: " << dataProcessingTimeout << "\n";

if (bfs::path(execution.args[0]).filename().string() != execution.args[0]) {
LOG(warning) << "The workflow template generation was started with absolute or relative executables paths."
Expand Down
20 changes: 11 additions & 9 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
// declared in the workflow definition are allowed.
runner.AddHook<fair::mq::hooks::SetCustomCmdLineOptions>([&spec, driverConfig, defaultDriverClient](fair::mq::DeviceRunner& r) {
std::string defaultExitTransitionTimeout = "0";
std::string defaultDataProcessingTimeout = "0";
std::string defaultInfologgerMode = "";
o2::framework::DeploymentMode deploymentMode = o2::framework::DefaultsHelpers::deploymentMode();
if (deploymentMode == o2::framework::DeploymentMode::OnlineDDS) {
Expand All @@ -1047,15 +1048,16 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
boost::program_options::options_description optsDesc;
ConfigParamsHelper::populateBoostProgramOptions(optsDesc, spec.options, gHiddenDeviceOptions);
char const* defaultSignposts = getenv("DPL_SIGNPOSTS");
optsDesc.add_options()("monitoring-backend", bpo::value<std::string>()->default_value("default"), "monitoring backend info") //
("driver-client-backend", bpo::value<std::string>()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") //
("infologger-severity", bpo::value<std::string>()->default_value(""), "minimum FairLogger severity to send to InfoLogger") //
("dpl-tracing-flags", bpo::value<std::string>()->default_value(""), "pipe `|` separate list of events to be traced") //
("signposts", bpo::value<std::string>()->default_value(defaultSignposts ? defaultSignposts : ""), "comma separated list of signposts to enable") //
("expected-region-callbacks", bpo::value<std::string>()->default_value("0"), "how many region callbacks we are expecting") //
("exit-transition-timeout", bpo::value<std::string>()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") //
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") //
("configuration,cfg", bpo::value<std::string>()->default_value("command-line"), "configuration backend") //
optsDesc.add_options()("monitoring-backend", bpo::value<std::string>()->default_value("default"), "monitoring backend info") //
("driver-client-backend", bpo::value<std::string>()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") //
("infologger-severity", bpo::value<std::string>()->default_value(""), "minimum FairLogger severity to send to InfoLogger") //
("dpl-tracing-flags", bpo::value<std::string>()->default_value(""), "pipe `|` separate list of events to be traced") //
("signposts", bpo::value<std::string>()->default_value(defaultSignposts ? defaultSignposts : ""), "comma separated list of signposts to enable") //
("expected-region-callbacks", bpo::value<std::string>()->default_value("0"), "how many region callbacks we are expecting") //
("exit-transition-timeout", bpo::value<std::string>()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") //
("data-processing-timeout", bpo::value<std::string>()->default_value(defaultDataProcessingTimeout), "how many second to wait before stopping data processing and allowing data calibration") //
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") //
("configuration,cfg", bpo::value<std::string>()->default_value("command-line"), "configuration backend") //
("infologger-mode", bpo::value<std::string>()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override");
r.fConfig.AddToCmdLineOptions(optsDesc, true);
});
Expand Down
14 changes: 12 additions & 2 deletions Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
#include "../src/DeviceSpecHelpers.h"
#include "../src/SimpleResourceManager.h"
#include "../src/ComputingResourceHelpers.h"
#include "Framework/DataAllocator.h"
#include "Framework/DeviceControl.h"
#include "Framework/DeviceSpec.h"
#include "Framework/ProcessingContext.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/DriverConfig.h"
#include "Framework/O2ControlParameters.h"
Expand Down Expand Up @@ -141,6 +139,7 @@ const std::vector expectedTasks{
log_task_stdout: none
log_task_stderr: none
exit_transition_timeout: 15
data_processing_timeout: 10
_module_cmdline: >-
source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin &&
{{ dpl_command }} | bcsadc/foo
Expand Down Expand Up @@ -173,6 +172,8 @@ const std::vector expectedTasks{
- "-b"
- "--exit-transition-timeout"
- "'{{ exit_transition_timeout }}'"
- "--data-processing-timeout"
- "'{{ data_processing_timeout }}'"
- "--monitoring-backend"
- "'{{ monitoring_dpl_url }}'"
- "--session"
Expand Down Expand Up @@ -236,6 +237,7 @@ const std::vector expectedTasks{
log_task_stdout: none
log_task_stderr: none
exit_transition_timeout: 15
data_processing_timeout: 10
_module_cmdline: >-
source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin &&
{{ dpl_command }} | foo
Expand Down Expand Up @@ -270,6 +272,8 @@ const std::vector expectedTasks{
- "-b"
- "--exit-transition-timeout"
- "'{{ exit_transition_timeout }}'"
- "--data-processing-timeout"
- "'{{ data_processing_timeout }}'"
- "--monitoring-backend"
- "'{{ monitoring_dpl_url }}'"
- "--session"
Expand Down Expand Up @@ -333,6 +337,7 @@ const std::vector expectedTasks{
log_task_stdout: none
log_task_stderr: none
exit_transition_timeout: 15
data_processing_timeout: 10
_module_cmdline: >-
source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin &&
{{ dpl_command }} | foo
Expand Down Expand Up @@ -367,6 +372,8 @@ const std::vector expectedTasks{
- "-b"
- "--exit-transition-timeout"
- "'{{ exit_transition_timeout }}'"
- "--data-processing-timeout"
- "'{{ data_processing_timeout }}'"
- "--monitoring-backend"
- "'{{ monitoring_dpl_url }}'"
- "--session"
Expand Down Expand Up @@ -430,6 +437,7 @@ const std::vector expectedTasks{
log_task_stdout: none
log_task_stderr: none
exit_transition_timeout: 15
data_processing_timeout: 10
_module_cmdline: >-
source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin &&
{{ dpl_command }} | foo
Expand Down Expand Up @@ -461,6 +469,8 @@ const std::vector expectedTasks{
- "-b"
- "--exit-transition-timeout"
- "'{{ exit_transition_timeout }}'"
- "--data-processing-timeout"
- "'{{ data_processing_timeout }}'"
- "--monitoring-backend"
- "'{{ monitoring_dpl_url }}'"
- "--session"
Expand Down

0 comments on commit e76d45b

Please sign in to comment.