Skip to content

Commit

Permalink
DPL Analysis: add option to artificially slow down AOD reader
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Sep 30, 2024
1 parent 1de0de3 commit 56d0945
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 3 deletions.
13 changes: 11 additions & 2 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>

#include <thread>

using namespace o2;
using namespace o2::aod;

Expand Down Expand Up @@ -140,6 +138,8 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()

auto filename = options.get<std::string>("aod-file-private");

auto maxRate = options.get<float>("aod-max-io-rate");

std::string parentFileReplacement;
if (options.isSet("aod-parent-base-path-replacement")) {
parentFileReplacement = options.get<std::string>("aod-parent-base-path-replacement");
Expand Down Expand Up @@ -192,6 +192,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
fileCounter,
numTF,
watchdog,
maxRate,
didir, reportTFN, reportTFFileName](Monitoring& monitoring, DataAllocator& outputs, ControlService& control, DeviceSpec const& device) {
// Each parallel reader device.inputTimesliceId reads the files fileCounter*device.maxInputTimeslices+device.inputTimesliceId
// the TF to read is numTF
Expand Down Expand Up @@ -222,6 +223,7 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
return;
}

int64_t startTime = uv_hrtime();
for (auto& route : requestedTables) {
if ((device.inputTimesliceId % route.maxTimeslices) != route.timeslice) {
continue;
Expand Down Expand Up @@ -278,6 +280,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback()
}
first = false;
}
int64_t stopTime = uv_hrtime();
int64_t currentDelta = (stopTime - startTime) / 1000000000; // in s
float extraTime = (totalSizeUncompressed / 1000000 - currentDelta * maxRate) / maxRate;
// We only sleep if we read faster than the max-read-rate.
if (extraTime > 0.) {
uv_sleep(extraTime * 1000); // in milliseconds
}
totalDFSent++;
monitoring.send(Metric{(uint64_t)totalDFSent, "df-sent"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
monitoring.send(Metric{(uint64_t)totalSizeUncompressed / 1000, "aod-bytes-read-uncompressed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/WorkflowCustomizationHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ std::vector<ConfigParamSpec> WorkflowCustomizationHelpers::requiredWorkflowOptio
{"aod-writer-keep", VariantType::String, "", {"Comma separated list of ORIGIN/DESCRIPTION/SUBSPECIFICATION:treename:col1/col2/..:filename"}},

{"fairmq-rate-logging", VariantType::Int, 0, {"Rate logging for FairMQ channels"}},
{"aod-max-io-rate", VariantType::Float, 0.0f, {"Maximum I/O throughput in MB/s"}},
{"fairmq-recv-buffer-size", VariantType::Int, 4, {"recvBufferSize option for FairMQ channels"}},
{"fairmq-send-buffer-size", VariantType::Int, 4, {"sendBufferSize option for FairMQ channels"}},
/// Find out a place where we can write the sockets
Expand Down
1 change: 0 additions & 1 deletion Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include <utility>
#include <vector>
#include <climits>
#include <thread>

O2_DECLARE_DYNAMIC_LOG(workflow_helpers);

Expand Down

0 comments on commit 56d0945

Please sign in to comment.