Skip to content

Commit

Permalink
Add new LWTracks to highlight UpdateCycle details (#9192)
Browse files Browse the repository at this point in the history
Co-authored-by: Vlad Kuznecov <va-kuznecov@nebius.com>
  • Loading branch information
va-kuznecov and Vlad Kuznecov authored Sep 13, 2024
1 parent 7277322 commit f315577
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 11 deletions.
6 changes: 6 additions & 0 deletions ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@ struct TEventTypeField {
PROBE(PDiskEnqueueAllDetails, GROUPS("PDisk"), \
TYPES(ui64, size_t, size_t, size_t, double), \
NAMES("pdisk", "initialQueueSize", "processedReqs", "pushedToForsetiReqs", "spentTimeMs")) \
PROBE(PDiskUpdateStarted, GROUPS("PDisk"), TYPES(ui64), NAMES("pdisk")) \
PROBE(PDiskProcessLogWriteQueue, GROUPS("PDisk"), TYPES(ui64, size_t, size_t), NAMES("pdisk", "logQueueSize", "commitQueueSize")) \
PROBE(PDiskProcessChunkReadQueue, GROUPS("PDisk"), TYPES(ui64, size_t), NAMES("pdisk", "queueSize")) \
PROBE(PDiskProcessChunkWriteQueue, GROUPS("PDisk"), TYPES(ui64, size_t), NAMES("pdisk", "queueSize")) \
PROBE(PDiskStartWaiting, GROUPS("PDisk"), TYPES(ui64), NAMES("pdisk")) \
PROBE(PDiskUpdateEnded, GROUPS("PDisk"), TYPES(ui64), NAMES("pdisk")) \
PROBE(DSProxyGetEnqueue, GROUPS("DSProxy", "LWTrackStart"), TYPES(), NAMES()) \
PROBE(DSProxyGetBootstrap, GROUPS("DSProxy"), TYPES(), NAMES()) \
PROBE(DSProxyGetHandle, GROUPS("DSProxy", "LWTrackStart"), TYPES(), NAMES()) \
Expand Down
21 changes: 11 additions & 10 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2240,14 +2240,11 @@ void TPDisk::ProcessChunkWriteQueue() {
Y_FAIL_S("Unexpected request type# " << ui64(req->GetType()) << " in JointChunkWrites");
}
}
LWTRACK(PDiskProcessChunkWriteQueue, UpdateCycleOrbit, PCtx->PDiskId, JointChunkWrites.size());
JointChunkWrites.clear();
}

void TPDisk::ProcessChunkReadQueue() {
if (JointChunkReads.empty()) {
return;
}

NHPTimer::STime now = HPNow();
// Size (bytes) of elementary sectors block, it is useless to read/write less than that blockSize
ui64 bufferSize = BufferPool->GetBufferSize() / Format.SectorSize * Format.SectorSize;
Expand Down Expand Up @@ -2298,6 +2295,7 @@ void TPDisk::ProcessChunkReadQueue() {
Y_FAIL_S("Unexpected request type# " << ui64(req->GetType()) << " in JointChunkReads");
}
}
LWTRACK(PDiskProcessChunkReadQueue, UpdateCycleOrbit, PCtx->PDiskId, JointChunkReads.size());
JointChunkReads.clear();
}

Expand Down Expand Up @@ -3394,14 +3392,13 @@ void TPDisk::ProcessYardInitSet() {
}

void TPDisk::EnqueueAll() {
TInstant start = TInstant::Now();
auto start = TMonotonic::Now();

TGuard<TMutex> guard(StateMutex);
size_t initialQueueSize = InputQueue.GetWaitingSize();
size_t processedReqs = 0;
size_t pushedToForsetiReqs = 0;


while (InputQueue.GetWaitingSize() > 0) {
TRequestBase* request = InputQueue.Pop();
AtomicSub(InputQueueCost, request->Cost);
Expand Down Expand Up @@ -3453,12 +3450,13 @@ void TPDisk::EnqueueAll() {
}
}

double spentTimeMs = (TInstant::Now() - start).MillisecondsFloat();
LWPROBE(PDiskEnqueueAllDetails, PCtx->PDiskId, initialQueueSize, processedReqs, pushedToForsetiReqs, spentTimeMs);
double spentTimeMs = (TMonotonic::Now() - start).MillisecondsFloat();
LWTRACK(PDiskEnqueueAllDetails, UpdateCycleOrbit, PCtx->PDiskId, initialQueueSize, processedReqs, pushedToForsetiReqs, spentTimeMs);
}

