diff --git a/Framework/Core/include/Framework/DeviceContext.h b/Framework/Core/include/Framework/DeviceContext.h index 3777e7f608b75..4593e5e819ccf 100644 --- a/Framework/Core/include/Framework/DeviceContext.h +++ b/Framework/Core/include/Framework/DeviceContext.h @@ -28,9 +28,11 @@ struct ComputingQuotaStats; struct DeviceContext { ComputingQuotaStats* quotaStats = nullptr; uv_timer_t* gracePeriodTimer = nullptr; + uv_timer_t* dataProcessingGracePeriodTimer = nullptr; uv_signal_t* sigusr1Handle = nullptr; int expectedRegionCallbacks = 0; int exitTransitionTimeout = 0; + int dataProcessingTimeout = 0; }; } // namespace o2::framework diff --git a/Framework/Core/include/Framework/DeviceState.h b/Framework/Core/include/Framework/DeviceState.h index f6d863064ee66..5644d85a904d4 100644 --- a/Framework/Core/include/Framework/DeviceState.h +++ b/Framework/Core/include/Framework/DeviceState.h @@ -68,8 +68,18 @@ struct DeviceState { STREAM_CONTEXT_LOG = 1 << 4, // Log for the StreamContext callbacks }; + enum ProcessingType : int { + Any, // Any kind of processing is allowed + CalibrationOnly, // Only calibrations are allowed to be processed / produced + }; + std::vector inputChannelInfos; StreamingState streaming = StreamingState::Streaming; + // What kind of processing is allowed. By default we allow any. + // If we are past the data processing timeout, this will be + // CalibrationOnly. We need to reset it at every start. + ProcessingType allowedProcessing = ProcessingType::Any; + bool quitRequested = false; std::atomic cleanupCount = -1; diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 169613b18a2ee..fe594bdeb7ed1 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -142,11 +142,28 @@ void on_transition_requested_expired(uv_timer_t* handle) if (hasOnlyGenerated(spec)) { O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for source expired. Exiting."); } else { - O2_SIGNPOST_EVENT_EMIT_WARN(calibration, cid, "callback", "Grace period for data / calibration expired. Exiting."); + O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for %{public}s expired. Exiting.", + state.allowedProcessing == DeviceState::CalibrationOnly ? "calibration" : "data & calibration"); } state.transitionHandling = TransitionHandlingState::Expired; } +void on_data_processing_expired(uv_timer_t* handle) +{ + auto* ref = (ServiceRegistryRef*)handle->data; + auto& state = ref->get(); + state.loopReason |= DeviceState::TIMER_EXPIRED; + + // Check if this is a source device + O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle); + + // Source devices should never end up in this callback, since the exitTransitionTimeout should + // be reset to the dataProcessingTimeout and the timers cohalesced. + assert(hasOnlyGenerated(ref->get()) == false); + O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for data processing expired. Only calibrations from this point onwards."); + state.allowedProcessing = DeviceState::CalibrationOnly; +} + void on_communication_requested(uv_async_t* s) { auto* state = (DeviceState*)s->data; @@ -949,6 +966,10 @@ void DataProcessingDevice::startPollers() deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t)); deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry); uv_timer_init(state.loop, deviceContext.gracePeriodTimer); + + deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t)); + deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry); + uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer); } void DataProcessingDevice::stopPollers() @@ -980,6 +1001,11 @@ void DataProcessingDevice::stopPollers() delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data; free(deviceContext.gracePeriodTimer); deviceContext.gracePeriodTimer = nullptr; + + uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer); + delete (ServiceRegistryRef*)deviceContext.dataProcessingGracePeriodTimer->data; + free(deviceContext.dataProcessingGracePeriodTimer); + deviceContext.dataProcessingGracePeriodTimer = nullptr; } void DataProcessingDevice::InitTask() @@ -1015,6 +1041,7 @@ void DataProcessingDevice::InitTask() deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue("expected-region-callbacks")); deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue("exit-transition-timeout")); + deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue("data-processing-timeout")); for (auto& channel : GetChannels()) { channel.second.at(0).Transport()->SubscribeToRegionEvents([&context = deviceContext, @@ -1209,6 +1236,7 @@ void DataProcessingDevice::PreRun() O2_SIGNPOST_START(device, cid, "PreRun", "Entering PreRun callback."); state.quitRequested = false; state.streaming = StreamingState::Streaming; + state.allowedProcessing = DeviceState::Any; for (auto& info : state.inputChannelInfos) { if (info.state != InputChannelState::Pull) { info.state = InputChannelState::Running; @@ -1339,6 +1367,19 @@ void DataProcessingDevice::Run() state.streaming = StreamingState::EndOfStreaming; } + // If this is a source device, dataTransitionTimeout and dataProcessingTimeout are effectively + // the same (because source devices are not allowed to produce any calibration). + // should be the same. + if (hasOnlyGenerated(spec) && deviceContext.dataProcessingTimeout > 0) { + deviceContext.exitTransitionTimeout = deviceContext.dataProcessingTimeout; + } + + // We do not do anything in particular if the data processing timeout would go past the exitTransitionTimeout + if (deviceContext.dataProcessingTimeout > 0 && deviceContext.dataProcessingTimeout < deviceContext.exitTransitionTimeout) { + uv_update_time(state.loop); + O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout); + uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_expired, deviceContext.dataProcessingTimeout * 1000, 0); + } if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) { state.transitionHandling = TransitionHandlingState::Requested; ref.get().call(ServiceRegistryRef{ref}); diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index 8076242bd274e..d1fad2bb66f8b 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1537,6 +1537,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, realOdesc.add_options()("child-driver", bpo::value()); realOdesc.add_options()("rate", bpo::value()); realOdesc.add_options()("exit-transition-timeout", bpo::value()); + realOdesc.add_options()("data-processing-timeout", bpo::value()); realOdesc.add_options()("expected-region-callbacks", bpo::value()); realOdesc.add_options()("timeframes-rate-limit", bpo::value()); realOdesc.add_options()("environment", bpo::value()); @@ -1723,6 +1724,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("control-port", bpo::value(), "Utility port to be used by O2 Control") // ("rate", bpo::value(), "rate for a data source device (Hz)") // ("exit-transition-timeout", bpo::value(), "timeout before switching to READY state") // + ("data-processing-timeout", bpo::value(), "timeout after which only calibration can happen") // ("expected-region-callbacks", bpo::value(), "region callbacks to expect before starting") // ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframes can be in fly") // ("shm-monitor", bpo::value(), "whether to use the shared memory monitor") // diff --git a/Framework/Core/src/O2ControlHelpers.cxx b/Framework/Core/src/O2ControlHelpers.cxx index ad539dc4af03f..46b1eb12ea26a 100644 --- a/Framework/Core/src/O2ControlHelpers.cxx +++ b/Framework/Core/src/O2ControlHelpers.cxx @@ -262,6 +262,8 @@ void dumpCommand(std::ostream& dumpOut, const DeviceExecution& execution, std::s dumpOut << indLevel << indScheme << "- \"-b\"\n"; dumpOut << indLevel << indScheme << "- \"--exit-transition-timeout\"\n"; dumpOut << indLevel << indScheme << "- \"'{{ exit_transition_timeout }}'\"\n"; + dumpOut << indLevel << indScheme << "- \"--data-processing-timeout\"\n"; + dumpOut << indLevel << indScheme << "- \"'{{ data_processing_timeout }}'\"\n"; dumpOut << indLevel << indScheme << "- \"--monitoring-backend\"\n"; dumpOut << indLevel << indScheme << "- \"'{{ monitoring_dpl_url }}'\"\n"; dumpOut << indLevel << indScheme << "- \"--session\"\n"; @@ -393,15 +395,20 @@ void dumpTask(std::ostream& dumpOut, const DeviceSpec& spec, const DeviceExecuti dumpOut << indLevel << "defaults:\n"; dumpOut << indLevel << indScheme << "log_task_stdout: none\n"; dumpOut << indLevel << indScheme << "log_task_stderr: none\n"; - std::string exitTransitionTimeout = "15"; + std::string exitTransitionTimeout = "15"; // Allow 15 seconds to finish processing and calibrations + std::string dataProcessingTimeout = "10"; // Allow only ten seconds to finish processing if (execution.args.size() > 2) { for (size_t i = 0; i < execution.args.size() - 1; ++i) { if (strcmp(execution.args[i], "--exit-transition-timeout") == 0) { exitTransitionTimeout = execution.args[i + 1]; } + if (strcmp(execution.args[i], "--data-processing-timeout") == 0) { + dataProcessingTimeout = execution.args[i + 1]; + } } } dumpOut << indLevel << indScheme << "exit_transition_timeout: " << exitTransitionTimeout << "\n"; + dumpOut << indLevel << indScheme << "data_processing_timeout: " << dataProcessingTimeout << "\n"; if (bfs::path(execution.args[0]).filename().string() != execution.args[0]) { LOG(warning) << "The workflow template generation was started with absolute or relative executables paths." diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 7d298998d0563..0cf775a33de18 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1036,6 +1036,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, // declared in the workflow definition are allowed. runner.AddHook([&spec, driverConfig, defaultDriverClient](fair::mq::DeviceRunner& r) { std::string defaultExitTransitionTimeout = "0"; + std::string defaultDataProcessingTimeout = "0"; std::string defaultInfologgerMode = ""; o2::framework::DeploymentMode deploymentMode = o2::framework::DefaultsHelpers::deploymentMode(); if (deploymentMode == o2::framework::DeploymentMode::OnlineDDS) { @@ -1047,15 +1048,16 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, boost::program_options::options_description optsDesc; ConfigParamsHelper::populateBoostProgramOptions(optsDesc, spec.options, gHiddenDeviceOptions); char const* defaultSignposts = getenv("DPL_SIGNPOSTS"); - optsDesc.add_options()("monitoring-backend", bpo::value()->default_value("default"), "monitoring backend info") // - ("driver-client-backend", bpo::value()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") // - ("infologger-severity", bpo::value()->default_value(""), "minimum FairLogger severity to send to InfoLogger") // - ("dpl-tracing-flags", bpo::value()->default_value(""), "pipe `|` separate list of events to be traced") // - ("signposts", bpo::value()->default_value(defaultSignposts ? defaultSignposts : ""), "comma separated list of signposts to enable") // - ("expected-region-callbacks", bpo::value()->default_value("0"), "how many region callbacks we are expecting") // - ("exit-transition-timeout", bpo::value()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") // - ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") // - ("configuration,cfg", bpo::value()->default_value("command-line"), "configuration backend") // + optsDesc.add_options()("monitoring-backend", bpo::value()->default_value("default"), "monitoring backend info") // + ("driver-client-backend", bpo::value()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") // + ("infologger-severity", bpo::value()->default_value(""), "minimum FairLogger severity to send to InfoLogger") // + ("dpl-tracing-flags", bpo::value()->default_value(""), "pipe `|` separate list of events to be traced") // + ("signposts", bpo::value()->default_value(defaultSignposts ? defaultSignposts : ""), "comma separated list of signposts to enable") // + ("expected-region-callbacks", bpo::value()->default_value("0"), "how many region callbacks we are expecting") // + ("exit-transition-timeout", bpo::value()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") // + ("data-processing-timeout", bpo::value()->default_value(defaultDataProcessingTimeout), "how many second to wait before stopping data processing and allowing data calibration") // + ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") // + ("configuration,cfg", bpo::value()->default_value("command-line"), "configuration backend") // ("infologger-mode", bpo::value()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override"); r.fConfig.AddToCmdLineOptions(optsDesc, true); }); diff --git a/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx b/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx index 4cda5fd76336a..d5f402aa16caa 100644 --- a/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx +++ b/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx @@ -15,10 +15,8 @@ #include "../src/DeviceSpecHelpers.h" #include "../src/SimpleResourceManager.h" #include "../src/ComputingResourceHelpers.h" -#include "Framework/DataAllocator.h" #include "Framework/DeviceControl.h" #include "Framework/DeviceSpec.h" -#include "Framework/ProcessingContext.h" #include "Framework/WorkflowSpec.h" #include "Framework/DriverConfig.h" #include "Framework/O2ControlParameters.h" @@ -141,6 +139,7 @@ const std::vector expectedTasks{ log_task_stdout: none log_task_stderr: none exit_transition_timeout: 15 + data_processing_timeout: 10 _module_cmdline: >- source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin && {{ dpl_command }} | bcsadc/foo @@ -173,6 +172,8 @@ const std::vector expectedTasks{ - "-b" - "--exit-transition-timeout" - "'{{ exit_transition_timeout }}'" + - "--data-processing-timeout" + - "'{{ data_processing_timeout }}'" - "--monitoring-backend" - "'{{ monitoring_dpl_url }}'" - "--session" @@ -236,6 +237,7 @@ const std::vector expectedTasks{ log_task_stdout: none log_task_stderr: none exit_transition_timeout: 15 + data_processing_timeout: 10 _module_cmdline: >- source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin && {{ dpl_command }} | foo @@ -270,6 +272,8 @@ const std::vector expectedTasks{ - "-b" - "--exit-transition-timeout" - "'{{ exit_transition_timeout }}'" + - "--data-processing-timeout" + - "'{{ data_processing_timeout }}'" - "--monitoring-backend" - "'{{ monitoring_dpl_url }}'" - "--session" @@ -333,6 +337,7 @@ const std::vector expectedTasks{ log_task_stdout: none log_task_stderr: none exit_transition_timeout: 15 + data_processing_timeout: 10 _module_cmdline: >- source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin && {{ dpl_command }} | foo @@ -367,6 +372,8 @@ const std::vector expectedTasks{ - "-b" - "--exit-transition-timeout" - "'{{ exit_transition_timeout }}'" + - "--data-processing-timeout" + - "'{{ data_processing_timeout }}'" - "--monitoring-backend" - "'{{ monitoring_dpl_url }}'" - "--session" @@ -430,6 +437,7 @@ const std::vector expectedTasks{ log_task_stdout: none log_task_stderr: none exit_transition_timeout: 15 + data_processing_timeout: 10 _module_cmdline: >- source /etc/profile.d/modules.sh && MODULEPATH={{ modulepath }} module load O2 QualityControl Control-OCCPlugin && {{ dpl_command }} | foo @@ -461,6 +469,8 @@ const std::vector expectedTasks{ - "-b" - "--exit-transition-timeout" - "'{{ exit_transition_timeout }}'" + - "--data-processing-timeout" + - "'{{ data_processing_timeout }}'" - "--monitoring-backend" - "'{{ monitoring_dpl_url }}'" - "--session"