Skip to content

Commit

Permalink
DPL: improve exitTransitionTimeout handling (#13331)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf authored Jul 26, 2024
1 parent 5ba7ba4 commit 8d21174
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
4 changes: 4 additions & 0 deletions Framework/Core/src/DataProcessingContext.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@

#include "Framework/DataProcessingContext.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/EndOfStreamContext.h"
#include "Framework/TimingInfo.h"
#include "Framework/Signpost.h"

O2_DECLARE_DYNAMIC_LOG(data_processor_context);
O2_DECLARE_DYNAMIC_LOG(calibration);

namespace o2::framework
{

Expand Down
31 changes: 21 additions & 10 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ struct formatter<o2::framework::CompletionPolicy::CompletionOp> : ostream_format
O2_DECLARE_DYNAMIC_LOG(device);
// Special log to keep track of the lifetime of the parts
O2_DECLARE_DYNAMIC_LOG(parts);
// Stream which keeps track of the calibration lifetime logic
O2_DECLARE_DYNAMIC_LOG(calibration);
// Special log to track the async queue behavior
O2_DECLARE_DYNAMIC_LOG(async_queue);
// Special log to track the forwarding requests
Expand Down Expand Up @@ -131,11 +133,18 @@ bool hasOnlyGenerated(DeviceSpec const& spec)

/// Timer callback for the exit-transition grace period.
///
/// It is the callback passed to uv_timer_start() for
/// deviceContext.gracePeriodTimer in DataProcessingDevice::Run(). When invoked
/// it adds DeviceState::TIMER_EXPIRED to the loop reason, emits signposts
/// about the expiration and sets the transition handling state to
/// TransitionHandlingState::Expired.
///
/// @param handle the timer which fired; its `data` field carries the context
///               read by this callback.
///
/// NOTE(review): this listing appears to be a commit diff shown without +/-
/// markers, so both the old and the new variant of some statements are
/// present. Only one variant of each pair can be compiled; confirm against
/// the actual source file before relying on the text below.
void on_transition_requested_expired(uv_timer_t* handle)
{
// Variant reading handle->data as a DeviceState (presumably the pre-change
// lines of the diff -- TODO confirm).
auto* state = (DeviceState*)handle->data;
state->loopReason |= DeviceState::TIMER_EXPIRED;
// Variant reading handle->data as a ServiceRegistryRef, from which the
// DeviceState is looked up (presumably the post-change lines -- TODO confirm).
auto* ref = (ServiceRegistryRef*)handle->data;
auto& state = ref->get<DeviceState>();
state.loopReason |= DeviceState::TIMER_EXPIRED;
// Check if this is a source device (the actual test is the
// hasOnlyGenerated(spec) call further down).
// The signpost id is derived from the timer handle pointer.
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
O2_SIGNPOST_EVENT_EMIT_WARN(device, cid, "callback", "Exit transition timer expired. Exiting.");
// DeviceState-pointer variant of the final state update (see note above).
state->transitionHandling = TransitionHandlingState::Expired;
auto& spec = ref->get<DeviceSpec const>();
// The expiration is reported on the `calibration` log with the INFO macro
// when hasOnlyGenerated(spec) is true and with the ERROR macro otherwise.
if (hasOnlyGenerated(spec)) {
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for source expired. Exiting.");
} else {
O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "Grace period for data / calibration expired. Exiting.");
}
// Reference variant of the final state update (see note above).
state.transitionHandling = TransitionHandlingState::Expired;
}

void on_communication_requested(uv_async_t* s)
Expand Down Expand Up @@ -928,7 +937,7 @@ void DataProcessingDevice::startPollers()
}

deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
deviceContext.gracePeriodTimer->data = &state;
deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
}

Expand Down Expand Up @@ -958,6 +967,7 @@ void DataProcessingDevice::stopPollers()
}

uv_timer_stop(deviceContext.gracePeriodTimer);
delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
free(deviceContext.gracePeriodTimer);
deviceContext.gracePeriodTimer = nullptr;
}
Expand Down Expand Up @@ -1306,17 +1316,18 @@ void DataProcessingDevice::Run()
if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
state.transitionHandling = TransitionHandlingState::Requested;
auto& deviceContext = ref.get<DeviceContext>();
auto timeout = deviceContext.exitTransitionTimeout;
// Check if we only have timers
auto& spec = ref.get<DeviceSpec const>();
if (hasOnlyTimers(spec)) {
state.streaming = StreamingState::EndOfStreaming;
}
if (timeout != 0 && state.streaming != StreamingState::Idle) {

if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
state.transitionHandling = TransitionHandlingState::Requested;
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
uv_update_time(state.loop);
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, timeout * 1000, 0);
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", deviceContext.exitTransitionTimeout);
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout);
} else {
Expand All @@ -1331,7 +1342,7 @@ void DataProcessingDevice::Run()
} else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
} else {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "runb_loop", "New state pending and we are already idle, switching to READY immediately.");
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
}
}
}
Expand Down Expand Up @@ -1721,6 +1732,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
// We should keep the data generated at end of stream only for those
// which are not sources.
timingInfo.keepAtEndOfStream = shouldProcess;
O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);

EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};

Expand Down Expand Up @@ -2348,7 +2360,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
*context.registry};
ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
{
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Invoking preProcessingCallbacks");
// Notice this should be thread safe and reentrant
// as it is called from many threads.
streamContext.preProcessingCallbacks(processContext);
Expand Down

0 comments on commit 8d21174

Please sign in to comment.