Skip to content

Commit

Permalink
DPL: improved calibration mode
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Jul 25, 2024
1 parent cbf54d0 commit 742a0e7
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 15 deletions.
10 changes: 10 additions & 0 deletions Framework/Core/include/Framework/DeviceContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,18 @@ struct ComputingQuotaStats;
/// Per-device bookkeeping for the exit transition: the libuv handles used
/// while winding a device down and the timeouts which drive them.
struct DeviceContext {
  ComputingQuotaStats* quotaStats{nullptr};
  /// Timer armed with `exitTransitionTimeout` once a state transition is requested.
  uv_timer_t* gracePeriodTimer{nullptr};
  /// Timer armed with `dataProcessingTimeout` once a state transition is requested.
  uv_timer_t* dataProcessingGracePeriodTimer{nullptr};
  uv_signal_t* sigusr1Handle{nullptr};
  /// Value of the `expected-region-callbacks` option.
  int expectedRegionCallbacks{0};
  /// Seconds granted to plain data processing after a transition is requested
  /// (`data-processing-timeout` option). Once it elapses, incoming data which is
  /// not marked to be kept gets dropped and data processing stops; calibrations
  /// still run and the objects they produce are marked to be kept.
  int dataProcessingTimeout{0};
  /// Seconds granted to the whole shutdown of this device
  /// (`exit-transition-timeout` option). It covers both the data processing
  /// grace period and the time needed for the calibrations to complete.
  int exitTransitionTimeout{0};
};

Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/include/Framework/DeviceStateEnums.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ enum struct TransitionHandlingState {
NoTransition,
/// A transition was notified to be requested
Requested,
/// Only calibrations can be done
DataProcessingExpired,
/// A transition needs to be fulfilled ASAP
Expired
};
Expand Down
4 changes: 4 additions & 0 deletions Framework/Core/src/DataProcessingContext.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@

#include "Framework/DataProcessingContext.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/EndOfStreamContext.h"
#include "Framework/TimingInfo.h"
#include "Framework/Signpost.h"

O2_DECLARE_DYNAMIC_LOG(data_processor_context);
O2_DECLARE_DYNAMIC_LOG(calibration);

namespace o2::framework
{

Expand Down
71 changes: 58 additions & 13 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ struct formatter<o2::framework::CompletionPolicy::CompletionOp> : ostream_format
O2_DECLARE_DYNAMIC_LOG(device);
// Special log to keep track of the lifetime of the parts
O2_DECLARE_DYNAMIC_LOG(parts);
// Stream which keeps track of the calibration lifetime logic
O2_DECLARE_DYNAMIC_LOG(calibration);
// Special log to track the async queue behavior
O2_DECLARE_DYNAMIC_LOG(async_queue);
// Special log to track the forwarding requests
Expand Down Expand Up @@ -131,11 +133,28 @@ bool hasOnlyGenerated(DeviceSpec const& spec)

/// libuv timer callback invoked when the exit transition grace period elapses.
///
/// Flags the event loop with TIMER_EXPIRED and moves the transition handling
/// to Expired. A source device (one with only generated data) reports this at
/// INFO level; any other device reports it as an ERROR.
///
/// @param handle the expired timer; its `data` member must point to a
///        ServiceRegistryRef (not to the DeviceState itself).
void on_transition_requested_expired(uv_timer_t* handle)
{
  // The timer payload is a ServiceRegistryRef: DeviceState and DeviceSpec are
  // both looked up through it.
  auto* ref = static_cast<ServiceRegistryRef*>(handle->data);
  auto& state = ref->get<DeviceState>();
  state.loopReason |= DeviceState::TIMER_EXPIRED;
  // Check if this is a source device
  O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
  auto& spec = ref->get<DeviceSpec const>();
  if (hasOnlyGenerated(spec)) {
    O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for source expired. Exiting.");
  } else {
    O2_SIGNPOST_EVENT_EMIT_ERROR(calibration, cid, "callback", "Grace period for calibration expired. Exiting.");
  }
  state.transitionHandling = TransitionHandlingState::Expired;
}

/// libuv timer callback invoked when the data processing grace period elapses.
///
/// Flags the event loop with TIMER_EXPIRED and moves the transition handling
/// to DataProcessingExpired. It deliberately does not set Expired: that is
/// left to the exit transition timer or to the device becoming idle.
///
/// @param handle the expired timer; its `data` member must point to a
///        ServiceRegistryRef.
void on_data_processing_grace_expired(uv_timer_t* handle)
{
  auto* ref = static_cast<ServiceRegistryRef*>(handle->data);
  auto& state = ref->get<DeviceState>();
  state.loopReason |= DeviceState::TIMER_EXPIRED;
  O2_SIGNPOST_ID_FROM_POINTER(cid, device, handle);
  O2_SIGNPOST_EVENT_EMIT_INFO(calibration, cid, "callback", "Grace period for Data Processing expired. Waiting for calibration");
  state.transitionHandling = TransitionHandlingState::DataProcessingExpired;
}

void on_communication_requested(uv_async_t* s)
Expand Down Expand Up @@ -928,8 +947,13 @@ void DataProcessingDevice::startPollers()
}

