Skip to content

Commit

Permalink
DPL: improve exitTransitionTimeout handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Jul 25, 2024
1 parent cbf54d0 commit 9417227
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
4 changes: 4 additions & 0 deletions Framework/Core/src/DataProcessingContext.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@

#include "Framework/DataProcessingContext.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/EndOfStreamContext.h"
#include "Framework/TimingInfo.h"
#include "Framework/Signpost.h"

O2_DECLARE_DYNAMIC_LOG(data_processor_context);
O2_DECLARE_DYNAMIC_LOG(calibration);

namespace o2::framework
{

Expand Down
30 changes: 22 additions & 8 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ struct formatter<o2::framework::CompletionPolicy::CompletionOp> : ostream_format
O2_DECLARE_DYNAMIC_LOG(device);
// Special log to keep track of the lifetime of the parts
O2_DECLARE_DYNAMIC_LOG(parts);
// Stream which keeps track of the calibration lifetime logic
O2_DECLARE_DYNAMIC_LOG(calibration);
// Special log to track the async queue behavior
O2_DECLARE_DYNAMIC_LOG(async_queue);
// Special log to track the forwarding requests
Expand Down Expand Up @@ -131,8 +133,18 @@ bool hasOnlyGenerated(DeviceSpec const& spec)

void on_transition_requested_expired(uv_timer_t* handle)
{
auto* state = (DeviceState*)handle->data;
state->loopReason |= DeviceState::TIMER_EXPIRED;
auto* ref = (ServiceRegistryRef*)handle->data;
auto& state = ref->get<DeviceState>();
state.loopReason |= DeviceState::TIMER_EXPIRED;
// Check if this is a source device
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
auto& spec = ref->get<DeviceSpec const>();
if (hasOnlyGenerated(spec)) {
O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for source expired. Exiting.");
} else {
O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "Grace period for data / calibration expired. Exiting.");
}
state.transitionHandling = TransitionHandlingState::Expired;
O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
O2_SIGNPOST_EVENT_EMIT_WARN(device, cid, "callback", "Exit transition timer expired. Exiting.");
state->transitionHandling = TransitionHandlingState::Expired;
Expand Down Expand Up @@ -928,7 +940,7 @@ void DataProcessingDevice::startPollers()
}

deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
deviceContext.gracePeriodTimer->data = &state;
deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
uv_timer_init(state.loop, deviceContext.gracePeriodTimer);
}

Expand Down Expand Up @@ -958,6 +970,7 @@ void DataProcessingDevice::stopPollers()
}

uv_timer_stop(deviceContext.gracePeriodTimer);
delete (ServiceRegistryRef*)deviceContext.gracePeriodTimer->data;
free(deviceContext.gracePeriodTimer);
deviceContext.gracePeriodTimer = nullptr;
}
Expand Down Expand Up @@ -1306,17 +1319,18 @@ void DataProcessingDevice::Run()
if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
state.transitionHandling = TransitionHandlingState::Requested;
auto& deviceContext = ref.get<DeviceContext>();
auto timeout = deviceContext.exitTransitionTimeout;
// Check if we only have timers
auto& spec = ref.get<DeviceSpec const>();
if (hasOnlyTimers(spec)) {
state.streaming = StreamingState::EndOfStreaming;
}
if (timeout != 0 && state.streaming != StreamingState::Idle) {

if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
state.transitionHandling = TransitionHandlingState::Requested;
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
uv_update_time(state.loop);
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, timeout * 1000, 0);
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", deviceContext.exitTransitionTimeout);
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout);
} else {
Expand All @@ -1331,7 +1345,7 @@ void DataProcessingDevice::Run()
} else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
} else {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "runb_loop", "New state pending and we are already idle, switching to READY immediately.");
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
}
}
}
Expand Down Expand Up @@ -1721,6 +1735,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
// We should keep the data generated at end of stream only for those
// which are not sources.
timingInfo.keepAtEndOfStream = shouldProcess;
O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);

EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};

Expand Down Expand Up @@ -2348,7 +2363,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
*context.registry};
ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
{
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Invoking preProcessingCallbacks");
// Notice this should be thread safe and reentrant
// as it is called from many threads.
streamContext.preProcessingCallbacks(processContext);
Expand Down

0 comments on commit 9417227

Please sign in to comment.