Skip to content

Commit

Permalink
Merge 654875a into sapling-pr-archive-ktf
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf authored Sep 26, 2024
2 parents c16dcf8 + 654875a commit 4b54694
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
11 changes: 3 additions & 8 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
// or submit itself to any jurisdiction.
#include "Framework/AsyncQueue.h"
#include "Framework/DataProcessingDevice.h"
#include "Framework/ChannelMatching.h"
#include "Framework/ControlService.h"
#include "Framework/ComputingQuotaEvaluator.h"
#include "Framework/DataProcessingHeader.h"
Expand All @@ -28,7 +27,6 @@
#include "ConfigurationOptionsRetriever.h"
#include "Framework/FairMQDeviceProxy.h"
#include "Framework/CallbackService.h"
#include "Framework/TMessageSerializer.h"
#include "Framework/InputRecord.h"
#include "Framework/InputSpan.h"
#if defined(__APPLE__) || defined(NDEBUG)
Expand All @@ -37,23 +35,20 @@
#include "Framework/Signpost.h"
#include "Framework/TimingHelpers.h"
#include "Framework/SourceInfoHeader.h"
#include "Framework/Logger.h"
#include "Framework/DriverClient.h"
#include "Framework/Monitoring.h"
#include "Framework/TimesliceIndex.h"
#include "Framework/VariableContextHelpers.h"
#include "Framework/DataProcessingContext.h"
#include "Framework/DataProcessingHeader.h"
#include "Framework/DeviceContext.h"
#include "Framework/RawDeviceService.h"
#include "Framework/StreamContext.h"
#include "Framework/DefaultsHelpers.h"
#include "Framework/ServiceRegistryRef.h"

#include "PropertyTreeHelpers.h"
#include "DataProcessingStatus.h"
#include "DecongestionService.h"
#include "Framework/DataProcessingHelpers.h"
#include "DataRelayerHelpers.h"
#include "ProcessingPoliciesHelpers.h"
#include "Headers/DataHeader.h"
#include "Headers/DataHeaderHelpers.h"

Expand All @@ -66,6 +61,7 @@
#include <fairmq/ProgOptions.h>
#include <Configuration/ConfigurationInterface.h>
#include <Configuration/ConfigurationFactory.h>
#include <Monitoring/Monitoring.h>
#include <TMessage.h>
#include <TClonesArray.h>

Expand All @@ -74,7 +70,6 @@
#include <vector>
#include <numeric>
#include <memory>
#include <unordered_map>
#include <uv.h>
#include <execinfo.h>
#include <sstream>
Expand Down
27 changes: 24 additions & 3 deletions Framework/Core/src/DataRelayer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,12 @@ void DataRelayer::pruneCache(TimesliceSlot slot, OnDropCallback onDrop)
pruneCache(slot);
}

bool isCalibrationData(std::unique_ptr<fair::mq::Message>& first)
{
auto* dh = o2::header::get<DataHeader*>(first->GetData());
return dh->flagsDerivedHeader & DataProcessingHeader::KEEP_AT_EOS_FLAG;
}

DataRelayer::RelayChoice
DataRelayer::relay(void const* rawHeader,
std::unique_ptr<fair::mq::Message>* messages,
Expand Down Expand Up @@ -456,7 +462,7 @@ DataRelayer::RelayChoice
&nPayloads,
&cache = mCache,
&services = mContext,
numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) {
numInputTypes = mDistinctRoutesIndex.size()](TimesliceId timeslice, int input, TimesliceSlot slot, InputInfo const& info) -> size_t {
O2_SIGNPOST_ID_GENERATE(aid, data_relayer);
O2_SIGNPOST_EVENT_EMIT(data_relayer, aid, "saveInSlot", "saving %{public}s@%zu in slot %zu from %{public}s",
fmt::format("{:x}", *o2::header::get<DataHeader*>(messages[0]->GetData())).c_str(),
Expand All @@ -468,11 +474,20 @@ DataRelayer::RelayChoice
// TODO: make sure that multiple parts can only be added within the same call of
// DataRelayer::relay
assert(nPayloads > 0);
size_t saved = 0;
for (size_t mi = 0; mi < nMessages; ++mi) {
assert(mi + nPayloads < nMessages);
// We are in calibration mode and the data does not have the calibration bit set.
// We do not store it.
if (services.get<DeviceState>().allowedProcessing == DeviceState::ProcessingType::Any || isCalibrationData(messages[mi])) {
mi += nPayloads;
continue;
}
target.add([&messages, &mi](size_t i) -> fair::mq::MessagePtr& { return messages[mi + i]; }, nPayloads + 1);
mi += nPayloads;
saved += nPayloads;
}
return saved;
};

auto updateStatistics = [ref = mContext](TimesliceIndex::ActionTaken action) {
Expand Down Expand Up @@ -551,7 +566,10 @@ DataRelayer::RelayChoice
this->pruneCache(slot, onDrop);
mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
}
saveInSlot(timeslice, input, slot, info);
size_t saved = saveInSlot(timeslice, input, slot, info);
if (saved == 0) {
return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
}
index.publishSlot(slot);
index.markAsDirty(slot, true);
stats.updateStats({static_cast<short>(ProcessingStatsId::RELAYED_MESSAGES), DataProcessingStats::Op::Add, (int)1});
Expand Down Expand Up @@ -633,7 +651,10 @@ DataRelayer::RelayChoice
// cache still holds the old data, so we prune it.
this->pruneCache(slot, onDrop);
mPruneOps.erase(std::remove_if(mPruneOps.begin(), mPruneOps.end(), [slot](const auto& x) { return x.slot == slot; }), mPruneOps.end());
saveInSlot(timeslice, input, slot, info);
size_t saved = saveInSlot(timeslice, input, slot, info);
if (saved == 0) {
return RelayChoice{.type = RelayChoice::Type::Dropped, .timeslice = timeslice};
}
index.publishSlot(slot);
index.markAsDirty(slot, true);
return RelayChoice{.type = RelayChoice::Type::WillRelay};
Expand Down

0 comments on commit 4b54694

Please sign in to comment.