deviceContext.gracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
deviceContext.gracePeriodTimer->data = &state;
deviceContext.gracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
uv_timer_init(state.loop, deviceContext.gracePeriodTimer);

// Prepare the timers if needed
deviceContext.dataProcessingGracePeriodTimer = (uv_timer_t*)malloc(sizeof(uv_timer_t));
deviceContext.dataProcessingGracePeriodTimer->data = new ServiceRegistryRef(mServiceRegistry);
uv_timer_init(state.loop, deviceContext.dataProcessingGracePeriodTimer);
}

void DataProcessingDevice::stopPollers()
Expand Down Expand Up @@ -960,6 +984,10 @@ void DataProcessingDevice::stopPollers()
uv_timer_stop(deviceContext.gracePeriodTimer);
free(deviceContext.gracePeriodTimer);
deviceContext.gracePeriodTimer = nullptr;

uv_timer_stop(deviceContext.dataProcessingGracePeriodTimer);
free(deviceContext.dataProcessingGracePeriodTimer);
deviceContext.dataProcessingGracePeriodTimer = nullptr;
}

void DataProcessingDevice::InitTask()
Expand Down Expand Up @@ -994,6 +1022,7 @@ void DataProcessingDevice::InitTask()
}

deviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));
deviceContext.dataProcessingTimeout = std::stoi(fConfig->GetValue<std::string>("data-processing-timeout"));
deviceContext.exitTransitionTimeout = std::stoi(fConfig->GetValue<std::string>("exit-transition-timeout"));

for (auto& channel : GetChannels()) {
Expand Down Expand Up @@ -1306,17 +1335,28 @@ void DataProcessingDevice::Run()
if (state.transitionHandling == TransitionHandlingState::NoTransition && NewStatePending()) {
state.transitionHandling = TransitionHandlingState::Requested;
auto& deviceContext = ref.get<DeviceContext>();
auto timeout = deviceContext.exitTransitionTimeout;
// Check if we only have timers
auto& spec = ref.get<DeviceSpec const>();
if (hasOnlyTimers(spec)) {
state.streaming = StreamingState::EndOfStreaming;
}
if (timeout != 0 && state.streaming != StreamingState::Idle) {

if (deviceContext.exitTransitionTimeout != 0 && state.streaming != StreamingState::Idle) {
state.transitionHandling = TransitionHandlingState::Requested;
ref.get<CallbackService>().call<CallbackService::Id::ExitRequested>(ServiceRegistryRef{ref});
uv_update_time(state.loop);
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, timeout * 1000, 0);
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for exitTransitionTimeout.", deviceContext.exitTransitionTimeout);
uv_timer_start(deviceContext.gracePeriodTimer, on_transition_requested_expired, deviceContext.exitTransitionTimeout * 1000, 0);
// In case we have a calibration grace period it will always be longer than the data processing timeout
if (hasOnlyGenerated(spec)) {
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Source devices should never start calibration timer.");
} else if (deviceContext.dataProcessingTimeout != deviceContext.exitTransitionTimeout) {
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "Starting %d s timer for dataProcessingTimeout.", deviceContext.dataProcessingTimeout);
uv_timer_start(deviceContext.dataProcessingGracePeriodTimer, on_data_processing_grace_expired, deviceContext.dataProcessingTimeout * 1000, 0);
} else {
O2_SIGNPOST_EVENT_EMIT(calibration, lid, "timer_setup", "No special timer requested for calibration.");
}

if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state requested. Waiting for %d seconds before quitting.", (int)deviceContext.exitTransitionTimeout);
} else {
Expand All @@ -1331,12 +1371,12 @@ void DataProcessingDevice::Run()
} else if (mProcessingPolicies.termination == TerminationPolicy::QUIT) {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, quitting immediately as per --completion-policy");
} else {
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "runb_loop", "New state pending and we are already idle, switching to READY immediately.");
O2_SIGNPOST_EVENT_EMIT_INFO(device, lid, "run_loop", "New state pending and we are already idle, switching to READY immediately.");
}
}
}
// If we are Idle, we can then consider the transition to be expired.
if (state.transitionHandling == TransitionHandlingState::Requested && state.streaming == StreamingState::Idle) {
// If we are Idle, we can then consider the transition to be expired when it was requested or when the data processing timeout expired.
if ((state.transitionHandling == TransitionHandlingState::Requested || state.transitionHandling == TransitionHandlingState::DataProcessingExpired) && state.streaming == StreamingState::Idle) {
O2_SIGNPOST_EVENT_EMIT(device, lid, "run_loop", "State transition requested and we are now in Idle. We can consider it to be completed.");
state.transitionHandling = TransitionHandlingState::Expired;
}
Expand Down Expand Up @@ -1702,8 +1742,13 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
*context.wasActive = true;
}