void TPDisk::Update() {
Mon.UpdateDurationTracker.UpdateStarted();
LWTRACK(PDiskUpdateStarted, UpdateCycleOrbit, PCtx->PDiskId);

// ui32 userSectorSize = 0;

Expand Down Expand Up @@ -3554,9 +3552,9 @@ void TPDisk::Update() {
}
}
ui64 totalCost = totalLogCost + totalNonLogCost;
LWPROBE(PDiskForsetiCycle, PCtx->PDiskId, nowCycles, prevForsetiTimeNs, ForsetiPrevTimeNs, timeCorrection,
LWTRACK(PDiskForsetiCycle, UpdateCycleOrbit, PCtx->PDiskId, nowCycles, prevForsetiTimeNs, ForsetiPrevTimeNs, timeCorrection,
realDuration, virtualDuration, ForsetiTimeNs, totalCost, virtualDeadline);
LWPROBE(PDiskMilliBatchSize, PCtx->PDiskId, totalLogCost, totalNonLogCost, totalLogReqs, totalNonLogReqs);
LWTRACK(PDiskMilliBatchSize, UpdateCycleOrbit, PCtx->PDiskId, totalLogCost, totalNonLogCost, totalLogReqs, totalNonLogReqs);
ForsetiRealTimeCycles = nowCycles;


Expand Down Expand Up @@ -3636,6 +3634,7 @@ void TPDisk::Update() {


Mon.UpdateDurationTracker.WaitingStart(isNothingToDo);
LWTRACK(PDiskStartWaiting, UpdateCycleOrbit, PCtx->PDiskId);

if (Cfg->SectorMap) {
auto diskModeParams = Cfg->SectorMap->GetDiskModeParams();
Expand Down Expand Up @@ -3668,6 +3667,8 @@ void TPDisk::Update() {
InputQueue.ProducedWait(TDuration::MilliSeconds(10));
}

LWTRACK(PDiskUpdateEnded, UpdateCycleOrbit, PCtx->PDiskId);
UpdateCycleOrbit.Reset();
Mon.UpdateDurationTracker.UpdateEnded();
*Mon.PDiskThreadCPU = ThreadCPUTime();
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ class TPDisk : public IPDisk {
// Metadata storage
NMeta::TInfo Meta;

NLWTrace::TOrbit UpdateCycleOrbit;

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Initialization
TPDisk(std::shared_ptr<TPDiskCtx> pCtx, const TIntrusivePtr<TPDiskConfig> cfg, const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters);
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -707,8 +707,10 @@ void TPDisk::WriteSysLogRestorePoint(TCompletionAction *action, TReqId reqId, NW
// Common log writing
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void TPDisk::ProcessLogWriteQueueAndCommits() {
if (JointLogWrites.empty())
if (JointLogWrites.empty()) {
LWTRACK(PDiskProcessLogWriteQueue, UpdateCycleOrbit, PCtx->PDiskId, JointLogWrites.size(), JointCommits.size());
return;
}

NHPTimer::STime now = HPNow();
for (TLogWrite *logCommit : JointCommits) {
Expand Down Expand Up @@ -748,6 +750,7 @@ void TPDisk::ProcessLogWriteQueueAndCommits() {
double(logWrite->Cost) / 1000000.0, HPSecondsFloat(logWrite->Deadline),
logWrite->Owner, logWrite->IsFast, logWrite->PriorityClass);
}
LWTRACK(PDiskProcessLogWriteQueue, UpdateCycleOrbit, PCtx->PDiskId, JointLogWrites.size(), JointCommits.size());
TReqId reqId = JointLogWrites.back()->ReqId;
auto write = MakeHolder<TCompletionLogWrite>(
this, std::move(JointLogWrites), std::move(JointCommits), std::move(logChunksToCommit));
Expand Down

0 comments on commit f315577

Please sign in to comment.