Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward cache simplify #2514

Merged
merged 6 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 70 additions & 37 deletions ydb/core/tablet_flat/flat_fwd_cache.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
#pragma once

#include "flat_part_iface.h"
#include "flat_part_forward.h"
#include "flat_fwd_iface.h"
#include "flat_fwd_misc.h"
#include "flat_fwd_page.h"
#include "flat_part_index_iter.h"
#include "flat_part_index_iter_iface.h"
#include "flat_table_part.h"
#include "flat_part_slice.h"

namespace NKikimr {
namespace NTable {
Expand Down Expand Up @@ -58,8 +61,16 @@ namespace NFwd {
TCache() = delete;

TCache(const TPart* part, IPages* env, TGroupId groupId, const TIntrusiveConstPtr<TSlices>& bounds = nullptr)
: Index(part, env, groupId, 1, bounds)
{ }
: Index(MakeHolder<TPartIndexIt>(part, env, groupId)) // TODO: use CreateIndexIter(part, env, groupId)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

На будущее: стоит подумать о том, чтобы не аллоцировать память, если можно не аллоцировать. Например можно выделять место под разные реализации через std::variant и сохранять указатель на интерфейс. Это к тому, что в будущем в CreateIndexIter стоит передавать ещё место под сторадж итератора.

{
if (bounds && !bounds->empty()) {
BeginRowId = bounds->front().BeginRowId();
EndRowId = bounds->back().EndRowId();
} else {
BeginRowId = 0;
EndRowId = Index->GetEndRowId();
}
}

~TCache()
{
Expand All @@ -70,23 +81,35 @@ namespace NFwd {

TResult Handle(IPageLoadingQueue *head, TPageId pageId, ui64 lower) noexcept override
{
Y_ABORT_UNLESS(pageId != Max<TPageId>(), "Invalid requested pageId");
Y_ABORT_UNLESS(pageId != Max<TPageId>(), "Requested page is invalid");

if (auto *page = Trace.Get(pageId)) {
return { page, false, true };
}

Rewind(pageId); /* points Offset to pageId */
DropPagesBefore(pageId);
Shrink();

bool more = Grow && (OnHold + OnFetch <= lower);
bool grow = OnHold + OnFetch <= lower;

if (Offset == Pages.size()) { // isn't processed yet
SyncIndex(pageId);
AddToQueue(head, pageId);
}

grow &= Index->IsValid() && Index->GetRowId() < EndRowId;

return { Preload(head, 0).Touch(pageId, Stat), more, true };
return {Pages.at(Offset).Touch(pageId, Stat), grow, true};
}

void Forward(IPageLoadingQueue *head, ui64 upper) noexcept override
{
Preload(head, upper);
Y_ABORT_UNLESS(Started, "Couldn't be called before Handle returns grow");

while (OnHold + OnFetch < upper && Index->IsValid() && Index->GetRowId() < EndRowId) {
AddToQueue(head, Index->GetPageId());
Y_ABORT_UNLESS(Index->Next() != EReady::Page);
}
}

void Apply(TArrayRef<NPageCollection::TLoadedPage> loaded) noexcept override
Expand Down Expand Up @@ -117,35 +140,16 @@ namespace NFwd {
}

private:
TPage& Preload(IPageLoadingQueue *head, ui64 upper) noexcept
{
auto until = [this, upper]() {
return OnHold + OnFetch < upper ? Max<TPageId>() : 0;
};

while (auto more = Index.More(until())) {
auto size = head->AddToQueue(more, EPage::DataPage);

Stat.Fetch += size;
OnFetch += size;

Pages.emplace_back(more, size, 0, Max<TPageId>());
Pages.back().Fetch = EFetch::Wait;
}

Grow = Grow && Index.On(true) < Max<TPageId>();

return Pages.at(Offset);
}

void Rewind(TPageId pageId) noexcept
void DropPagesBefore(TPageId pageId) noexcept
{
while (auto drop = Index.Clean(pageId)) {
while (Offset < Pages.size()) {
auto &page = Pages.at(Offset);

if (!Pages || page.PageId != drop.PageId) {
Y_ABORT("Dropping page that is not exist in cache");
} else if (page.Size == 0) {
if (page.PageId >= pageId) {
break;
}

if (page.Size == 0) {
Y_ABORT("Dropping page that has not been touched");
} else if (page.Usage == EUsage::Keep && page) {
OnHold -= Trace.Emplace(page);
Expand All @@ -166,13 +170,42 @@ namespace NFwd {
}
}

void SyncIndex(TPageId pageId) noexcept
{
if (!Started) {
Y_ABORT_UNLESS(Index->Seek(BeginRowId) == EReady::Data);
Y_ABORT_UNLESS(Index->GetPageId() <= pageId, "Requested page is out of slice bounds");
Started = true;
}

while (Index->IsValid() && Index->GetPageId() < pageId) {
Y_ABORT_UNLESS(Index->Next() == EReady::Data);
Y_ABORT_UNLESS(Index->GetRowId() < EndRowId, "Requested page is out of slice bounds");
}

Y_ABORT_UNLESS(Index->GetPageId() == pageId, "Requested page doesn't belong to the part");
Y_ABORT_UNLESS(Index->Next() != EReady::Page);
}

void AddToQueue(IPageLoadingQueue *head, TPageId pageId) noexcept
{
auto size = head->AddToQueue(pageId, EPage::DataPage);

Stat.Fetch += size;
OnFetch += size;

Y_ABORT_UNLESS(!Pages || Pages.back().PageId < pageId);
Pages.emplace_back(pageId, size, 0, Max<TPageId>());
Pages.back().Fetch = EFetch::Wait;
}

private:
bool Grow = true; /* Have some pages for Forward(...) */
TForward Index;
THolder<IIndexIter> Index; /* Points on next to load page */
bool Started = false;
TRowId BeginRowId, EndRowId;
TLoadedPagesCircularBuffer<TPart::Trace> Trace;

/*_ Forward cache line state */

ui64 OnHold = 0;
ui64 OnFetch = 0;
ui32 Offset = 0;
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/tablet_flat/flat_fwd_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
#include "flat_table_subset.h"
#include "flat_part_iface.h"
#include "flat_part_store.h"
#include "flat_sausage_packet.h"
#include "flat_sausage_fetch.h"
#include "util_fmt_abort.h"
#include "util_basics.h"
#include <util/generic/deque.h>
#include <util/random/random.h>
#include <util/generic/intrlist.h>
Expand Down
135 changes: 0 additions & 135 deletions ydb/core/tablet_flat/flat_part_forward.h

This file was deleted.

11 changes: 6 additions & 5 deletions ydb/core/tablet_flat/test/libs/table/test_part.h
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
#pragma once

#include "test_store.h"
#include <ydb/core/tablet_flat/flat_fwd_blobs.h>
#include <ydb/core/tablet_flat/flat_fwd_cache.h>
#include <ydb/core/tablet_flat/flat_part_iface.h>
#include <ydb/core/tablet_flat/flat_part_index_iter.h>
#include <ydb/core/tablet_flat/flat_part_laid.h>
#include <ydb/core/tablet_flat/flat_row_scheme.h>
#include <ydb/core/tablet_flat/flat_table_misc.h>
#include <ydb/core/tablet_flat/flat_table_part.h>
#include <ydb/core/tablet_flat/flat_table_subset.h>
#include <ydb/core/tablet_flat/flat_part_laid.h>
#include <ydb/core/tablet_flat/flat_part_iface.h>
#include <ydb/core/tablet_flat/flat_fwd_cache.h>
#include <ydb/core/tablet_flat/flat_fwd_blobs.h>
#include <ydb/core/tablet_flat/flat_row_scheme.h>
#include <ydb/core/tablet_flat/util_fmt_abort.h>

#include <util/generic/cast.h>
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tablet_flat/test/libs/table/test_steps.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include "util/string/builder.h"
#include <util/system/yassert.h>
#include <util/stream/output.h>

Expand All @@ -21,13 +22,15 @@ namespace NTest {

IOutputStream& Log() const noexcept
{
Cerr << "On " << Seq << ": ";
Cerr << CurrentStepStr() << ": ";

return Cerr;
}

size_t CurrentStep() const noexcept { return Seq; }

TString CurrentStepStr() const noexcept { return TStringBuilder() << "On " << CurrentStep(); }

private:
size_t Seq = 0;
};
Expand Down
Loading
Loading