auto& deviceContext = ref.get<DeviceContext>();

if (state.streaming == StreamingState::EndOfStreaming) {
O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "We are in EndOfStreaming. Flushing queues.");
O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Flushing queues.");
O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "state", "EndOfStreaming reached. %{public}s %{public}s",
uv_timer_get_due_in(deviceContext.gracePeriodTimer) ? fmt::format("grace timer still running for {}", uv_timer_get_due_in(deviceContext.gracePeriodTimer)).c_str() : "Grace timer expired",
uv_timer_get_due_in(deviceContext.dataProcessingGracePeriodTimer) ? fmt::format("data processing timer still running for {}", uv_timer_get_due_in(deviceContext.dataProcessingGracePeriodTimer)).c_str() : "DataProcessingTimer timer expired");
// We keep processing data until we are Idle.
// FIXME: not sure this is the correct way to drain the queues, but
// I guess we will see.
Expand All @@ -1721,6 +1766,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
// We should keep the data generated at end of stream only for those
// which are not sources.
timingInfo.keepAtEndOfStream = shouldProcess;
O2_SIGNPOST_EVENT_EMIT(calibration, dpid, "calibration", "TimingInfo.keepAtEndOfStream %d", timingInfo.keepAtEndOfStream);

EndOfStreamContext eosContext{*context.registry, ref.get<DataAllocator>()};

