Skip to content

Commit

Permalink
Forward cache simplify (#2514)
Browse files Browse the repository at this point in the history
  • Loading branch information
kunga authored Mar 11, 2024
1 parent 3a6ff80 commit e0a15dd
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 188 deletions.
107 changes: 70 additions & 37 deletions ydb/core/tablet_flat/flat_fwd_cache.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#pragma once

#include "flat_part_iface.h"
#include "flat_part_forward.h"
#include "flat_fwd_iface.h"
#include "flat_fwd_misc.h"
#include "flat_fwd_page.h"
#include "flat_part_index_iter.h"
#include "flat_part_index_iter_iface.h"
#include "flat_table_part.h"
#include "flat_part_slice.h"

namespace NKikimr {
namespace NTable {
Expand Down Expand Up @@ -58,8 +61,16 @@ namespace NFwd {
TCache() = delete;

TCache(const TPart* part, IPages* env, TGroupId groupId, const TIntrusiveConstPtr<TSlices>& bounds = nullptr)
: Index(part, env, groupId, 1, bounds)
{ }
: Index(MakeHolder<TPartIndexIt>(part, env, groupId)) // TODO: use CreateIndexIter(part, env, groupId)
{
if (bounds && !bounds->empty()) {
BeginRowId = bounds->front().BeginRowId();
EndRowId = bounds->back().EndRowId();
} else {
BeginRowId = 0;
EndRowId = Index->GetEndRowId();
}
}

~TCache()
{
Expand All @@ -70,23 +81,35 @@ namespace NFwd {

TResult Handle(IPageLoadingQueue *head, TPageId pageId, ui64 lower) noexcept override
{
Y_ABORT_UNLESS(pageId != Max<TPageId>(), "Invalid requested pageId");
Y_ABORT_UNLESS(pageId != Max<TPageId>(), "Requested page is invalid");

if (auto *page = Trace.Get(pageId)) {
return { page, false, true };
}

Rewind(pageId); /* points Offset to pageId */
DropPagesBefore(pageId);
Shrink();

bool more = Grow && (OnHold + OnFetch <= lower);
bool grow = OnHold + OnFetch <= lower;

if (Offset == Pages.size()) { // isn't processed yet
SyncIndex(pageId);
AddToQueue(head, pageId);
}

grow &= Index->IsValid() && Index->GetRowId() < EndRowId;

return { Preload(head, 0).Touch(pageId, Stat), more, true };
return {Pages.at(Offset).Touch(pageId, Stat), grow, true};
}

void Forward(IPageLoadingQueue *head, ui64 upper) noexcept override
{
Preload(head, upper);
Y_ABORT_UNLESS(Started, "Couldn't be called before Handle returns grow");

while (OnHold + OnFetch < upper && Index->IsValid() && Index->GetRowId() < EndRowId) {
AddToQueue(head, Index->GetPageId());
Y_ABORT_UNLESS(Index->Next() != EReady::Page);
}
}

void Apply(TArrayRef<NPageCollection::TLoadedPage> loaded) noexcept override
Expand Down Expand Up @@ -117,35 +140,16 @@ namespace NFwd {
}

private:
TPage& Preload(IPageLoadingQueue *head, ui64 upper) noexcept
{
auto until = [this, upper]() {
return OnHold + OnFetch < upper ? Max<TPageId>() : 0;
};

while (auto more = Index.More(until())) {
auto size = head->AddToQueue(more, EPage::DataPage);

Stat.Fetch += size;
OnFetch += size;

Pages.emplace_back(more, size, 0, Max<TPageId>());
Pages.back().Fetch = EFetch::Wait;
}

Grow = Grow && Index.On(true) < Max<TPageId>();

return Pages.at(Offset);
}

void Rewind(TPageId pageId) noexcept
void DropPagesBefore(TPageId pageId) noexcept
{
while (auto drop = Index.Clean(pageId)) {
while (Offset < Pages.size()) {
auto &page = Pages.at(Offset);

if (!Pages || page.PageId != drop.PageId) {
Y_ABORT("Dropping page that is not exist in cache");
} else if (page.Size == 0) {
if (page.PageId >= pageId) {
break;
}

if (page.Size == 0) {
Y_ABORT("Dropping page that has not been touched");
} else if (page.Usage == EUsage::Keep && page) {
OnHold -= Trace.Emplace(page);
Expand All @@ -166,13 +170,42 @@ namespace NFwd {
}
}

void SyncIndex(TPageId pageId) noexcept
{
if (!Started) {
Y_ABORT_UNLESS(Index->Seek(BeginRowId) == EReady::Data);
Y_ABORT_UNLESS(Index->GetPageId() <= pageId, "Requested page is out of slice bounds");
Started = true;
}

while (Index->IsValid() && Index->GetPageId() < pageId) {
Y_ABORT_UNLESS(Index->Next() == EReady::Data);
Y_ABORT_UNLESS(Index->GetRowId() < EndRowId, "Requested page is out of slice bounds");
}

Y_ABORT_UNLESS(Index->GetPageId() == pageId, "Requested page doesn't belong to the part");
Y_ABORT_UNLESS(Index->Next() != EReady::Page);
}

void AddToQueue(IPageLoadingQueue *head, TPageId pageId) noexcept
{
auto size = head->AddToQueue(pageId, EPage::DataPage);

Stat.Fetch += size;
OnFetch += size;

Y_ABORT_UNLESS(!Pages || Pages.back().PageId < pageId);
Pages.emplace_back(pageId, size, 0, Max<TPageId>());
Pages.back().Fetch = EFetch::Wait;
}

private:
bool Grow = true; /* Have some pages for Forward(...) */
TForward Index;
THolder<IIndexIter> Index; /* Points on next to load page */
bool Started = false;
TRowId BeginRowId, EndRowId;
TLoadedPagesCircularBuffer<TPart::Trace> Trace;

/*_ Forward cache line state */

ui64 OnHold = 0;
ui64 OnFetch = 0;
ui32 Offset = 0;
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/tablet_flat/flat_fwd_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
#include "flat_table_subset.h"
#include "flat_part_iface.h"
#include "flat_part_store.h"
#include "flat_sausage_packet.h"
#include "flat_sausage_fetch.h"
#include "util_fmt_abort.h"
#include "util_basics.h"
#include <util/generic/deque.h>
#include <util/random/random.h>
#include <util/generic/intrlist.h>
Expand Down
135 changes: 0 additions & 135 deletions ydb/core/tablet_flat/flat_part_forward.h

This file was deleted.

11 changes: 6 additions & 5 deletions ydb/core/tablet_flat/test/libs/table/test_part.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
#pragma once

#include "test_store.h"
#include <ydb/core/tablet_flat/flat_fwd_blobs.h>
#include <ydb/core/tablet_flat/flat_fwd_cache.h>
#include <ydb/core/tablet_flat/flat_part_iface.h>
#include <ydb/core/tablet_flat/flat_part_index_iter.h>
#include <ydb/core/tablet_flat/flat_part_laid.h>
#include <ydb/core/tablet_flat/flat_row_scheme.h>
#include <ydb/core/tablet_flat/flat_table_misc.h>
#include <ydb/core/tablet_flat/flat_table_part.h>
#include <ydb/core/tablet_flat/flat_table_subset.h>
#include <ydb/core/tablet_flat/flat_part_laid.h>
#include <ydb/core/tablet_flat/flat_part_iface.h>
#include <ydb/core/tablet_flat/flat_fwd_cache.h>
#include <ydb/core/tablet_flat/flat_fwd_blobs.h>
#include <ydb/core/tablet_flat/flat_row_scheme.h>
#include <ydb/core/tablet_flat/util_fmt_abort.h>

#include <util/generic/cast.h>
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tablet_flat/test/libs/table/test_steps.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "util/string/builder.h"
#include <util/system/yassert.h>
#include <util/stream/output.h>

Expand All @@ -21,13 +22,15 @@ namespace NTest {

IOutputStream& Log() const noexcept
{
Cerr << "On " << Seq << ": ";
Cerr << CurrentStepStr() << ": ";

return Cerr;
}

size_t CurrentStep() const noexcept { return Seq; }

TString CurrentStepStr() const noexcept { return TStringBuilder() << "On " << CurrentStep(); }

private:
size_t Seq = 0;
};
Expand Down
Loading

0 comments on commit e0a15dd

Please sign in to comment.