Skip to content

Commit

Permalink
Merge 7b3a1c1 into 3317b75
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Dec 19, 2024
2 parents 3317b75 + 7b3a1c1 commit 384f207
Show file tree
Hide file tree
Showing 87 changed files with 1,374 additions and 1,603 deletions.
40 changes: 17 additions & 23 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,20 @@ extern "C" {

namespace NKikimr {
namespace NKqp {

TString GetConfigProtoWithName(const TString & tierName) {
return TStringBuilder() << "Name : \"" << tierName << "\"\n" <<
R"(
ObjectStorage : {
Endpoint: "fake"
Bucket: "fake"
SecretableAccessKey: {
Value: {
Data: "secretAccessKey"
}
}
SecretableSecretKey: {
Value: {
Data: "fakeSecret"
}
}
}
)";
}

using namespace NYdb;

TTestHelper::TTestHelper(const TKikimrSettings& settings) {
TKikimrSettings kikimrSettings(settings);
if (!kikimrSettings.FeatureFlags.HasEnableTieringInColumnShard()) {
kikimrSettings.SetEnableTieringInColumnShard(true);
}
if (!kikimrSettings.FeatureFlags.HasEnableExternalDataSources()) {
kikimrSettings.SetEnableExternalDataSources(true);
}

Kikimr = std::make_unique<TKikimrRunner>(kikimrSettings);
TableClient = std::make_unique<NYdb::NTable::TTableClient>(Kikimr->GetTableClient());
TableClient =
std::make_unique<NYdb::NTable::TTableClient>(Kikimr->GetTableClient(NYdb::NTable::TClientSettings().AuthToken("root@builtin")));
Session = std::make_unique<NYdb::NTable::TSession>(TableClient->CreateSession().GetValueSync().GetSession());
}

Expand All @@ -64,7 +47,18 @@ namespace NKqp {
}

void TTestHelper::CreateTier(const TString& tierName) {
auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT " + tierName + " (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName(tierName) + "`").GetValueSync();
auto result = GetSession().ExecuteSchemeQuery(R"(
UPSERT OBJECT `accessKey` (TYPE SECRET) WITH (value = `secretAccessKey`);
UPSERT OBJECT `secretKey` (TYPE SECRET) WITH (value = `fakeSecret`);
CREATE EXTERNAL DATA SOURCE `)" + tierName + R"(` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="http://fake.fake/fake",
AUTH_METHOD="AWS",
AWS_ACCESS_KEY_ID_SECRET_NAME="accessKey",
AWS_SECRET_ACCESS_KEY_SECRET_NAME="secretKey",
AWS_REGION="ru-central1"
);
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/ut/common/kqp_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,9 @@ class TKikimrRunner {

NYdb::TDriverConfig GetDriverConfig() const { return DriverConfig; }

NYdb::NTable::TTableClient GetTableClient() const {
return NYdb::NTable::TTableClient(*Driver, NYdb::NTable::TClientSettings()
.UseQueryCache(false));
NYdb::NTable::TTableClient GetTableClient(
NYdb::NTable::TClientSettings settings = NYdb::NTable::TClientSettings()) const {
return NYdb::NTable::TTableClient(*Driver, settings.UseQueryCache(false));
}

NYdb::NQuery::TQueryClient GetQueryClient(
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/ut/olap/tiering_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class TTestEvictionBase {
UNIT_ASSERT_GT(columnRawBytes, 0);
}

TestHelper->SetTiering("/Root/olapStore/olapTable", "tier1", "timestamp");
TestHelper->SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
csController->WaitActualization(TDuration::Seconds(5));

{
Expand All @@ -82,7 +82,7 @@ class TTestEvictionBase {

auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "tier1");
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "/Root/tier1");
UNIT_ASSERT_VALUES_EQUAL_C(GetUint64(rows[0].at("RawBytes")), columnRawBytes,
TStringBuilder() << "RawBytes changed after eviction: before=" << columnRawBytes
<< " after=" << GetUint64(rows[0].at("RawBytes")));
Expand Down Expand Up @@ -121,7 +121,7 @@ class TTestEvictionResetTiering : public TTestEvictionBase {
class TTestEvictionIncreaseDuration : public TTestEvictionBase {
private:
void UnevictAll() {
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P30000D") TO EXTERNAL DATA SOURCE tier1 ON timestamp)";
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P30000D") TO EXTERNAL DATA SOURCE `/Root/tier1` ON timestamp)";
auto result = TestHelper->GetSession().ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
Expand Down Expand Up @@ -152,18 +152,18 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) {
testHelper.CreateTier("tier1");

{
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE tier1 ON unknown_column;)";
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE `/Root/tier1` ON unknown_column;)";
auto result = testHelper.GetSession().ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
}

{
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE tier1 ON uid;)";
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE `/Root/tier1` ON uid;)";
auto result = testHelper.GetSession().ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
}

testHelper.SetTiering("/Root/olapStore/olapTable", "tier1", "timestamp");
testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
}
}

Expand Down
18 changes: 9 additions & 9 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5468,7 +5468,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
WITH (
STORE = COLUMN,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE tier1 ON Key
TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE `/Root/tier1` ON Key
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
Expand All @@ -5479,12 +5479,12 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(desc.GetTableDescription().GetTtlSettings());
auto ttl = desc.GetTableDescription().GetTtlSettings();
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).GetStorage(), "tier1");
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).GetStorage(), "/Root/tier1");
UNIT_ASSERT_VALUES_EQUAL(std::get<TDateTypeColumnModeSettings>(ttl->GetTiers()[0].GetExpression()).GetExpireAfter(), TDuration::Seconds(10));
}
auto query2 = TStringBuilder() << R"(
--!syntax_v1
ALTER TABLE `)" << tableName << R"(` SET (TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE tier2 ON Key);)";
ALTER TABLE `)" << tableName << R"(` SET (TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE `/Root/tier2` ON Key);)";
result = session.ExecuteSchemeQuery(query2).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

