Skip to content

Commit

Permalink
Merge 0a97f2e into sapling-pr-archive-ktf
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf authored Sep 22, 2024
2 parents c95ac3a + 0a97f2e commit c2adc07
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 33 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/clean-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ name: Clean PR checks
description: build/O2/o2-dataflow-cs8
type: boolean
default: true
'check_build/O2/o2/aarch64':
description: build/O2/o2/aarch64
type: boolean
default: true
'check_build/O2/o2_slc9':
description: build/O2/o2_slc9
type: boolean
default: true


permissions: {}

Expand Down
14 changes: 10 additions & 4 deletions Detectors/TOF/simulation/include/TOFSimulation/Digitizer.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,14 @@ class Digitizer : public WindowFiller
Float_t getEffZ(Float_t z);
Float_t getFractionOfCharge(Float_t x, Float_t z);

Float_t getTimeLastHit(Int_t idigit) const { return 0; }
Float_t getTotLastHit(Int_t idigit) const { return 0; }
Int_t getXshift(Int_t idigit) const { return 0; }
Int_t getZshift(Int_t idigit) const { return 0; }
Float_t getTimeLastHit(Int_t idigit) const { return mTimeLastHit[idigit]; }
Float_t getTotLastHit(Int_t idigit) const { return mTotLastHit[idigit]; }
Int_t getXshift(Int_t idigit) const { return mXLastShift[idigit]; }
Int_t getZshift(Int_t idigit) const { return mZLastShift[idigit]; }
void setEventID(Int_t id) { mEventID = id; }
void setSrcID(Int_t id) { mSrcID = id; }

void runFullTestExample(const char* geo = "");
void test(const char* geo = "");
void testFromHits(const char* geo = "", const char* hits = "AliceO2_TGeant3.tof.mc_10_event.root");

Expand Down Expand Up @@ -134,6 +135,11 @@ class Digitizer : public WindowFiller

void checkIfReuseFutureDigits();

int mNLastHit = 0;
float mTimeLastHit[10];
float mTotLastHit[10];
Int_t mXLastShift[10];
Int_t mZLastShift[10];
ClassDefNV(Digitizer, 1);
};
} // namespace tof
Expand Down
34 changes: 32 additions & 2 deletions Detectors/TOF/simulation/src/Digitizer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ int Digitizer::process(const std::vector<HitType>* hits, std::vector<Digit>* dig

Int_t Digitizer::processHit(const HitType& hit, Double_t event_time)
{
mNLastHit = 0;

Float_t pos[3] = {hit.GetX(), hit.GetY(), hit.GetZ()};
Float_t deltapos[3];
Int_t detInd[5];
Expand Down Expand Up @@ -169,6 +171,8 @@ Int_t Digitizer::processHit(const HitType& hit, Double_t event_time)
// check the fired PAD 1 (A)
if (isFired(xLocal, zLocal, charge)) {
ndigits++;
mXLastShift[mNLastHit] = 0;
mZLastShift[mNLastHit] = 0;
addDigit(channel, istrip, time, xLocal, zLocal, charge, 0, 0, detInd[3], trackID);
}

Expand All @@ -184,6 +188,8 @@ Int_t Digitizer::processHit(const HitType& hit, Double_t event_time)
}
if (isFired(xLocal, zLocal, charge)) {
ndigits++;
mXLastShift[mNLastHit] = 0;
mZLastShift[mNLastHit] = iZshift;
addDigit(channel, istrip, time, xLocal, zLocal, charge, 0, iZshift, detInd[3], trackID);
}

Expand All @@ -196,6 +202,8 @@ Int_t Digitizer::processHit(const HitType& hit, Double_t event_time)
zLocal = deltapos[2]; // recompute local coordinates
if (isFired(xLocal, zLocal, charge)) {
ndigits++;
mXLastShift[mNLastHit] = -1;
mZLastShift[mNLastHit] = 0;
addDigit(channel, istrip, time, xLocal, zLocal, charge, -1, 0, detInd[3], trackID);
}
}
Expand All @@ -209,6 +217,8 @@ Int_t Digitizer::processHit(const HitType& hit, Double_t event_time)
zLocal = deltapos[2]; // recompute local coordinates
if (isFired(xLocal, zLocal, charge)) {
ndigits++;
mXLastShift[mNLastHit] = 1;
mZLastShift[mNLastHit] = 0;
addDigit(channel, istrip, time, xLocal, zLocal, charge, 1, 0, detInd[3], trackID);
}
}
Expand All @@ -226,6 +236,8 @@ Int_t Digitizer::processHit(const HitType& hit, Double_t event_time)
}
if (isFired(xLocal, zLocal, charge)) {
ndigits++;
mXLastShift[mNLastHit] = -1;
mZLastShift[mNLastHit] = iZshift;
addDigit(channel, istrip, time, xLocal, zLocal, charge, -1, iZshift, detInd[3], trackID);
}
}
Expand All @@ -243,6 +255,8 @@ Int_t Digitizer::processHit(const HitType& hit, Double_t event_time)
}
if (isFired(xLocal, zLocal, charge)) {
ndigits++;
mXLastShift[mNLastHit] = 1;
mZLastShift[mNLastHit] = iZshift;
addDigit(channel, istrip, time, xLocal, zLocal, charge, 1, iZshift, detInd[3], trackID);
}
}
Expand Down Expand Up @@ -297,6 +311,11 @@ void Digitizer::addDigit(Int_t channel, UInt_t istrip, Double_t time, Float_t x,
LOG(error) << "Wrong de-calibration correction for ch = " << channel << ", tot = " << tot << " (Skip it)";
return;
}

