diff --git a/Framework/Core/src/DataProcessingContext.cxx b/Framework/Core/src/DataProcessingContext.cxx index 86409bf5434bb..394f50064bb37 100644 --- a/Framework/Core/src/DataProcessingContext.cxx +++ b/Framework/Core/src/DataProcessingContext.cxx @@ -11,9 +11,13 @@ #include "Framework/DataProcessingContext.h" #include "Framework/DataProcessorSpec.h" +#include "Framework/EndOfStreamContext.h" +#include "Framework/TimingInfo.h" #include "Framework/Signpost.h" O2_DECLARE_DYNAMIC_LOG(data_processor_context); +O2_DECLARE_DYNAMIC_LOG(calibration); + namespace o2::framework { diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index 57010c27ffb09..7c45cd7e7a707 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -92,6 +92,8 @@ struct formatter : ostream_format O2_DECLARE_DYNAMIC_LOG(device); // Special log to keep track of the lifetime of the parts O2_DECLARE_DYNAMIC_LOG(parts); +// Stream which keeps track of the calibration lifetime logic +O2_DECLARE_DYNAMIC_LOG(calibration); // Special log to track the async queue behavior O2_DECLARE_DYNAMIC_LOG(async_queue); // Special log to track the forwarding requests @@ -131,11 +133,18 @@ bool hasOnlyGenerated(DeviceSpec const& spec) void on_transition_requested_expired(uv_timer_t* handle) { - auto* state = (DeviceState*)handle->data; - state->loopReason |= DeviceState::TIMER_EXPIRED; + auto* ref = (ServiceRegistryRef*)handle->data; + auto& state = ref->get(); + state.loopReason |= DeviceState::TIMER_EXPIRED; + // Check if this is a source device O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle); - O2_SIGNPOST_EVENT_EMIT_WARN(device, cid, "callback", "Exit transition timer expired. Exiting."); - state->transitionHandling = TransitionHandlingState::Expired; + auto& spec = ref->get(); + if (hasOnlyGenerated(spec)) { + O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for source expired. Exiting."); + } else { + O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "Grace period for data / calibration expired. Exiting."); + } + state.transitionHandling = TransitionHandlingState::Expired; } void on_communication_requested(uv_async_t* s) @@ -928,7 +937,7 @@ void DataProcessingDevice::startPollers() } deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t)); - deviceContext.gracePeriodTimer->data = &state; + deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry); uv_timer_init(state.loop, deviceContext.gracePeriodTimer); } @@ -958,6 +967,7 @@ void DataProcessingDevice::stopPollers() } uv_timer_stop(deviceContext.gracePeriodTimer); + delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data; free(deviceContext.gracePeriodTimer); deviceContext.gracePeriodTimer = nullptr; } @@ -1306,17 +1316,18 @@ void DataProcessingDevice::Run() if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) { state.transitionHandling = TransitionHandlingState::Requested; auto& deviceContext = ref.get(); - auto timeout = deviceContext.exitTransitionTimeout; // Check if we only have timers auto& spec = ref.get(); if (hasOnlyTimers(spec)) { state.streaming = StreamingState::EndOfStreaming; } - if (timeout != 0 && state.streaming != StreamingState::Idle) { + + if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) { state.transitionHandling = TransitionHandlingState::Requested; ref.get().call(ServiceRegistryRef{ref}); uv_update_time(state.loop); - uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, timeout * 1000, 0); + O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", deviceContext.exitTransitionTimeout); + uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0); if (mProcessingPolicies.termination == TerminationPolicy::QUIT) { O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout); } else { @@ -1331,7 +1342,7 @@ void DataProcessingDevice::Run() } else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) { O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy"); } else { - O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "runb_loop", "New state pending and we are already idle, switching to READY immediately."); + O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately."); } } } @@ -1721,6 +1732,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) // We should keep the data generated at end of stream only for those // which are not sources. timingInfo.keepAtEndOfStream = shouldProcess; + O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream); EndOfStreamContext eosContext{*context.registry, ref.get()}; @@ -2348,7 +2360,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v *context.registry}; ProcessingContext processContext{record, ref, ref.get()}; { - O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Invoking preProcessingCallbacks"); // Notice this should be thread safe and reentrant // as it is called from many threads. streamContext.preProcessingCallbacks(processContext);