Expand All @@ -5495,7 +5495,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(desc.GetTableDescription().GetTtlSettings());
auto ttl = desc.GetTableDescription().GetTtlSettings();
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).GetStorage(), "tier2");
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).GetStorage(), "/Root/tier2");
UNIT_ASSERT_VALUES_EQUAL(std::get<TDateTypeColumnModeSettings>(ttl->GetTiers()[0].GetExpression()).GetExpireAfter(), TDuration::Seconds(10));
}

Expand All @@ -5515,7 +5515,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {

auto query4 = TStringBuilder() << R"(
--!syntax_v1
ALTER TABLE `)" << tableName << R"(` SET (TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE tier1 ON Key);)";
ALTER TABLE `)" << tableName << R"(` SET (TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE `/Root/tier1` ON Key);)";
result = session.ExecuteSchemeQuery(query4).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

Expand All @@ -5526,7 +5526,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(desc.GetTableDescription().GetTtlSettings());
auto ttl = desc.GetTableDescription().GetTtlSettings();
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).GetStorage(), "tier1");
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).GetStorage(), "/Root/tier1");
UNIT_ASSERT_VALUES_EQUAL(std::get<TDateTypeColumnModeSettings>(ttl->GetTiers()[0].GetExpression()).GetExpireAfter(), TDuration::Seconds(10));
}

Expand Down Expand Up @@ -8441,7 +8441,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.BulkUpsert(testTable, tableInserter);
}

testHelper.SetTiering(tableName, "tier1", "created_at");
testHelper.SetTiering(tableName, "/Root/tier1", "created_at");