mTimeLastHit[mNLastHit] = time;
mTotLastHit[mNLastHit] = tot;
mNLastHit++;

time -= tsCorr; // TODO: to be checked that "-" is correct, and we did not need "+" instead :-)

// let's move from time to bc, tdc
Expand Down Expand Up @@ -590,7 +609,17 @@ void Digitizer::printParameters()
printf("Time walk ON = %f ps/cm\n", mTimeWalkeSlope);
}
}

//______________________________________________________________________
void Digitizer::runFullTestExample(const char* geo)
{
initParameters();
o2::tof::CalibTOFapi* api = new o2::tof::CalibTOFapi();
api->setTimeStamp(0);
api->readLHCphase();
api->readTimeSlewingParam();
setCalibApi(api);
test(geo);
}
//______________________________________________________________________
void Digitizer::test(const char* geo)
{
Expand Down Expand Up @@ -696,7 +725,8 @@ void Digitizer::test(const char* geo)

hit->SetEnergyLoss(0.0001);

Int_t ndigits = processHit(*hit, mEventTime.getTimeOffsetWrtBC());
processHit(*hit, mEventTime.getTimeOffsetWrtBC());
Int_t ndigits = mNLastHit;

h3->Fill(ndigits);
hpadAll->Fill(xlocal, zlocal);
Expand Down
4 changes: 1 addition & 3 deletions Framework/Core/include/Framework/DataProcessingContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ struct DataProcessorSpec;
struct DataProcessorContext {
DataProcessorContext(DataProcessorContext const&) = delete;
DataProcessorContext() = default;
// These are specific of a given context and therefore
// not shared by threads.
bool* wasActive = nullptr;

bool allDone = false;
/// Latest run number we processed globally for this DataProcessor.
int64_t lastRunNumberProcessed = -1;
Expand Down
1 change: 0 additions & 1 deletion Framework/Core/include/Framework/DataProcessingDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ class DataProcessingDevice : public fair::mq::Device
std::vector<fair::mq::RegionInfo> mPendingRegionInfos; /// A list of the region infos not yet notified.
std::mutex mRegionInfoMutex;
ProcessingPolicies mProcessingPolicies; /// User policies related to data processing
bool mWasActive = false; /// Whether or not the device was active at last iteration.
std::vector<uv_work_t> mHandles; /// Handles to use to schedule work.
std::vector<TaskStreamInfo> mStreams; /// Information about the task running in the associated mHandle.
/// Handle to wake up the main loop from other threads
Expand Down
7 changes: 7 additions & 0 deletions Framework/Core/include/Framework/DeviceState.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ typedef struct uv_async_s uv_async_t;
namespace o2::framework
{

struct DataProcessorContext;

/// Running state information of a given device
struct DeviceState {
/// Motivation for the loop being triggered.
Expand Down Expand Up @@ -108,6 +110,11 @@ struct DeviceState {
/// the bits we are interested in.
std::vector<int> severityStack;
TransitionHandlingState transitionHandling = TransitionHandlingState::NoTransition;

// The DataProcessorContext which was most recently active.
// We use this to determine if we should trigger the loop without
// waiting for some events.
std::atomic<DataProcessorContext*> lastActiveDataProcessor = nullptr;
};

} // namespace o2::framework
Expand Down
60 changes: 38 additions & 22 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,11 @@ void DataProcessingDevice::InitTask()
// Whenever we InitTask, we consider as if the previous iteration
// was successful, so that even if there is no timer or receiving
// channel, we can still start an enumeration.
mWasActive = true;
DataProcessorContext* initialContext = nullptr;
bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1);
if (!idle) {
LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active";
}

// We should be ready to run here. Therefore we copy all the
// required parts in the DataProcessorContext. Eventually we should
Expand Down Expand Up @@ -1093,8 +1097,6 @@ void DataProcessingDevice::InitTask()

void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceContext& deviceContext)
{
context.wasActive = &mWasActive;

context.isSink = false;
// If nothing is a sink, the rate limiting simply does not trigger.
bool enableRateLimiting = std::stoi(fConfig->GetValue<std::string>("timeframes-rate-limit"));
Expand Down Expand Up @@ -1308,14 +1310,19 @@ void DataProcessingDevice::Run()
{
ServiceRegistryRef ref{mServiceRegistry};
ref.get<DriverClient>().flushPending(mServiceRegistry);
auto shouldNotWait = (mWasActive &&
DataProcessorContext* lastActive = state.lastActiveDataProcessor.load();
// Reset to zero unless some other DataPorcessorContext completed in the meanwhile.
// In such case we will take care of it at next iteration.
state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr);

auto shouldNotWait = (lastActive != nullptr &&
(state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) ||
(state.streaming == StreamingState::EndOfStreaming);
if (firstLoop) {
shouldNotWait = true;
firstLoop = false;
}
if (mWasActive) {
if (lastActive != nullptr) {
state.loopReason |= DeviceState::LoopReason::PREVIOUSLY_ACTIVE;
}
if (NewStatePending()) {
Expand Down Expand Up @@ -1485,10 +1492,7 @@ void DataProcessingDevice::Run()
} else {
auto ref = ServiceRegistryRef{mServiceRegistry};
ref.get<ComputingQuotaEvaluator>().handleExpired(reportExpiredOffer);
mWasActive = false;
}
} else {
mWasActive = false;
}
}

Expand All @@ -1510,7 +1514,6 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context);
O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare.");

*context.wasActive = false;
{
ref.get<CallbackService>().call<CallbackService::Id::ClockTick>();
}
Expand Down Expand Up @@ -1669,7 +1672,10 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref)
socket.Events(&info.hasPendingEvents);
if (info.hasPendingEvents) {
info.readPolled = false;
*context.wasActive |= newMessages;
// In case there were messages, we consider it as activity
if (newMessages) {
state.lastActiveDataProcessor.store(&context);
}
}
O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).",
channelSpec.name.c_str(), info.id.value);
Expand All @@ -1693,24 +1699,29 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
auto& spec = ref.get<DeviceSpec const>();

