Skip to content

Commit

Permalink
Merge 1e812e1 into c661c11
Browse files Browse the repository at this point in the history
  • Loading branch information
kunga authored Feb 21, 2024
2 parents c661c11 + 1e812e1 commit 99d56c8
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 240 deletions.
9 changes: 9 additions & 0 deletions ydb/core/tablet_flat/flat_part_index_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ class TPartIndexIt : public IIndexIter, public IStatsPartGroupIterator {
return bool(Iter);
}

void AddLastDeltaDataSize(TChanneledDataSize& dataSize) override {
Y_DEBUG_ABORT_UNLESS(Index);
Y_DEBUG_ABORT_UNLESS(Iter.Off());
TPageId pageId = (Iter - 1)->GetPageId();
ui64 delta = Part->GetPageSize(pageId, GroupId);
ui8 channel = Part->GetGroupChannel(GroupId);
dataSize.Add(delta, channel);
}

// for precharge and TForward only
TIndex* TryLoadRaw() {
return TryGetIndex();
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/tablet_flat/flat_part_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,8 @@ class TPartStore : public TPart, public IBundle {
return EPage(PageCollections[groupId.Index]->PageCollection->Page(id).Type);
}

ui8 GetPageChannel(NPage::TPageId id, NPage::TGroupId groupId) const override
ui8 GetGroupChannel(NPage::TGroupId groupId) const override
{
Y_UNUSED(id);
Y_ABORT_UNLESS(groupId.Index < PageCollections.size());
return PageCollections[groupId.Index]->Id.Channel();
}
Expand Down
54 changes: 16 additions & 38 deletions ydb/core/tablet_flat/flat_stat_part.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,6 @@
namespace NKikimr {
namespace NTable {

struct TPartDataSize {
ui64 Size = 0;
TVector<ui64> ByChannel = { };

void Add(ui64 size, ui8 channel) {
Size += size;
if (!(channel < ByChannel.size())) {
ByChannel.resize(channel + 1);
}
ByChannel[channel] += size;
}
};

struct TPartDataStats {
ui64 RowCount = 0;
TPartDataSize DataSize = { };
};

// Iterates over part index and calculates total row count and data size
// This iterator skips pages that are screened. Currently the logic is simple:
// if page start key is screened then we assume that the whole previous page is screened
Expand Down Expand Up @@ -82,59 +64,62 @@ class TStatsScreenedPartIterator {
return Groups[0]->IsValid();
}

EReady Next(TPartDataStats& stats) {
EReady Next(TDataStats& stats) {
Y_ABORT_UNLESS(IsValid());

auto curPageId = Groups[0]->GetPageId();
LastRowId = Groups[0]->GetRowId();
auto ready = Groups[0]->Next();
if (ready == EReady::Page) {
Y_DEBUG_ABORT_UNLESS(false, "Shouldn't really happen");
return ready;
}

ui64 rowCount = CountUnscreenedRows(GetLastRowId(), GetCurrentRowId());
stats.RowCount += rowCount;
if (rowCount) {
AddPageSize(stats.DataSize, curPageId, TGroupId(0));
Groups[0]->AddLastDeltaDataSize(stats.DataSize);
}

TRowId nextRowId = ready == EReady::Data ? Groups[0]->GetRowId() : Max<TRowId>();
for (auto groupIndex : xrange<ui32>(1, Groups.size())) {
while (Groups[groupIndex]->IsValid() && Groups[groupIndex]->GetRowId() < nextRowId) {
// eagerly include all data up to the next row id
if (rowCount) {
AddPageSize(stats.DataSize, Groups[groupIndex]->GetPageId(), TGroupId(groupIndex));
}
if (Groups[groupIndex]->Next() == EReady::Page) {
Y_DEBUG_ABORT_UNLESS(false, "Shouldn't really happen");
ready = EReady::Page;
break;
}
if (rowCount) {
Groups[groupIndex]->AddLastDeltaDataSize(stats.DataSize);
}
}
}

if (HistoricGroups) {
Y_DEBUG_ABORT_UNLESS(Part->Scheme->HistoryGroup.ColsKeyIdx.size() == 3);
while (HistoricGroups[0]->IsValid() && (!HistoricGroups[0]->GetKeyCellsCount() || HistoricGroups[0]->GetKeyCell(0).AsValue<TRowId>() < nextRowId)) {
// eagerly include all history up to the next row id
if (rowCount) {
AddPageSize(stats.DataSize, HistoricGroups[0]->GetPageId(), TGroupId(0, true));
}
if (HistoricGroups[0]->Next() == EReady::Page) {
Y_DEBUG_ABORT_UNLESS(false, "Shouldn't really happen");
ready = EReady::Page;
break;
}
if (rowCount) {
HistoricGroups[0]->AddLastDeltaDataSize(stats.DataSize);
}
}
TRowId nextHistoryRowId = HistoricGroups[0]->IsValid() ? HistoricGroups[0]->GetRowId() : Max<TRowId>();
for (auto groupIndex : xrange<ui32>(1, Groups.size())) {
while (HistoricGroups[groupIndex]->IsValid() && HistoricGroups[groupIndex]->GetRowId() < nextHistoryRowId) {
// eagerly include all data up to the next row id
if (rowCount) {
AddPageSize(stats.DataSize, HistoricGroups[groupIndex]->GetPageId(), TGroupId(groupIndex, true));
}
if (HistoricGroups[groupIndex]->Next() == EReady::Page) {
Y_DEBUG_ABORT_UNLESS(false, "Shouldn't really happen");
ready = EReady::Page;
break;
}
if (rowCount) {
HistoricGroups[groupIndex]->AddLastDeltaDataSize(stats.DataSize);
}
}
}
}
Expand Down Expand Up @@ -174,13 +159,6 @@ class TStatsScreenedPartIterator {
return LastRowId;
}

void AddPageSize(TPartDataSize& stats, TPageId pageId, TGroupId groupId) const {
// TODO: move to IStatsPartGroupIterator
ui64 size = Part->GetPageSize(pageId, groupId);
ui8 channel = Part->GetPageChannel(pageId, groupId);
stats.Add(size, channel);
}

void FillKey() {
CurrentKey.clear();

Expand Down Expand Up @@ -229,7 +207,7 @@ class TStatsScreenedPartIterator {
return rowCount;
}

void AddBlobsSize(TPartDataSize& stats, const TFrames* frames, ELargeObj lob, ui32 &prevPage) noexcept {
void AddBlobsSize(TChanneledDataSize& stats, const TFrames* frames, ELargeObj lob, ui32 &prevPage) noexcept {
const auto row = GetLastRowId();
const auto end = GetCurrentRowId();

Expand Down
Loading

0 comments on commit 99d56c8

Please sign in to comment.