Skip to content

Commit

Permalink
Merge c64aab8 into 53f75b6
Browse files Browse the repository at this point in the history
  • Loading branch information
kunga authored May 30, 2024
2 parents 53f75b6 + c64aab8 commit 88e144a
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 8 deletions.
4 changes: 2 additions & 2 deletions ydb/core/engine/minikql/minikql_engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ bool TEngineHost::IsValidKey(TKeyDesc& key) const {
return NMiniKQL::IsValidKey(Scheme, localTableId, key);
}
ui64 TEngineHost::CalculateReadSize(const TVector<const TKeyDesc*>& keys) const {
NTable::TSizeEnv env;
auto env = Db.CreateSizeEnv();

for (const TKeyDesc* ki : keys) {
DoCalculateReadSize(*ki, env);
Expand Down Expand Up @@ -120,7 +120,7 @@ ui64 TEngineHost::CalculateResultSize(const TKeyDesc& key) const {
if (key.Range.Point) {
return Db.EstimateRowSize(localTid);
} else {
NTable::TSizeEnv env;
auto env = Db.CreateSizeEnv();
DoCalculateReadSize(key, env);
ui64 size = env.GetSize();

Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tablet_flat/flat_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,12 @@ TSelectRowVersionResult TDatabase::SelectRowVersion(
return Require(table)->SelectRowVersion(key, Env, readFlags, visible, observer);
}

TSizeEnv TDatabase::CreateSizeEnv()
{
return TSizeEnv(Env);
}


void TDatabase::CalculateReadSize(TSizeEnv& env, ui32 table, TRawVals minKey, TRawVals maxKey,
TTagsRef tags, ui64 flg, ui64 items, ui64 bytes,
EDirection direction, TRowVersion snapshot)
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tablet_flat/flat_database.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class TDatabase {
EDirection direction = EDirection::Forward,
TRowVersion snapshot = TRowVersion::Max());

TSizeEnv CreateSizeEnv();
void CalculateReadSize(TSizeEnv& env, ui32 table, TRawVals minKey, TRawVals maxKey,
TTagsRef tags, ui64 readFlags, ui64 itemsLimit, ui64 bytesLimit,
EDirection direction = EDirection::Forward,
Expand Down
31 changes: 25 additions & 6 deletions ydb/core/tablet_flat/flat_dbase_sz_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ namespace NTable {
struct TSizeEnv : public IPages {
using TInfo = NTabletFlatExecutor::TPrivatePageCache::TInfo;

TSizeEnv(IPages* env)
: Env(env)
{
}

TResult Locate(const TMemTable*, ui64, ui32) noexcept override
{
Y_ABORT("IPages::Locate(TMemTable*, ...) shouldn't be used here");
Expand All @@ -20,32 +25,46 @@ namespace NTable {
{
auto *partStore = CheckedCast<const NTable::TPartStore*>(part);

return { true, Touch(partStore->Locate(lob, ref), ref) };
AddPageSize(partStore->Locate(lob, ref), ref);

return { true, nullptr };
}

const TSharedData* TryGetPage(const TPart* part, TPageId page, TGroupId groupId) override
const TSharedData* TryGetPage(const TPart* part, TPageId pageId, TGroupId groupId) override
{
auto *partStore = CheckedCast<const NTable::TPartStore*>(part);

return Touch(partStore->PageCollections.at(groupId.Index).Get(), page);
auto info = partStore->PageCollections.at(groupId.Index).Get();
auto type = EPage(info->PageCollection->Page(pageId).Type);

switch (type) {
case EPage::Index:
case EPage::BTreeIndex:
// need index pages to continue counting
// do not count index
// if these pages are not in memory, data won't be counted in precharge
return Env->TryGetPage(part, pageId, groupId);
default:
AddPageSize(partStore->PageCollections.at(groupId.Index).Get(), pageId);
return nullptr;
}
}

ui64 GetSize() const {
return Bytes;
}

private:
const TSharedData* Touch(TInfo *info, TPageId page) noexcept
void AddPageSize(TInfo *info, TPageId page) noexcept
{
if (Touched[info].insert(page).second) {
Pages++;
Bytes += info->PageCollection->Page(page).Size;
}

return nullptr;
}

private:
IPages* Env;
THashMap<const void*, THashSet<TPageId>> Touched;
ui64 Pages = 0;
ui64 Bytes = 0;
Expand Down
73 changes: 73 additions & 0 deletions ydb/core/tablet_flat/flat_executor_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "flat_dbase_sz_env.h"
#include "flat_executor_ut_common.h"

namespace NKikimr {
Expand Down Expand Up @@ -4979,6 +4980,41 @@ Y_UNIT_TEST_SUITE(TFlatTableSnapshotWithCommits) {

Y_UNIT_TEST_SUITE(TFlatTableExecutorIndexLoading) {

struct TTxCalculateReadSize : public ITransaction {
ui32 Attempt = 0;
TVector<ui64>& ReadSizes;
ui64 MinKey, MaxKey;

TTxCalculateReadSize(TVector<ui64>& readSizes, ui64 minKey, ui64 maxKey)
: ReadSizes(readSizes)
, MinKey(minKey)
, MaxKey(maxKey)
{
ReadSizes.clear();
}


bool Execute(TTransactionContext &txc, const TActorContext &) override
{
UNIT_ASSERT_LE(++Attempt, 10);

const auto minKey = NScheme::TInt64::TInstance(MinKey);
const auto maxKey = NScheme::TInt64::TInstance(MaxKey);
const TVector<NTable::TTag> tags{ { TRowsModel::ColumnKeyId, TRowsModel::ColumnValueId } };

auto sizeEnv = txc.DB.CreateSizeEnv();
txc.DB.CalculateReadSize(sizeEnv, TRowsModel::TableId, { minKey }, { maxKey }, tags, 0, 0, 0);
ReadSizes.push_back(sizeEnv.GetSize());

return txc.DB.Precharge(TRowsModel::TableId, { minKey }, { maxKey }, tags, 0, 0, 0);
}

void Complete(const TActorContext &ctx) override
{
ctx.Send(ctx.SelfID, new NFake::TEvReturn);
}
};

struct TTxPrechargeAndSeek : public ITransaction {
ui32 Attempt = 0;
bool Pinned = false;
Expand Down Expand Up @@ -5020,6 +5056,43 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutorIndexLoading) {
}
};

void ZeroSharedCache(TMyEnvBase &env) {
env.Env.GetMemObserver()->NotifyStat({1, 1, 1});
TDispatchOptions options;
options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(NSharedCache::EvMem, 1));
env->DispatchEvents(options);
}

Y_UNIT_TEST(CalculateReadSize_FlatIndex) {
TMyEnvBase env;
TRowsModel rows;
const ui32 rowsCount = 1024;

env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) {
return new TTestFlatTablet(env.Edge, tablet, info);
});
env.WaitForWakeUp();
ZeroSharedCache(env);

env.SendSync(rows.MakeScheme(new TCompactionPolicy(), false));

env.SendSync(rows.MakeRows(rowsCount, 10*1024));

env.SendSync(new NFake::TEvCompact(TRowsModel::TableId));
env.WaitFor<NFake::TEvCompacted>();

TVector<ui64> sizes;

env.SendSync(new NFake::TEvExecute{ new TTxCalculateReadSize(sizes, 0, 1) });
UNIT_ASSERT_VALUES_EQUAL(sizes, (TVector<ui64>{20566, 20566}));

env.SendSync(new NFake::TEvExecute{ new TTxCalculateReadSize(sizes, 100, 200) });
UNIT_ASSERT_VALUES_EQUAL(sizes, (TVector<ui64>{1048866, 1048866}));

env.SendSync(new NFake::TEvExecute{ new TTxCalculateReadSize(sizes, 300, 700) });
UNIT_ASSERT_VALUES_EQUAL(sizes, (TVector<ui64>{4133766, 4133766}));
}

Y_UNIT_TEST(TestPrechargeAndSeek) {
TMyEnvBase env;
TRowsModel rows;
Expand Down

0 comments on commit 88e144a

Please sign in to comment.