while (csController->GetTieringUpdates().Val() == 0) {
Cout << "Wait tiering..." << Endl;
Expand Down Expand Up @@ -8509,7 +8509,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
UNIT_ASSERT_VALUES_EQUAL(description.GetTtlSettings()->GetDateTypeColumn().GetExpireAfter(), TDuration::Hours(1));
}
{
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`SET (TTL = Interval(\"PT10S\") TO EXTERNAL DATA SOURCE tier1, Interval(\"PT1H\") DELETE ON created_at);";
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`SET (TTL = Interval(\"PT10S\") TO EXTERNAL DATA SOURCE `/Root/tier1`, Interval(\"PT1H\") DELETE ON created_at);";
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
Expand All @@ -8524,7 +8524,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers().size(), 2);
auto evictTier = ttl->GetTiers()[0];
UNIT_ASSERT(std::holds_alternative<TTtlEvictToExternalStorageAction>(evictTier.GetAction()));
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(evictTier.GetAction()).GetStorage(), "tier1");
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(evictTier.GetAction()).GetStorage(), "/Root/tier1");
UNIT_ASSERT_VALUES_EQUAL(std::get<TDateTypeColumnModeSettings>(evictTier.GetExpression()).GetExpireAfter(), TDuration::Seconds(10));
auto deleteTier = ttl->GetTiers()[1];
UNIT_ASSERT(std::holds_alternative<TTtlDeleteAction>(deleteTier.GetAction()));
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/abstract/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class IBlobsStorageOperator {
virtual void DoStartGCAction(const std::shared_ptr<IBlobsGCAction>& counters) const = 0;

void StartGCAction(const std::shared_ptr<IBlobsGCAction>& action) const {
AFL_VERIFY(IsReady());
return DoStartGCAction(action);
}

Expand Down Expand Up @@ -96,14 +97,17 @@ class IBlobsStorageOperator {
}

std::shared_ptr<IBlobsDeclareRemovingAction> StartDeclareRemovingAction(const NBlobOperations::EConsumer consumerId) {
AFL_VERIFY(IsReady());
return DoStartDeclareRemovingAction(Counters->GetConsumerCounter(consumerId)->GetRemoveDeclareCounters());
}
std::shared_ptr<IBlobsWritingAction> StartWritingAction(const NBlobOperations::EConsumer consumerId) {
AFL_VERIFY(IsReady());
auto result = DoStartWritingAction();
result->SetCounters(Counters->GetConsumerCounter(consumerId)->GetWriteCounters());
return result;
}
std::shared_ptr<IBlobsReadingAction> StartReadingAction(const NBlobOperations::EConsumer consumerId) {
AFL_VERIFY(IsReady());
auto result = DoStartReadingAction();
result->SetCounters(Counters->GetConsumerCounter(consumerId)->GetReadCounters());
return result;
Expand All @@ -129,6 +133,8 @@ class IBlobsStorageOperator {
CurrentGCAction = task;
return CurrentGCAction;
}

virtual bool IsReady() const = 0;
};

}
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/bs/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class TOperator: public IBlobsStorageOperator {
virtual std::shared_ptr<IBlobInUseTracker> GetBlobsTracker() const override {
return Manager;
}

virtual bool IsReady() const override {
return true;
}
};

}
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/local/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class TOperator: public IBlobsStorageOperator {
return false;
}

virtual bool IsReady() const override {
return true;
}
};

}
19 changes: 8 additions & 11 deletions ydb/core/tx/columnshard/blobs_action/tier/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,14 @@ void TOperator::DoStartGCAction(const std::shared_ptr<IBlobsGCAction>& action) c
}

