diff --git a/Framework/Core/include/Framework/DeviceContext.h b/Framework/Core/include/Framework/DeviceContext.h index 3777e7f608b75..f0c3021d86dc8 100644 --- a/Framework/Core/include/Framework/DeviceContext.h +++ b/Framework/Core/include/Framework/DeviceContext.h @@ -28,8 +28,18 @@ struct ComputingQuotaStats; struct DeviceContext { ComputingQuotaStats* quotaStats = nullptr; uv_timer_t* gracePeriodTimer = nullptr; + uv_timer_t* dataProcessingGracePeriodTimer = nullptr; uv_signal_t* sigusr1Handle = nullptr; int expectedRegionCallbacks = 0; + // The timeout for the data processing to stop on this device. + // After this is reached, incoming data not marked to be kept will + // be dropped and the data processing will be stopped. However the + // calibrations will still be done and objects resulting from calibrations + // will be marked to be kept. + int dataProcessingTimeout = 0; + // The timeout for the whole processing to stop on this device. + // This includes the grace period for processing and the time + // for the calibrations to be done. int exitTransitionTimeout = 0; }; diff --git a/Framework/Core/include/Framework/DeviceStateEnums.h b/Framework/Core/include/Framework/DeviceStateEnums.h index 291faac0ac982..a4c02c70c2bf6 100644 --- a/Framework/Core/include/Framework/DeviceStateEnums.h +++ b/Framework/Core/include/Framework/DeviceStateEnums.h @@ -30,6 +30,8 @@ enum struct TransitionHandlingState { NoTransition, /// A transition was notified to be requested Requested, + /// Only calibrations can be done + DataProcessingExpired, /// A transition needs to be fullfilled ASAP Expired }; diff --git a/Framework/Core/src/DataProcessingContext.cxx b/Framework/Core/src/DataProcessingContext.cxx index 86409bf5434bb..394f50064bb37 100644 --- a/Framework/Core/src/DataProcessingContext.cxx +++ b/Framework/Core/src/DataProcessingContext.cxx @@ -11,9 +11,13 @@ #include "Framework/DataProcessingContext.h" #include "Framework/DataProcessorSpec.h" +#include "Framework/EndOfStreamContext.h" +#include "Framework/TimingInfo.h" #include "Framework/Signpost.h" O2_DECLARE_DYNAMIC_LOG(data_processor_context); +O2_DECLARE_DYNAMIC_LOG(calibration); + namespace o2::framework { diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index e0a0d1aad16e2..4180ba9ff4965 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -92,6 +92,8 @@ struct formatter : ostream_format O2_DECLARE_DYNAMIC_LOG(device); // Special log to keep track of the lifetime of the parts O2_DECLARE_DYNAMIC_LOG(parts); +// Stream which keeps track of the calibration lifetime logic +O2_DECLARE_DYNAMIC_LOG(calibration); // Special log to track the async queue behavior O2_DECLARE_DYNAMIC_LOG(async_queue); // Special log to track the forwarding requests @@ -131,11 +133,28 @@ bool hasOnlyGenerated(DeviceSpec const& spec) void on_transition_requested_expired(uv_timer_t* handle) { - auto* state = (DeviceState*)handle->data; - state->loopReason |= DeviceState::TIMER_EXPIRED; + auto* ref = (ServiceRegistryRef*)handle->data; + auto& state = ref->get(); + state.loopReason |= DeviceState::TIMER_EXPIRED; + // Check if this is a source device + O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle); + auto& spec = ref->get(); + if (hasOnlyGenerated(spec)) { + O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for source expired. Exiting."); + } else { + O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "Grace period for calibration expired. Exiting."); + } + state.transitionHandling = TransitionHandlingState::Expired; +} + +void on_data_processing_grace_expired(uv_timer_t* handle) +{ + auto* ref = (ServiceRegistryRef*)handle->data; + auto& state = ref->get(); + state.loopReason |= DeviceState::TIMER_EXPIRED; O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle); - O2_SIGNPOST_EVENT_EMIT_WARN(device, cid, "callback", "Exit transition timer expired. Exiting."); - state->transitionHandling = TransitionHandlingState::Expired; + O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for Data Processing expired. Waiting for calibration"); + state.transitionHandling = TransitionHandlingState::DataProcessingExpired; } void on_communication_requested(uv_async_t* s) @@ -928,8 +947,13 @@ void DataProcessingDevice::startPollers() } deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t)); - deviceContext.gracePeriodTimer->data = &state; + deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry); uv_timer_init(state.loop, deviceContext.gracePeriodTimer); + + // Prepare the timers if needed + deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t)); + deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry); + uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer); } void DataProcessingDevice::stopPollers() @@ -960,6 +984,10 @@ void DataProcessingDevice::stopPollers() uv_timer_stop(deviceContext.gracePeriodTimer); free(deviceContext.gracePeriodTimer); deviceContext.gracePeriodTimer = nullptr; + + uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer); + free(deviceContext.dataProcessingGracePeriodTimer); + deviceContext.dataProcessingGracePeriodTimer = nullptr; } void DataProcessingDevice::InitTask() @@ -994,6 +1022,7 @@ void DataProcessingDevice::InitTask() } deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue("expected-region-callbacks")); + deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue("data-processing-timeout")); deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue("exit-transition-timeout")); for (auto& channel : GetChannels()) { @@ -1306,17 +1335,28 @@ void DataProcessingDevice::Run() if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) { state.transitionHandling = TransitionHandlingState::Requested; auto& deviceContext = ref.get(); - auto timeout = deviceContext.exitTransitionTimeout; // Check if we only have timers auto& spec = ref.get(); if (hasOnlyTimers(spec)) { state.streaming = StreamingState::EndOfStreaming; } - if (timeout != 0 && state.streaming != StreamingState::Idle) { + + if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) { state.transitionHandling = TransitionHandlingState::Requested; ref.get().call(ServiceRegistryRef{ref}); uv_update_time(state.loop); - uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, timeout * 1000, 0); + O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", deviceContext.exitTransitionTimeout); + uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0); + // In case we have a calibration grace period it will always be longer than the data processing timeout + if (hasOnlyGenerated(spec)) { + O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Source devices should never start calibration timer."); + } else if (deviceContext.dataProcessingTimeout != deviceContext.exitTransitionTimeout) { + O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout); + uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_grace_expired, deviceContext.dataProcessingTimeout * 1000, 0); + } else { + O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "No special timer requested for calibration."); + } + if (mProcessingPolicies.termination == TerminationPolicy::QUIT) { O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout); } else { @@ -1331,12 +1371,12 @@ void DataProcessingDevice::Run() } else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) { O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy"); } else { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "runb_loop", "New state pending and we are already idle, switching to READY immediately."); + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately."); } } } - // If we are Idle, we can then consider the transition to be expired. - if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) { + // If we are Idle, we can then consider the transition to be expired when it was requested or when the data processing timeout expired. + if ((state.transitionHandling == TransitionHandlingState::Requested || state.transitionHandling == TransitionHandlingState::DataProcessingExpired) && state.streaming == StreamingState::Idle) { O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed."); state.transitionHandling = TransitionHandlingState::Expired; } @@ -1702,8 +1742,13 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) *context.wasActive = true; } + auto& deviceContext = ref.get(); + if (state.streaming == StreamingState::EndOfStreaming) { - O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues."); + O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Flushing queues."); + O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "state", "EndOfStreaming reached. %{public}s %{public}s", + uv_timer_get_due_in(deviceContext.gracePeriodTimer) ? fmt::format("grace timer still running for {}", uv_timer_get_due_in(deviceContext.gracePeriodTimer)).c_str() : "Grace timer expired", + uv_timer_get_due_in(deviceContext.dataProcessingGracePeriodTimer) ? fmt::format("data processing timer still running for {}", uv_timer_get_due_in(deviceContext.dataProcessingGracePeriodTimer)).c_str() : "DataProcessingTimer timer expired"); // We keep processing data until we are Idle. // FIXME: not sure this is the correct way to drain the queues, but // I guess we will see. @@ -1721,6 +1766,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) // We should keep the data generated at end of stream only for those // which are not sources. timingInfo.keepAtEndOfStream = shouldProcess; + O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream); EndOfStreamContext eosContext{*context.registry, ref.get()}; @@ -2348,7 +2394,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v *context.registry}; ProcessingContext processContext{record, ref, ref.get()}; { - O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Invoking preProcessingCallbacks"); // Notice this should be thread safe and reentrant // as it is called from many threads. streamContext.preProcessingCallbacks(processContext); diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index b9552dc093115..5218de0828e89 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1515,6 +1515,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, realOdesc.add_options()("child-driver", bpo::value()); realOdesc.add_options()("rate", bpo::value()); realOdesc.add_options()("exit-transition-timeout", bpo::value()); + realOdesc.add_options()("data-processing-timeout", bpo::value()); realOdesc.add_options()("expected-region-callbacks", bpo::value()); realOdesc.add_options()("timeframes-rate-limit", bpo::value()); realOdesc.add_options()("environment", bpo::value()); @@ -1701,6 +1702,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("control-port", bpo::value(), "Utility port to be used by O2 Control") // ("rate", bpo::value(), "rate for a data source device (Hz)") // ("exit-transition-timeout", bpo::value(), "timeout before switching to READY state") // + ("data-processing-timeout", bpo::value(), "timeout before switching to calibration processing mode") // ("expected-region-callbacks", bpo::value(), "region callbacks to expect before starting") // ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframes can be in fly") // ("shm-monitor", bpo::value(), "whether to use the shared memory monitor") // diff --git a/Framework/Core/src/O2ControlHelpers.cxx b/Framework/Core/src/O2ControlHelpers.cxx index ad539dc4af03f..a75799a4d9712 100644 --- a/Framework/Core/src/O2ControlHelpers.cxx +++ b/Framework/Core/src/O2ControlHelpers.cxx @@ -262,6 +262,8 @@ void dumpCommand(std::ostream& dumpOut, const DeviceExecution& execution, std::s dumpOut << indLevel << indScheme << "- \"-b\"\n"; dumpOut << indLevel << indScheme << "- \"--exit-transition-timeout\"\n"; dumpOut << indLevel << indScheme << "- \"'{{ exit_transition_timeout }}'\"\n"; + dumpOut << indLevel << indScheme << "- \"--data-processing-timeout\"\n"; + dumpOut << indLevel << indScheme << "- \"'{{ data_processing_timeout }}'\"\n"; dumpOut << indLevel << indScheme << "- \"--monitoring-backend\"\n"; dumpOut << indLevel << indScheme << "- \"'{{ monitoring_dpl_url }}'\"\n"; dumpOut << indLevel << indScheme << "- \"--session\"\n"; diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 89737483fc813..a89abfba33204 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1037,6 +1037,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, // declared in the workflow definition are allowed. runner.AddHook([&spec, driverConfig, defaultDriverClient](fair::mq::DeviceRunner& r) { std::string defaultExitTransitionTimeout = "0"; + std::string defaultDataProcessingTimeout = "0"; std::string defaultInfologgerMode = ""; o2::framework::DeploymentMode deploymentMode = o2::framework::DefaultsHelpers::deploymentMode(); if (deploymentMode == o2::framework::DeploymentMode::OnlineDDS) { @@ -1055,6 +1056,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, ("signposts", bpo::value()->default_value(defaultSignposts ? defaultSignposts : ""), "comma separated list of signposts to enable") // ("expected-region-callbacks", bpo::value()->default_value("0"), "how many region callbacks we are expecting") // ("exit-transition-timeout", bpo::value()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") // + ("data-processing-timeout", bpo::value()->default_value(defaultDataProcessingTimeout), "how many second to wait before switching from RUN to CALIBRATION") // ("timeframes-rate-limit", bpo::value()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") // ("configuration,cfg", bpo::value()->default_value("command-line"), "configuration backend") // ("infologger-mode", bpo::value()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override"); diff --git a/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx b/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx index 4cda5fd76336a..a05ce5f4c3d1a 100644 --- a/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx +++ b/Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx @@ -173,6 +173,8 @@ const std::vector expectedTasks{ - "-b" - "--exit-transition-timeout" - "'{{ exit_transition_timeout }}'" + - "--data-processing-timeout" + - "'{{ data_processing_timeout }}'" - "--monitoring-backend" - "'{{ monitoring_dpl_url }}'" - "--session" @@ -270,6 +272,8 @@ const std::vector expectedTasks{ - "-b" - "--exit-transition-timeout" - "'{{ exit_transition_timeout }}'" + - "--data-processing-timeout" + - "'{{ data_processing_timeout }}'" - "--monitoring-backend" - "'{{ monitoring_dpl_url }}'" - "--session" @@ -367,6 +371,8 @@ const std::vector expectedTasks{ - "-b" - "--exit-transition-timeout" - "'{{ exit_transition_timeout }}'" + - "--data-processing-timeout" + - "'{{ data_processing_timeout }}'" - "--monitoring-backend" - "'{{ monitoring_dpl_url }}'" - "--session" @@ -461,6 +467,8 @@ const std::vector expectedTasks{ - "-b" - "--exit-transition-timeout" - "'{{ exit_transition_timeout }}'" + - "--data-processing-timeout" + - "'{{ data_processing_timeout }}'" - "--monitoring-backend" - "'{{ monitoring_dpl_url }}'" - "--session" diff --git a/Framework/TestWorkflows/src/o2DummyCalibrationWorkflow.cxx b/Framework/TestWorkflows/src/o2DummyCalibrationWorkflow.cxx index 94916cfa36984..0ece9da5aa7d0 100644 --- a/Framework/TestWorkflows/src/o2DummyCalibrationWorkflow.cxx +++ b/Framework/TestWorkflows/src/o2DummyCalibrationWorkflow.cxx @@ -37,8 +37,9 @@ WorkflowSpec defineDataProcessing(ConfigContext const& specs) static int counter = 0; auto& aData = outputs.make(OutputRef{"counter"}); aData = counter++; - if (counter == 10) { - pcx.services().get().endOfStream(); + sleep(1); + if (counter == 5) { + pcx.services().get().readyToQuit(QuitRequest::All); } })}, }; @@ -57,6 +58,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& specs) callbacks.set(eosCallback); return adaptStateless([](Input<"x", int> const& x) { + sleep(1); sum += x; std::cout << "Sum: " << sum << std::endl; }); })};