Expand Down Expand Up @@ -2348,7 +2394,6 @@ bool DataProcessingDevice::tryDispatchComputation(ServiceRegistryRef ref, std::v
*context.registry};
ProcessingContext processContext{record, ref, ref.get<DataAllocator>()};
{
O2_SIGNPOST_EVENT_EMIT(device, aid, "device", "Invoking preProcessingCallbacks");
// Notice this should be thread safe and reentrant
// as it is called from many threads.
streamContext.preProcessingCallbacks(processContext);
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1515,6 +1515,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
realOdesc.add_options()("child-driver", bpo::value<std::string>());
realOdesc.add_options()("rate", bpo::value<std::string>());
realOdesc.add_options()("exit-transition-timeout", bpo::value<std::string>());
realOdesc.add_options()("data-processing-timeout", bpo::value<std::string>());
realOdesc.add_options()("expected-region-callbacks", bpo::value<std::string>());
realOdesc.add_options()("timeframes-rate-limit", bpo::value<std::string>());
realOdesc.add_options()("environment", bpo::value<std::string>());
Expand Down Expand Up @@ -1701,6 +1702,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
("control-port", bpo::value<std::string>(), "Utility port to be used by O2 Control") //
("rate", bpo::value<std::string>(), "rate for a data source device (Hz)") //
("exit-transition-timeout", bpo::value<std::string>(), "timeout before switching to READY state") //
("data-processing-timeout", bpo::value<std::string>(), "timeout before switching to calibration processing mode") //
("expected-region-callbacks", bpo::value<std::string>(), "region callbacks to expect before starting") //
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframes can be in fly") //
("shm-monitor", bpo::value<std::string>(), "whether to use the shared memory monitor") //
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/src/O2ControlHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ void dumpCommand(std::ostream& dumpOut, const DeviceExecution& execution, std::s
dumpOut << indLevel << indScheme << "- \"-b\"\n";
dumpOut << indLevel << indScheme << "- \"--exit-transition-timeout\"\n";
dumpOut << indLevel << indScheme << "- \"'{{ exit_transition_timeout }}'\"\n";
dumpOut << indLevel << indScheme << "- \"--data-processing-timeout\"\n";
dumpOut << indLevel << indScheme << "- \"'{{ data_processing_timeout }}'\"\n";
dumpOut << indLevel << indScheme << "- \"--monitoring-backend\"\n";
dumpOut << indLevel << indScheme << "- \"'{{ monitoring_dpl_url }}'\"\n";
dumpOut << indLevel << indScheme << "- \"--session\"\n";
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
// declared in the workflow definition are allowed.
runner.AddHook<fair::mq::hooks::SetCustomCmdLineOptions>([&spec, driverConfig, defaultDriverClient](fair::mq::DeviceRunner& r) {
std::string defaultExitTransitionTimeout = "0";
std::string defaultDataProcessingTimeout = "0";
std::string defaultInfologgerMode = "";
o2::framework::DeploymentMode deploymentMode = o2::framework::DefaultsHelpers::deploymentMode();
if (deploymentMode == o2::framework::DeploymentMode::OnlineDDS) {
Expand All @@ -1055,6 +1056,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
("signposts", bpo::value<std::string>()->default_value(defaultSignposts ? defaultSignposts : ""), "comma separated list of signposts to enable") //
("expected-region-callbacks", bpo::value<std::string>()->default_value("0"), "how many region callbacks we are expecting") //
("exit-transition-timeout", bpo::value<std::string>()->default_value(defaultExitTransitionTimeout), "how many second to wait before switching from RUN to READY") //
("data-processing-timeout", bpo::value<std::string>()->default_value(defaultDataProcessingTimeout), "how many second to wait before switching from RUN to CALIBRATION") //
("timeframes-rate-limit", bpo::value<std::string>()->default_value("0"), "how many timeframe can be in fly at the same moment (0 disables)") //
("configuration,cfg", bpo::value<std::string>()->default_value("command-line"), "configuration backend") //
("infologger-mode", bpo::value<std::string>()->default_value(defaultInfologgerMode), "O2_INFOLOGGER_MODE override");
Expand Down
8 changes: 8 additions & 0 deletions Framework/Core/test/test_FrameworkDataFlowToO2Control.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ const std::vector expectedTasks{
- "-b"
- "--exit-transition-timeout"
- "'{{ exit_transition_timeout }}'"
- "--data-processing-timeout"
- "'{{ data_processing_timeout }}'"
- "--monitoring-backend"
- "'{{ monitoring_dpl_url }}'"
- "--session"
Expand Down Expand Up @@ -270,6 +272,8 @@ const std::vector expectedTasks{
- "-b"
- "--exit-transition-timeout"
- "'{{ exit_transition_timeout }}'"
- "--data-processing-timeout"
- "'{{ data_processing_timeout }}'"
- "--monitoring-backend"
- "'{{ monitoring_dpl_url }}'"
- "--session"
Expand Down Expand Up @@ -367,6 +371,8 @@ const std::vector expectedTasks{
- "-b"
- "--exit-transition-timeout"
- "'{{ exit_transition_timeout }}'"
- "--data-processing-timeout"
- "'{{ data_processing_timeout }}'"
- "--monitoring-backend"
- "'{{ monitoring_dpl_url }}'"
- "--session"
Expand Down Expand Up @@ -461,6 +467,8 @@ const std::vector expectedTasks{
- "-b"
- "--exit-transition-timeout"
- "'{{ exit_transition_timeout }}'"
- "--data-processing-timeout"
- "'{{ data_processing_timeout }}'"
- "--monitoring-backend"
- "'{{ monitoring_dpl_url }}'"
- "--session"
Expand Down
6 changes: 4 additions & 2 deletions Framework/TestWorkflows/src/o2DummyCalibrationWorkflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ WorkflowSpec defineDataProcessing(ConfigContext const& specs)
static int counter = 0;
auto& aData = outputs.make<int>(OutputRef{"counter"});
aData = counter++;
if (counter == 10) {
pcx.services().get<ControlService>().endOfStream();
sleep(1);
if (counter == 5) {
pcx.services().get<ControlService>().readyToQuit(QuitRequest::All);
}
})},
};
Expand All @@ -57,6 +58,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& specs)
callbacks.set<CallbackService::Id::EndOfStream>(eosCallback);
return adaptStateless([](Input<"x", int> const& x)
{
sleep(1);
sum += x;
std::cout << "Sum: " << sum << std::endl;
}); })};
Expand Down

0 comments on commit 742a0e7

Please sign in to comment.