if (state.streaming == StreamingState::Idle) {
*context.wasActive = false;
return;
}

context.completed.clear();
context.completed.reserve(16);
*context.wasActive |= DataProcessingDevice::tryDispatchComputation(ref, context.completed);
if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
state.lastActiveDataProcessor.store(&context);
}
DanglingContext danglingContext{*context.registry};

context.preDanglingCallbacks(danglingContext);
if (*context.wasActive == false) {
if (state.lastActiveDataProcessor.load() == nullptr) {
ref.get<CallbackService>().call<CallbackService::Id::Idle>();
}
auto activity = ref.get<DataRelayer>().processDanglingInputs(context.expirationHandlers, *context.registry, true);
*context.wasActive |= activity.expiredSlots > 0;
if (activity.expiredSlots > 0) {
state.lastActiveDataProcessor = &context;
}

context.completed.clear();
*context.wasActive |= DataProcessingDevice::tryDispatchComputation(ref, context.completed);
if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) {
state.lastActiveDataProcessor = &context;
}

context.postDanglingCallbacks(danglingContext);

Expand All @@ -1720,7 +1731,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
// framework itself.
if (context.allDone == true && state.streaming == StreamingState::Streaming) {
switchState(StreamingState::EndOfStreaming);
*context.wasActive = true;
state.lastActiveDataProcessor = &context;
}

