diff --git a/ydb/core/tablet_flat/flat_fwd_cache.h b/ydb/core/tablet_flat/flat_fwd_cache.h index 15cea1b2bc31..b76933f941a9 100644 --- a/ydb/core/tablet_flat/flat_fwd_cache.h +++ b/ydb/core/tablet_flat/flat_fwd_cache.h @@ -1,10 +1,13 @@ #pragma once #include "flat_part_iface.h" -#include "flat_part_forward.h" #include "flat_fwd_iface.h" #include "flat_fwd_misc.h" #include "flat_fwd_page.h" +#include "flat_part_index_iter.h" +#include "flat_part_index_iter_iface.h" +#include "flat_table_part.h" +#include "flat_part_slice.h" namespace NKikimr { namespace NTable { @@ -58,8 +61,16 @@ namespace NFwd { TCache() = delete; TCache(const TPart* part, IPages* env, TGroupId groupId, const TIntrusiveConstPtr& bounds = nullptr) - : Index(part, env, groupId, 1, bounds) - { } + : Index(MakeHolder(part, env, groupId)) // TODO: use CreateIndexIter(part, env, groupId) + { + if (bounds && !bounds->empty()) { + BeginRowId = bounds->front().BeginRowId(); + EndRowId = bounds->back().EndRowId(); + } else { + BeginRowId = 0; + EndRowId = Index->GetEndRowId(); + } + } ~TCache() { @@ -70,23 +81,35 @@ namespace NFwd { TResult Handle(IPageLoadingQueue *head, TPageId pageId, ui64 lower) noexcept override { - Y_ABORT_UNLESS(pageId != Max(), "Invalid requested pageId"); + Y_ABORT_UNLESS(pageId != Max(), "Requested page is invalid"); if (auto *page = Trace.Get(pageId)) { return { page, false, true }; } - Rewind(pageId); /* points Offset to pageId */ + DropPagesBefore(pageId); Shrink(); - bool more = Grow && (OnHold + OnFetch <= lower); + bool grow = OnHold + OnFetch <= lower; + + if (Offset == Pages.size()) { // isn't processed yet + SyncIndex(pageId); + AddToQueue(head, pageId); + } + + grow &= Index->IsValid() && Index->GetRowId() < EndRowId; - return { Preload(head, 0).Touch(pageId, Stat), more, true }; + return {Pages.at(Offset).Touch(pageId, Stat), grow, true}; } void Forward(IPageLoadingQueue *head, ui64 upper) noexcept override { - Preload(head, upper); + Y_ABORT_UNLESS(Started, "Couldn't be called before Handle returns grow"); + + while (OnHold + OnFetch < upper && Index->IsValid() && Index->GetRowId() < EndRowId) { + AddToQueue(head, Index->GetPageId()); + Y_ABORT_UNLESS(Index->Next() != EReady::Page); + } } void Apply(TArrayRef loaded) noexcept override @@ -117,35 +140,16 @@ namespace NFwd { } private: - TPage& Preload(IPageLoadingQueue *head, ui64 upper) noexcept - { - auto until = [this, upper]() { - return OnHold + OnFetch < upper ? Max() : 0; - }; - - while (auto more = Index.More(until())) { - auto size = head->AddToQueue(more, EPage::DataPage); - - Stat.Fetch += size; - OnFetch += size; - - Pages.emplace_back(more, size, 0, Max()); - Pages.back().Fetch = EFetch::Wait; - } - - Grow = Grow && Index.On(true) < Max(); - - return Pages.at(Offset); - } - - void Rewind(TPageId pageId) noexcept + void DropPagesBefore(TPageId pageId) noexcept { - while (auto drop = Index.Clean(pageId)) { + while (Offset < Pages.size()) { auto &page = Pages.at(Offset); - if (!Pages || page.PageId != drop.PageId) { - Y_ABORT("Dropping page that is not exist in cache"); - } else if (page.Size == 0) { + if (page.PageId >= pageId) { + break; + } + + if (page.Size == 0) { Y_ABORT("Dropping page that has not been touched"); } else if (page.Usage == EUsage::Keep && page) { OnHold -= Trace.Emplace(page); @@ -166,13 +170,42 @@ namespace NFwd { } } + void SyncIndex(TPageId pageId) noexcept + { + if (!Started) { + Y_ABORT_UNLESS(Index->Seek(BeginRowId) == EReady::Data); + Y_ABORT_UNLESS(Index->GetPageId() <= pageId, "Requested page is out of slice bounds"); + Started = true; + } + + while (Index->IsValid() && Index->GetPageId() < pageId) { + Y_ABORT_UNLESS(Index->Next() == EReady::Data); + Y_ABORT_UNLESS(Index->GetRowId() < EndRowId, "Requested page is out of slice bounds"); + } + + Y_ABORT_UNLESS(Index->GetPageId() == pageId, "Requested page doesn't belong to the part"); + Y_ABORT_UNLESS(Index->Next() != EReady::Page); + } + + void AddToQueue(IPageLoadingQueue *head, TPageId pageId) noexcept + { + auto size = head->AddToQueue(pageId, EPage::DataPage); + + Stat.Fetch += size; + OnFetch += size; + + Y_ABORT_UNLESS(!Pages || Pages.back().PageId < pageId); + Pages.emplace_back(pageId, size, 0, Max()); + Pages.back().Fetch = EFetch::Wait; + } + private: - bool Grow = true; /* Have some pages for Forward(...) */ - TForward Index; + THolder Index; /* Points on next to load page */ + bool Started = false; + TRowId BeginRowId, EndRowId; TLoadedPagesCircularBuffer Trace; /*_ Forward cache line state */ - ui64 OnHold = 0; ui64 OnFetch = 0; ui32 Offset = 0; diff --git a/ydb/core/tablet_flat/flat_fwd_env.h b/ydb/core/tablet_flat/flat_fwd_env.h index 7432617d3085..ed8776e3871a 100644 --- a/ydb/core/tablet_flat/flat_fwd_env.h +++ b/ydb/core/tablet_flat/flat_fwd_env.h @@ -10,10 +10,8 @@ #include "flat_table_subset.h" #include "flat_part_iface.h" #include "flat_part_store.h" -#include "flat_sausage_packet.h" #include "flat_sausage_fetch.h" #include "util_fmt_abort.h" -#include "util_basics.h" #include #include #include diff --git a/ydb/core/tablet_flat/flat_part_forward.h b/ydb/core/tablet_flat/flat_part_forward.h deleted file mode 100644 index eb1acc50f17b..000000000000 --- a/ydb/core/tablet_flat/flat_part_forward.h +++ /dev/null @@ -1,135 +0,0 @@ -#pragma once - -#include "flat_part_index_iter.h" -#include "flat_table_part.h" -#include "flat_page_index.h" -#include "flat_part_slice.h" - -namespace NKikimr { -namespace NTable { - - class TForward { - using TIndex = NPage::TIndex; - using TGroupId = NPage::TGroupId; - using TPageId = NPage::TPageId; - using TIter = TIndex::TIter; - - public: - struct TResult { - TResult(TPageId pageId) noexcept : PageId(pageId) { } - - explicit operator bool() const noexcept - { - return PageId != Max(); - } - - operator TPageId() const noexcept - { - return PageId; - } - - TPageId PageId = Max(); - }; - - explicit TForward(const TPart* part, IPages* env, TGroupId groupId, ui32 trace, const TIntrusiveConstPtr& bounds = nullptr) - : Trace(Max(ui32(1), trace)) - , Index(part, env, groupId) - { - if (bounds && !bounds->empty()) { - BeginBoundRowId = bounds->front().BeginRowId(); - EndBoundRowId = bounds->back().EndRowId(); - } else { - BeginBoundRowId = 0; - EndBoundRowId = Max(); - } - } - - inline TPageId On(bool head = false) noexcept - { - EnsureStarted(); - return PageOf(head ? Head : Edge); - } - - TResult Clean(TPageId until) noexcept - { - EnsureStarted(); - - if (PageOf(Tail) > until) { - Y_ABORT("Part lookups goes below of its trace pages"); - } else { - const auto edgeId = Max(PageOf(Edge), until); - - while (Edge != End && Edge->GetPageId() < until) ++Edge; - - if (PageOf(Edge) != edgeId) - Y_ABORT("Part lookup page is out of its index"); - - if (Tail == Head) Tail = Head = Edge; - } - - if (Tail != Head && Edge - Tail >= ssize_t(Trace)) { - return (Tail++)->GetPageId(); - } - - return Max(); - } - - TResult More(TPageId until) noexcept - { - EnsureStarted(); - - if (Head != End && ((Edge - Head >= 0) || PageOf(Head) < until)) { - return (Head++)->GetPageId(); - } - - return Max(); - } - - private: - void EnsureStarted() - { - if (Started) { - return; - } - - auto index = Index.TryLoadRaw(); - - // temporary solution: its too hard to handle index page faults in Clean/More methods - Y_ABORT_UNLESS(index, "Index should have been loaded before using TForward"); - - Tail = (*index)->Begin(); - Head = (*index)->Begin(); - Edge = (*index)->Begin(); - End = (*index)->End(); - - if (BeginBoundRowId > 0) { - // Find the first page we would allow to load - Tail = Head = Edge = index->LookupRow(BeginBoundRowId); - } - if (EndBoundRowId < index->GetEndRowId()) { - // Find the first page we would refuse to load - End = index->LookupRow(EndBoundRowId); - if (End && End->GetRowId() < EndBoundRowId) { - ++End; - } - } - - Started = true; - } - - inline TPageId PageOf(const TIter &it) const - { - return it == End ? Max() : it->GetPageId(); - } - - public: - const ui32 Trace = 0; - - private: - TPartIndexIt Index; - bool Started = false; - TIter Tail, Head, Edge, End; - TRowId BeginBoundRowId, EndBoundRowId; - }; -} -} diff --git a/ydb/core/tablet_flat/test/libs/table/test_part.h b/ydb/core/tablet_flat/test/libs/table/test_part.h index 7474d9485413..04a5292388f2 100644 --- a/ydb/core/tablet_flat/test/libs/table/test_part.h +++ b/ydb/core/tablet_flat/test/libs/table/test_part.h @@ -1,14 +1,15 @@ #pragma once #include "test_store.h" +#include +#include +#include +#include +#include +#include #include #include #include -#include -#include -#include -#include -#include #include #include diff --git a/ydb/core/tablet_flat/test/libs/table/test_steps.h b/ydb/core/tablet_flat/test/libs/table/test_steps.h index ba265d5f1d04..38ef7d151f8b 100644 --- a/ydb/core/tablet_flat/test/libs/table/test_steps.h +++ b/ydb/core/tablet_flat/test/libs/table/test_steps.h @@ -1,5 +1,6 @@ #pragma once +#include "util/string/builder.h" #include #include @@ -21,13 +22,15 @@ namespace NTest { IOutputStream& Log() const noexcept { - Cerr << "On " << Seq << ": "; + Cerr << CurrentStepStr() << ": "; return Cerr; } size_t CurrentStep() const noexcept { return Seq; } + TString CurrentStepStr() const noexcept { return TStringBuilder() << "On " << CurrentStep(); } + private: size_t Seq = 0; }; diff --git a/ydb/core/tablet_flat/ut/ut_forward.cpp b/ydb/core/tablet_flat/ut/ut_forward.cpp index 2c42a67373ab..45b5cb8bf98a 100644 --- a/ydb/core/tablet_flat/ut/ut_forward.cpp +++ b/ydb/core/tablet_flat/ut/ut_forward.cpp @@ -186,7 +186,7 @@ namespace { Grow = Grow || got.Grow; - UNIT_ASSERT_VALUES_EQUAL_C(RescaleStat(Cache->Stat), stat, TStringBuilder() << CurrentStep()); + UNIT_ASSERT_VALUES_EQUAL_C(RescaleStat(Cache->Stat), stat, CurrentStepStr()); return *this; } @@ -201,7 +201,7 @@ namespace { for (auto pageIndex : pageIndexes) { pageIds.push_back(IndexTools::GetPageId(*Part, pageIndex)); } - UNIT_ASSERT_VALUES_EQUAL_C(TVector(Queue.begin(), Queue.end()), pageIds, TStringBuilder() << CurrentStep()); + UNIT_ASSERT_VALUES_EQUAL_C(TVector(Queue.begin(), Queue.end()), pageIds, CurrentStepStr()); TVector load; NTest::TTestEnv testEnv; @@ -213,7 +213,7 @@ namespace { Cache->Apply(load); - UNIT_ASSERT_VALUES_EQUAL_C(RescaleStat(Cache->Stat), stat, TStringBuilder() << CurrentStep()); + UNIT_ASSERT_VALUES_EQUAL_C(RescaleStat(Cache->Stat), stat, CurrentStepStr()); return *this; } @@ -228,9 +228,9 @@ namespace { for (auto pageIndex : pageIndexes) { pageIds.push_back(IndexTools::GetPageId(*Part, pageIndex)); } - UNIT_ASSERT_VALUES_EQUAL_C(TVector(Queue.begin(), Queue.end()), pageIds, TStringBuilder() << CurrentStep()); + UNIT_ASSERT_VALUES_EQUAL_C(TVector(Queue.begin(), Queue.end()), pageIds, CurrentStepStr()); - UNIT_ASSERT_VALUES_EQUAL_C(RescaleStat(Cache->Stat), stat, TStringBuilder() << CurrentStep()); + UNIT_ASSERT_VALUES_EQUAL_C(RescaleStat(Cache->Stat), stat, CurrentStepStr()); return *this; } @@ -570,6 +570,31 @@ Y_UNIT_TEST_SUITE(NFwd) { {10, 10, 4, 2, 0}); } + Y_UNIT_TEST(Pages_Twice) + { + // 20 pages, 50 bytes each + const auto eggs = CookPart(); + + TPagesWrap wrap(eggs.Lone(), 200, 350); + + wrap.To(0).Get(5, false, true, true, + {1, 0, 1, 0, 0}); + wrap.To(1).Get(5, false, true, true, + {1, 0, 1, 0, 0}); + wrap.To(2).Fill({5, 6, 7, 8, 9, 10, 11}, + {7, 7, 1, 0, 0}); + + wrap.To(3).Get(5, true, false, true, + {7, 7, 1, 0, 0}); + wrap.To(4).Get(5, true, false, true, + {7, 7, 1, 0, 0}); + + wrap.To(5).Get(6, true, false, true, + {7, 7, 2, 0, 0}); + wrap.To(6).Get(6, true, false, true, + {7, 7, 2, 0, 0}); + } + Y_UNIT_TEST(Pages_Jump_Done) { // 20 pages, 50 bytes each @@ -694,10 +719,10 @@ Y_UNIT_TEST_SUITE(NFwd) { const auto eggs = CookPart(); TIntrusivePtr slices = new TSlices; - // pages 5 - 8 + // pages 5 - 7 slices->emplace_back(TSlice({ }, { }, 10, 16, true, false)); - // pages 9 - 11 - slices->emplace_back(TSlice({ }, { }, 19, 23, true, true)); + // pages 10 - 11 + slices->emplace_back(TSlice({ }, { }, 20, 23, true, true)); TPagesWrap wrap(eggs.Lone(), slices, 1000, 1000); @@ -705,6 +730,10 @@ Y_UNIT_TEST_SUITE(NFwd) { {1, 0, 1, 0, 0}); wrap.To(1).Fill({5, 6, 7, 8, 9, 10, 11}, {7, 7, 1, 0, 0}); + wrap.To(2).Get(10, true, false, true, + {7, 7, 2, 4, 0}); + wrap.To(3).Get(11, true, false, true, + {7, 7, 3, 4, 0}); } Y_UNIT_TEST(TLoadedPagesCircularBuffer) {