Skip to content

Commit

Permalink
KIKIMR-19139 Load index in TForward (temporary solution)
Browse files Browse the repository at this point in the history
  • Loading branch information
kunga committed Sep 26, 2023
1 parent 0ea90c0 commit 5ad1dcc
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 34 deletions.
5 changes: 3 additions & 2 deletions ydb/core/tablet_flat/flat_fwd_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace NTable {
namespace NFwd {

class TCache : public IPageLoadingLogic {
using TGroupId = NPage::TGroupId;

template<size_t Items>
struct TRound {
Expand Down Expand Up @@ -57,8 +58,8 @@ namespace NFwd {
public:
TCache() = delete;

TCache(const NPage::TIndex& index, const TIntrusiveConstPtr<TSlices>& bounds = nullptr)
: Index(index, 1, bounds)
TCache(const TPart* part, IPages* env, TGroupId groupId, const TIntrusiveConstPtr<TSlices>& bounds = nullptr)
: Index(part, env, groupId, 1, bounds)
{ }

~TCache()
Expand Down
10 changes: 4 additions & 6 deletions ydb/core/tablet_flat/flat_fwd_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,18 +348,16 @@ namespace NFwd {
}
}

TEgg MakeCache(const TPart *part, NPage::TGroupId groupId, TIntrusiveConstPtr<TSlices> bounds) const noexcept
TEgg MakeCache(const TPart *part, NPage::TGroupId groupId, TIntrusiveConstPtr<TSlices> bounds) noexcept
{
auto *partStore = dynamic_cast<const TPartStore*>(part);

Y_VERIFY(groupId.Index < partStore->PageCollections.size(), "Got part without enough page collections");

auto& cache = partStore->PageCollections[groupId.Index];
auto& index = part->GetGroupIndex(groupId);
if (cache->PageCollection->Total() < index.UpperPage())
Y_FAIL("Part index last page is out of storage capacity");

return { new NFwd::TCache(index, bounds), cache->PageCollection };

auto* fwd = new NFwd::TCache(part, this, groupId, bounds);
return { fwd, cache->PageCollection };
}

TEgg MakeExtern(const TPart *part, TIntrusiveConstPtr<TSlices> bounds) const noexcept
Expand Down
72 changes: 50 additions & 22 deletions ydb/core/tablet_flat/flat_part_forward.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "flat_part_index_iter.h"
#include "flat_table_part.h"
#include "flat_page_index.h"
#include "flat_part_slice.h"
Expand All @@ -9,6 +10,8 @@ namespace NTable {

class TForward {
using TIndex = NPage::TIndex;
using TGroupId = NPage::TGroupId;
using TPageId = NPage::TPageId;
using TIter = TIndex::TIter;

public:
Expand All @@ -28,40 +31,29 @@ namespace NTable {
TPageId PageId = Max<TPageId>();
};

explicit TForward(const TIndex& index, ui32 trace, const TIntrusiveConstPtr<TSlices>& bounds = nullptr)
explicit TForward(const TPart* part, IPages* env, TGroupId groupId, ui32 trace, const TIntrusiveConstPtr<TSlices>& bounds = nullptr)
: Trace(Max(ui32(1), trace))
, Tail(index->Begin())
, Head(index->Begin())
, Edge(index->Begin())
, End(index->End())
, Index(part, env, groupId)
{
if (bounds && !bounds->empty()) {
if (auto rowId = bounds->front().BeginRowId(); rowId > 0) {
// Find the first page we would allow to load
Tail = Head = Edge = index.LookupRow(rowId);
}
if (auto rowId = bounds->back().EndRowId(); rowId < index.GetEndRowId()) {
// Find the first page we would refuse to load
End = index.LookupRow(rowId);
if (End && End->GetRowId() < rowId) {
++End;
}
}
BeginBoundRowId = bounds->front().BeginRowId();
EndBoundRowId = bounds->back().EndRowId();
} else {
BeginBoundRowId = 0;
EndBoundRowId = Max<TRowId>();
}
}

inline TPageId On(bool head = false) const noexcept
inline TPageId On(bool head = false) noexcept
{
EnsureStarted();
return PageOf(head ? Head : Edge);
}

inline size_t Span() const noexcept
{
return Head - Tail;
}

TResult Clean(TPageId until) noexcept
{
EnsureStarted();

if (PageOf(Tail) > until) {
Y_FAIL("Part lookups goes below of its trace pages");
} else {
Expand All @@ -84,6 +76,8 @@ namespace NTable {

TResult More(TPageId until) noexcept
{
EnsureStarted();

if (Head != End && ((Edge - Head >= 0) || PageOf(Head) < until)) {
return (Head++)->GetPageId();
}
Expand All @@ -92,6 +86,37 @@ namespace NTable {
}

private:
void EnsureStarted()
{
if (Started) {
return;
}

auto index = Index.TryLoadRaw();

// temporary solution: its too hard to handle index page faults in Clean/More methods
Y_VERIFY(index, "Index should have been loaded before using TForward");

Tail = (*index)->Begin();
Head = (*index)->Begin();
Edge = (*index)->Begin();
End = (*index)->End();

if (BeginBoundRowId > 0) {
// Find the first page we would allow to load
Tail = Head = Edge = index->LookupRow(BeginBoundRowId);
}
if (EndBoundRowId < index->GetEndRowId()) {
// Find the first page we would refuse to load
End = index->LookupRow(EndBoundRowId);
if (End && End->GetRowId() < EndBoundRowId) {
++End;
}
}

Started = true;
}

inline TPageId PageOf(const TIter &it) const
{
return it == End ? Max<TPageId>() : it->GetPageId();
Expand All @@ -101,7 +126,10 @@ namespace NTable {
const ui32 Trace = 0;

private:
TPartIndexIt Index;
bool Started = false;
TIter Tail, Head, Edge, End;
TRowId BeginBoundRowId, EndBoundRowId;
};
}
}
2 changes: 1 addition & 1 deletion ydb/core/tablet_flat/flat_part_index_iter.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class TPartIndexIt {
return bool(Iter);
}

// for precharge only
// for precharge and TForward only
TIndex* TryLoadRaw() {
return TryGetIndex();
}
Expand Down
26 changes: 25 additions & 1 deletion ydb/core/tablet_flat/flat_scan_feed.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ namespace NTable {
}

PrepareBoots();
SeekState = ESeekState::SeekBoots;
SeekState = ESeekState::LoadIndexes;
return true;
}

Expand All @@ -356,6 +356,20 @@ namespace NTable {
return LoadingParts == 0;
}

bool LoadIndexes() noexcept
{
bool ready = true;
for (const auto& partView : Subset.Flatten) {
for (auto indexPageId : partView->IndexPages.Groups) {
ready &= bool(CurrentEnv->TryGetPage(partView.Part.Get(), indexPageId));
}
for (auto indexPageId : partView->IndexPages.Historic) {
ready &= bool(CurrentEnv->TryGetPage(partView.Part.Get(), indexPageId));
}
}
return ready;
}

void PrepareBoots() noexcept
{
auto keyDefaults = Subset.Scheme->Keys;
Expand Down Expand Up @@ -433,6 +447,9 @@ namespace NTable {
return Boots.empty();
}

/**
* @return true on page fault
*/
bool Seek() noexcept
{
switch (SeekState) {
Expand All @@ -444,6 +461,12 @@ namespace NTable {
[[fallthrough]];
case ESeekState::PrepareBoots:
PrepareBoots();
SeekState = ESeekState::LoadIndexes;
[[fallthrough]];
case ESeekState::LoadIndexes:
if (!LoadIndexes()) {
return true;
}
SeekState = ESeekState::SeekBoots;
[[fallthrough]];
case ESeekState::SeekBoots:
Expand Down Expand Up @@ -473,6 +496,7 @@ namespace NTable {
enum class ESeekState {
LoadColdParts,
PrepareBoots,
LoadIndexes,
SeekBoots,
Finished,
};
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tablet_flat/test/libs/table/test_envs.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,8 @@ namespace NTest {
for (ui32 room : xrange(partStore->Store->GetRoomCount())) {
if (room < partStore->Store->GetGroupCount()) {
NPage::TGroupId groupId(room);
slots.push_back(Settle(partStore, room, new NFwd::TCache(partStore->GetGroupIndex(groupId))));
auto *cache = new NFwd::TCache(part, this, groupId);
slots.push_back(Settle(partStore, room, cache));
} else if (room == partStore->Store->GetOuterRoom()) {
slots.push_back(Settle(partStore, room, MakeOuter(partStore)));
} else if (room == partStore->Store->GetExternRoom()) {
Expand All @@ -285,7 +286,8 @@ namespace NTest {
}
for (ui32 group : xrange(part->HistoricGroupsCount)) {
NPage::TGroupId groupId(group, true);
slots.push_back(Settle(partStore, group, new NFwd::TCache(partStore->GetGroupIndex(groupId))));
auto *cache = new NFwd::TCache(part, this, groupId);
slots.push_back(Settle(partStore, group, cache));
}
}

Expand Down

0 comments on commit 5ad1dcc

Please sign in to comment.