diff --git a/.github/workflows/clean-test.yml b/.github/workflows/clean-test.yml index c926d5a7dea46..cbc524910c33e 100644 --- a/.github/workflows/clean-test.yml +++ b/.github/workflows/clean-test.yml @@ -43,6 +43,15 @@ name: Clean PR checks description: build/O2/o2-dataflow-cs8 type: boolean default: true + 'check_build/O2/o2/aarch64': + description: build/O2/o2/aarch64 + type: boolean + default: true + 'check_build/O2/o2_slc9': + description: build/O2/o2_slc9 + type: boolean + default: true + permissions: {} diff --git a/Detectors/TOF/simulation/include/TOFSimulation/Digitizer.h b/Detectors/TOF/simulation/include/TOFSimulation/Digitizer.h index 91ae26f2c9451..feb35c769d6ba 100644 --- a/Detectors/TOF/simulation/include/TOFSimulation/Digitizer.h +++ b/Detectors/TOF/simulation/include/TOFSimulation/Digitizer.h @@ -61,13 +61,14 @@ class Digitizer : public WindowFiller Float_t getEffZ(Float_t z); Float_t getFractionOfCharge(Float_t x, Float_t z); - Float_t getTimeLastHit(Int_t idigit) const { return 0; } - Float_t getTotLastHit(Int_t idigit) const { return 0; } - Int_t getXshift(Int_t idigit) const { return 0; } - Int_t getZshift(Int_t idigit) const { return 0; } + Float_t getTimeLastHit(Int_t idigit) const { return mTimeLastHit[idigit]; } + Float_t getTotLastHit(Int_t idigit) const { return mTotLastHit[idigit]; } + Int_t getXshift(Int_t idigit) const { return mXLastShift[idigit]; } + Int_t getZshift(Int_t idigit) const { return mZLastShift[idigit]; } void setEventID(Int_t id) { mEventID = id; } void setSrcID(Int_t id) { mSrcID = id; } + void runFullTestExample(const char* geo = ""); void test(const char* geo = ""); void testFromHits(const char* geo = "", const char* hits = "AliceO2_TGeant3.tof.mc_10_event.root"); @@ -134,6 +135,11 @@ class Digitizer : public WindowFiller void checkIfReuseFutureDigits(); + int mNLastHit = 0; + float mTimeLastHit[10]; + float mTotLastHit[10]; + Int_t mXLastShift[10]; + Int_t mZLastShift[10]; ClassDefNV(Digitizer, 1); }; } // namespace tof diff --git a/Detectors/TOF/simulation/src/Digitizer.cxx b/Detectors/TOF/simulation/src/Digitizer.cxx index cb2d36fa654a1..d8ef99ae1bd22 100644 --- a/Detectors/TOF/simulation/src/Digitizer.cxx +++ b/Detectors/TOF/simulation/src/Digitizer.cxx @@ -128,6 +128,8 @@ int Digitizer::process(const std::vector* hits, std::vector* dig Int_t Digitizer::processHit(const HitType& hit, Double_t event_time) { + mNLastHit = 0; + Float_t pos[3] = {hit.GetX(), hit.GetY(), hit.GetZ()}; Float_t deltapos[3]; Int_t detInd[5]; @@ -169,6 +171,8 @@ Int_t Digitizer::processHit(const HitType& hit, Double_t event_time) // check the fired PAD 1 (A) if (isFired(xLocal, zLocal, charge)) { ndigits++; + mXLastShift[mNLastHit] = 0; + mZLastShift[mNLastHit] = 0; addDigit(channel, istrip, time, xLocal, zLocal, charge, 0, 0, detInd[3], trackID); } @@ -184,6 +188,8 @@ Int_t Digitizer::processHit(const HitType& hit, Double_t event_time) } if (isFired(xLocal, zLocal, charge)) { ndigits++; + mXLastShift[mNLastHit] = 0; + mZLastShift[mNLastHit] = iZshift; addDigit(channel, istrip, time, xLocal, zLocal, charge, 0, iZshift, detInd[3], trackID); } @@ -196,6 +202,8 @@ Int_t Digitizer::processHit(const HitType& hit, Double_t event_time) zLocal = deltapos[2]; // recompute local coordinates if (isFired(xLocal, zLocal, charge)) { ndigits++; + mXLastShift[mNLastHit] = -1; + mZLastShift[mNLastHit] = 0; addDigit(channel, istrip, time, xLocal, zLocal, charge, -1, 0, detInd[3], trackID); } } @@ -209,6 +217,8 @@ Int_t Digitizer::processHit(const HitType& hit, Double_t event_time) zLocal = deltapos[2]; // recompute local coordinates if (isFired(xLocal, zLocal, charge)) { ndigits++; + mXLastShift[mNLastHit] = 1; + mZLastShift[mNLastHit] = 0; addDigit(channel, istrip, time, xLocal, zLocal, charge, 1, 0, detInd[3], trackID); } } @@ -226,6 +236,8 @@ Int_t Digitizer::processHit(const HitType& hit, Double_t event_time) } if (isFired(xLocal, zLocal, charge)) { ndigits++; + mXLastShift[mNLastHit] = -1; + mZLastShift[mNLastHit] = iZshift; addDigit(channel, istrip, time, xLocal, zLocal, charge, -1, iZshift, detInd[3], trackID); } } @@ -243,6 +255,8 @@ Int_t Digitizer::processHit(const HitType& hit, Double_t event_time) } if (isFired(xLocal, zLocal, charge)) { ndigits++; + mXLastShift[mNLastHit] = 1; + mZLastShift[mNLastHit] = iZshift; addDigit(channel, istrip, time, xLocal, zLocal, charge, 1, iZshift, detInd[3], trackID); } } @@ -297,6 +311,11 @@ void Digitizer::addDigit(Int_t channel, UInt_t istrip, Double_t time, Float_t x, LOG(error) << "Wrong de-calibration correction for ch = " << channel << ", tot = " << tot << " (Skip it)"; return; } + + mTimeLastHit[mNLastHit] = time; + mTotLastHit[mNLastHit] = tot; + mNLastHit++; + time -= tsCorr; // TODO: to be checked that "-" is correct, and we did not need "+" instead :-) // let's move from time to bc, tdc @@ -590,7 +609,17 @@ void Digitizer::printParameters() printf("Time walk ON = %f ps/cm\n", mTimeWalkeSlope); } } - +//______________________________________________________________________ +void Digitizer::runFullTestExample(const char* geo) +{ + initParameters(); + o2::tof::CalibTOFapi* api = new o2::tof::CalibTOFapi(); + api->setTimeStamp(0); + api->readLHCphase(); + api->readTimeSlewingParam(); + setCalibApi(api); + test(geo); +} //______________________________________________________________________ void Digitizer::test(const char* geo) { @@ -696,7 +725,8 @@ void Digitizer::test(const char* geo) hit->SetEnergyLoss(0.0001); - Int_t ndigits = processHit(*hit, mEventTime.getTimeOffsetWrtBC()); + processHit(*hit, mEventTime.getTimeOffsetWrtBC()); + Int_t ndigits = mNLastHit; h3->Fill(ndigits); hpadAll->Fill(xlocal, zlocal); diff --git a/Framework/Core/include/Framework/DataProcessingContext.h b/Framework/Core/include/Framework/DataProcessingContext.h index d71ad203b1580..9b7cbc238c942 100644 --- a/Framework/Core/include/Framework/DataProcessingContext.h +++ b/Framework/Core/include/Framework/DataProcessingContext.h @@ -26,9 +26,7 @@ struct DataProcessorSpec; struct DataProcessorContext { DataProcessorContext(DataProcessorContext const&) = delete; DataProcessorContext() = default; - // These are specific of a given context and therefore - // not shared by threads. - bool* wasActive = nullptr; + bool allDone = false; /// Latest run number we processed globally for this DataProcessor. int64_t lastRunNumberProcessed = -1; diff --git a/Framework/Core/include/Framework/DataProcessingDevice.h b/Framework/Core/include/Framework/DataProcessingDevice.h index d9565ebef84a2..67edaa99e532b 100644 --- a/Framework/Core/include/Framework/DataProcessingDevice.h +++ b/Framework/Core/include/Framework/DataProcessingDevice.h @@ -113,7 +113,6 @@ class DataProcessingDevice : public fair::mq::Device std::vector mPendingRegionInfos; /// A list of the region infos not yet notified. std::mutex mRegionInfoMutex; ProcessingPolicies mProcessingPolicies; /// User policies related to data processing - bool mWasActive = false; /// Whether or not the device was active at last iteration. std::vector mHandles; /// Handles to use to schedule work. std::vector mStreams; /// Information about the task running in the associated mHandle. /// Handle to wake up the main loop from other threads diff --git a/Framework/Core/include/Framework/DeviceState.h b/Framework/Core/include/Framework/DeviceState.h index 89961b3e92dc7..f6d863064ee66 100644 --- a/Framework/Core/include/Framework/DeviceState.h +++ b/Framework/Core/include/Framework/DeviceState.h @@ -30,6 +30,8 @@ typedef struct uv_async_s uv_async_t; namespace o2::framework { +struct DataProcessorContext; + /// Running state information of a given device struct DeviceState { /// Motivation for the loop being triggered. @@ -108,6 +110,11 @@ struct DeviceState { /// the bits we are interested in. std::vector severityStack; TransitionHandlingState transitionHandling = TransitionHandlingState::NoTransition; + + // The DataProcessorContext which was most recently active. + // We use this to determine if we should trigger the loop without + // waiting for some events. + std::atomic lastActiveDataProcessor = nullptr; }; } // namespace o2::framework diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index f3fe328e78a06..169613b18a2ee 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -1061,7 +1061,11 @@ void DataProcessingDevice::InitTask() // Whenever we InitTask, we consider as if the previous iteration // was successful, so that even if there is no timer or receiving // channel, we can still start an enumeration. - mWasActive = true; + DataProcessorContext* initialContext = nullptr; + bool idle = state.lastActiveDataProcessor.compare_exchange_strong(initialContext, (DataProcessorContext*)-1); + if (!idle) { + LOG(error) << "DataProcessor " << state.lastActiveDataProcessor.load()->spec->name << " was unexpectedly active"; + } // We should be ready to run here. Therefore we copy all the // required parts in the DataProcessorContext. Eventually we should @@ -1093,8 +1097,6 @@ void DataProcessingDevice::InitTask() void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceContext& deviceContext) { - context.wasActive = &mWasActive; - context.isSink = false; // If nothing is a sink, the rate limiting simply does not trigger. bool enableRateLimiting = std::stoi(fConfig->GetValue("timeframes-rate-limit")); @@ -1308,14 +1310,19 @@ void DataProcessingDevice::Run() { ServiceRegistryRef ref{mServiceRegistry}; ref.get().flushPending(mServiceRegistry); - auto shouldNotWait = (mWasActive && + DataProcessorContext* lastActive = state.lastActiveDataProcessor.load(); + // Reset to zero unless some other DataPorcessorContext completed in the meanwhile. + // In such case we will take care of it at next iteration. + state.lastActiveDataProcessor.compare_exchange_strong(lastActive, nullptr); + + auto shouldNotWait = (lastActive != nullptr && (state.streaming != StreamingState::Idle) && (state.activeSignals.empty())) || (state.streaming == StreamingState::EndOfStreaming); if (firstLoop) { shouldNotWait = true; firstLoop = false; } - if (mWasActive) { + if (lastActive != nullptr) { state.loopReason |= DeviceState::LoopReason::PREVIOUSLY_ACTIVE; } if (NewStatePending()) { @@ -1485,10 +1492,7 @@ void DataProcessingDevice::Run() } else { auto ref = ServiceRegistryRef{mServiceRegistry}; ref.get().handleExpired(reportExpiredOffer); - mWasActive = false; } - } else { - mWasActive = false; } } @@ -1510,7 +1514,6 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) O2_SIGNPOST_ID_FROM_POINTER(dpid, device, &context); O2_SIGNPOST_START(device, dpid, "do_prepare", "Starting DataProcessorContext::doPrepare."); - *context.wasActive = false; { ref.get().call(); } @@ -1669,7 +1672,10 @@ void DataProcessingDevice::doPrepare(ServiceRegistryRef ref) socket.Events(&info.hasPendingEvents); if (info.hasPendingEvents) { info.readPolled = false; - *context.wasActive |= newMessages; + // In case there were messages, we consider it as activity + if (newMessages) { + state.lastActiveDataProcessor.store(&context); + } } O2_SIGNPOST_END(device, cid, "channels", "Done processing channel %{public}s (%d).", channelSpec.name.c_str(), info.id.value); @@ -1693,24 +1699,29 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) auto& spec = ref.get(); if (state.streaming == StreamingState::Idle) { - *context.wasActive = false; return; } context.completed.clear(); context.completed.reserve(16); - *context.wasActive |= DataProcessingDevice::tryDispatchComputation(ref, context.completed); + if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) { + state.lastActiveDataProcessor.store(&context); + } DanglingContext danglingContext{*context.registry}; context.preDanglingCallbacks(danglingContext); - if (*context.wasActive == false) { + if (state.lastActiveDataProcessor.load() == nullptr) { ref.get().call(); } auto activity = ref.get().processDanglingInputs(context.expirationHandlers, *context.registry, true); - *context.wasActive |= activity.expiredSlots > 0; + if (activity.expiredSlots > 0) { + state.lastActiveDataProcessor = &context; + } context.completed.clear(); - *context.wasActive |= DataProcessingDevice::tryDispatchComputation(ref, context.completed); + if (DataProcessingDevice::tryDispatchComputation(ref, context.completed)) { + state.lastActiveDataProcessor = &context; + } context.postDanglingCallbacks(danglingContext); @@ -1720,7 +1731,7 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) // framework itself. if (context.allDone == true && state.streaming == StreamingState::Streaming) { switchState(StreamingState::EndOfStreaming); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; } if (state.streaming == StreamingState::EndOfStreaming) { @@ -1766,7 +1777,10 @@ void DataProcessingDevice::doRun(ServiceRegistryRef ref) // This is needed because the transport is deleted before the device. relayer.clear(); switchState(StreamingState::Idle); - *context.wasActive = shouldProcess; + // In case we should process, note the data processor responsible for it + if (shouldProcess) { + state.lastActiveDataProcessor = &context; + } // On end of stream we shut down all output pollers. O2_SIGNPOST_EVENT_EMIT(device, dpid, "state", "Shutting down output pollers."); for (auto& poller : state.activeOutputPollers) { @@ -1834,6 +1848,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& O2_SIGNPOST_ID_FROM_POINTER(cid, device, &info); auto ref = ServiceRegistryRef{*context.registry}; auto& stats = ref.get(); + auto& state = ref.get(); auto& parts = info.parts; stats.updateStats({(int)ProcessingStatsId::TOTAL_INPUTS, DataProcessingStats::Op::Set, (int64_t)parts.Size()}); @@ -1856,14 +1871,14 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got SourceInfoHeader with state %d", (int)sih->state); info.state = sih->state; insertInputInfo(pi, 2, InputType::SourceInfo, info.id); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; continue; } auto dih = o2::header::get(headerData); if (dih) { O2_SIGNPOST_EVENT_EMIT(device, cid, "handle_data", "Got DomainInfoHeader with oldestPossibleTimeslice %d", (int)dih->oldestPossibleTimeslice); insertInputInfo(pi, 2, InputType::DomainInfo, info.id); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; continue; } auto dh = o2::header::get(headerData); @@ -1925,6 +1940,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& auto handleValidMessages = [&info, ref, &reportError](std::vector const& inputInfos) { auto& relayer = ref.get(); + auto& state = ref.get(); static WaitBackpressurePolicy policy; auto& parts = info.parts; // We relay execution to make sure we have a complete set of parts @@ -2012,7 +2028,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& case InputType::SourceInfo: { LOGP(detail, "Received SourceInfo"); auto& context = ref.get(); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; auto headerIndex = input.position; auto payloadIndex = input.position + 1; assert(payloadIndex < parts.Size()); @@ -2030,7 +2046,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& /// We have back pressure, therefore we do not process DomainInfo anymore. /// until the previous message are processed. auto& context = ref.get(); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; auto headerIndex = input.position; auto payloadIndex = input.position + 1; assert(payloadIndex < parts.Size()); @@ -2058,7 +2074,7 @@ void DataProcessingDevice::handleData(ServiceRegistryRef ref, InputChannelInfo& auto& context = ref.get(); context.domainInfoUpdatedCallback(*context.registry, oldestPossibleTimeslice, info.id); ref.get().call((ServiceRegistryRef)*context.registry, (size_t)oldestPossibleTimeslice, (ChannelIndex)info.id); - *context.wasActive = true; + state.lastActiveDataProcessor = &context; } auto it = std::remove_if(parts.fParts.begin(), parts.fParts.end(), [](auto& msg) -> bool { return msg.get() == nullptr; }); parts.fParts.erase(it, parts.end()); diff --git a/prodtests/full-system-test/aggregator-workflow.sh b/prodtests/full-system-test/aggregator-workflow.sh index aedd9f7b3bc2f..4e5f6f2a4c8ad 100755 --- a/prodtests/full-system-test/aggregator-workflow.sh +++ b/prodtests/full-system-test/aggregator-workflow.sh @@ -113,10 +113,12 @@ if [[ $BEAMTYPE == "PbPb" ]]; then : ${LHCPHASE_TF_PER_SLOT:=100000} : ${TOF_CHANNELOFFSETS_UPDATE:=300000} : ${TOF_CHANNELOFFSETS_DELTA_UPDATE:=50000} + : ${FT0_TIMEOFFSET_TRG_BITS:=384} # min bias and data validity else : ${LHCPHASE_TF_PER_SLOT:=100000} : ${TOF_CHANNELOFFSETS_UPDATE:=300000} : ${TOF_CHANNELOFFSETS_DELTA_UPDATE:=50000} + : ${FT0_TIMEOFFSET_TRG_BITS:=144} # vertex and data validity fi # special settings for aggregator workflows @@ -361,7 +363,7 @@ fi if [[ $AGGREGATOR_TASKS == FORWARD_TF || $AGGREGATOR_TASKS == ALL ]]; then # FT0 if [[ $CALIB_FT0_TIMEOFFSET == 1 ]]; then - add_W o2-calibration-ft0-time-offset-calib "--tf-per-slot $FT0_TIMEOFFSET_TF_PER_SLOT --max-delay 0" "FT0CalibParam.mNExtraSlots=0;FT0CalibParam.mRebinFactorPerChID[180]=4;" + add_W o2-calibration-ft0-time-offset-calib "--tf-per-slot $FT0_TIMEOFFSET_TF_PER_SLOT --max-delay 0" "FT0CalibParam.mNExtraSlots=0;FT0CalibParam.mRebinFactorPerChID[180]=4;FT0DigitFilterParam.mTrgBitsGood=${FT0_TIMEOFFSET_TRG_BITS};FT0DigitFilterParam.mTrgBitsToCheck=${FT0_TIMEOFFSET_TRG_BITS};" fi fi