void TOperator::InitNewExternalOperator(const NColumnShard::NTiers::TManager* tierManager) {
NKikimrSchemeOp::TS3Settings settings;
if (tierManager) {
settings = tierManager->GetS3Settings();
} else {
settings.SetEndpoint("nowhere");
if (!tierManager || !tierManager->IsReady()) {
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
CurrentS3Settings.reset();
ExternalStorageOperator = nullptr;
return;
}

NKikimrSchemeOp::TS3Settings settings = tierManager->GetS3Settings();
{
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
if (CurrentS3Settings && CurrentS3Settings->SerializeAsString() == settings.SerializeAsString()) {
Expand Down Expand Up @@ -103,12 +105,7 @@ TOperator::TOperator(const TString& storageId, const TActorId& shardActorId, con

void TOperator::DoOnTieringModified(const std::shared_ptr<NColumnShard::ITiersManager>& tiers) {
auto* tierManager = tiers->GetManagerOptional(TBase::GetStorageId());
if (tierManager) {
InitNewExternalOperator(tierManager);
} else {
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
ExternalStorageOperator = nullptr;
}
InitNewExternalOperator(tierManager);
}

bool TOperator::DoLoad(IBlobManagerDb& dbBlobs) {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/tier/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class TOperator: public IBlobsStorageOperator {
return GCInfo->HasToDelete(blobId, tabletId);
}

virtual bool IsReady() const override {
return !!ExternalStorageOperator;
}
};

}
41 changes: 30 additions & 11 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,17 @@ void TColumnShard::BecomeBroken(const TActorContext& ctx) {
}

void TColumnShard::SwitchToWork(const TActorContext& ctx) {
ProgressTxController->OnTabletInit();
{
const TLogContextGuard gLogging =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletID())("self_id", SelfId())("process", "SwitchToWork");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SwitchToWork");

for (const auto& [pathId, tiering] : TablesManager.GetTtl()) {
THashSet<TString> tiers;
for (const auto& [name, config] : tiering.GetTierByName()) {
tiers.emplace(name);
}
ActivateTiering(pathId, tiers);
}

Become(&TThis::StateWork);
SignalTabletActive(ctx);
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "initialize_shard")("step", "SignalTabletActive");
TryRegisterMediatorTimeCast();
EnqueueProgressTx(ctx, std::nullopt);
OnTieringModified();
}
Counters.GetCSCounters().OnIndexMetadataLimit(NOlap::IColumnEngine::GetMetadataLimit());
EnqueueBackgroundActivities();
Expand All @@ -87,6 +80,20 @@ void TColumnShard::SwitchToWork(const TActorContext& ctx) {
NYDBTest::TControllers::GetColumnShardController()->OnSwitchToWork(TabletID());
AFL_VERIFY(!!StartInstant);
Counters.GetCSCounters().Initialization.OnSwitchToWork(TMonotonic::Now() - *StartInstant, TMonotonic::Now() - CreateInstant);
NYDBTest::TControllers::GetColumnShardController()->OnTabletInitCompleted(*this);
}

bool TColumnShard::TrySwitchToWork(const TActorContext& ctx) {
if (!Tiers->AreConfigsComplete()) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "skip_switch_to_work")("reason", "tiering_metadata_not_ready");
return false;
}
if (!IsTxInitFinished) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "skip_switch_to_work")("reason", "db_reading_not_finished");
return false;
}
SwitchToWork(ctx);
return true;
}

void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
Expand All @@ -104,8 +111,10 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
ctx.Send(selfActorId, new TEvPrivate::TEvTieringModified);
});
Tiers->Start(Tiers);
if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) {
Tiers->TakeConfigs(NYDBTest::TControllers::GetColumnShardController()->GetFallbackTiersSnapshot(), nullptr);
if (const auto& tiersSnapshot = NYDBTest::TControllers::GetColumnShardController()->GetOverrideTierConfigs(); !tiersSnapshot.empty()) {
for (const auto& [id, tier] : tiersSnapshot) {
Tiers->UpdateTierConfig(tier, id, false);
}
}
BackgroundSessionsManager = std::make_shared<NOlap::NBackground::TSessionsManager>(
std::make_shared<NBackground::TAdapter>(selfActorId, (NOlap::TTabletId)TabletID(), *this));
Expand All @@ -124,10 +133,20 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
}

void TColumnShard::Handle(TEvPrivate::TEvTieringModified::TPtr& /*ev*/, const TActorContext& /*ctx*/) {
if (const auto& tiersSnapshot = NYDBTest::TControllers::GetColumnShardController()->GetOverrideTierConfigs(); !tiersSnapshot.empty()) {
for (const auto& [id, tier] : tiersSnapshot) {
Tiers->UpdateTierConfig(tier, id, false);
}
}

OnTieringModified();
NYDBTest::TControllers::GetColumnShardController()->OnTieringModified(Tiers);
}

void TColumnShard::HandleInit(TEvPrivate::TEvTieringModified::TPtr& /*ev*/, const TActorContext& ctx) {
TrySwitchToWork(ctx);
}

void TColumnShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr& ev, const TActorContext&) {
auto tabletId = ev->Get()->TabletId;
auto clientId = ev->Get()->ClientId;
Expand Down
Loading

0 comments on commit 384f207

Please sign in to comment.