if (state.streaming == StreamingState::EndOfStreaming) {
Expand Down Expand Up @@ -1766,7 +1777,10 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref)
// This is needed because the transport is deleted before the device.
relayer.clear();
switchState(StreamingState::Idle);
*context.wasActive = shouldProcess;
// In case we should process, note the data processor responsible for it
if (shouldProcess) {
state.lastActiveDataProcessor = &context;
}
// On end of stream we shut down all output pollers.
O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers.");
for (auto& poller : state.activeOutputPollers) {
Expand Down Expand Up @@ -1834,6 +1848,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info);
auto ref = ServiceRegistryRef{*context.registry};
auto& stats = ref.get<DataProcessingStats>();
auto& state = ref.get<DeviceState>();
auto& parts = info.parts;
stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()});

Expand All @@ -1856,14 +1871,14 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state);
info.state = sih->state;
insertInputInfo(pi, 2, InputType::SourceInfo, info.id);
*context.wasActive = true;
state.lastActiveDataProcessor = &context;
continue;
}
auto dih = o2::header::get<DomainInfoHeader*>(headerData);
if (dih) {
O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice);
insertInputInfo(pi, 2, InputType::DomainInfo, info.id);
*context.wasActive = true;
state.lastActiveDataProcessor = &context;
continue;
}
auto dh = o2::header::get<DataHeader*>(headerData);
Expand Down Expand Up @@ -1925,6 +1940,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&

auto handleValidMessages = [&info, ref, &reportError](std::vector<InputInfo> const& inputInfos) {
auto& relayer = ref.get<DataRelayer>();
auto& state = ref.get<DeviceState>();
static WaitBackpressurePolicy policy;
auto& parts = info.parts;
// We relay execution to make sure we have a complete set of parts
Expand Down Expand Up @@ -2012,7 +2028,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
case InputType::SourceInfo: {
LOGP(detail, "Received SourceInfo");
auto& context = ref.get<DataProcessorContext>();
*context.wasActive = true;
state.lastActiveDataProcessor = &context;
auto headerIndex = input.position;
auto payloadIndex = input.position + 1;
assert(payloadIndex < parts.Size());
Expand All @@ -2030,7 +2046,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
/// We have back pressure, therefore we do not process DomainInfo anymore.
/// until the previous message are processed.
auto& context = ref.get<DataProcessorContext>();
*context.wasActive = true;
state.lastActiveDataProcessor = &context;
auto headerIndex = input.position;
auto payloadIndex = input.position + 1;
assert(payloadIndex < parts.Size());
Expand Down Expand Up @@ -2058,7 +2074,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo&
auto& context = ref.get<DataProcessorContext>();
context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id);
ref.get<CallbackService>().call<CallbackService::Id::DomainInfoUpdated>((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id);
*context.wasActive = true;
state.lastActiveDataProcessor = &context;
}
auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; });
parts.fParts.erase(it, parts.end());
Expand Down
Loading

0 comments on commit c2adc07

Please sign in to comment.