Skip to content

Commit

Permalink
Merge 78ae448 into fb2c723
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Dec 5, 2024
2 parents fb2c723 + 78ae448 commit 15ba1d6
Show file tree
Hide file tree
Showing 74 changed files with 1,140 additions and 1,449 deletions.
40 changes: 18 additions & 22 deletions ydb/core/kqp/ut/common/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,40 +5,24 @@
#include <ydb/core/formats/arrow/serializer/parsing.h>
#include <ydb/core/testlib/cs_helper.h>

#include <format>

extern "C" {
#include <yql/essentials/parser/pg_wrapper/postgresql/src/include/catalog/pg_type_d.h>
}

namespace NKikimr {
namespace NKqp {

TString GetConfigProtoWithName(const TString & tierName) {
return TStringBuilder() << "Name : \"" << tierName << "\"\n" <<
R"(
ObjectStorage : {
Endpoint: "fake"
Bucket: "fake"
SecretableAccessKey: {
Value: {
Data: "secretAccessKey"
}
}
SecretableSecretKey: {
Value: {
Data: "fakeSecret"
}
}
}
)";
}

using namespace NYdb;

TTestHelper::TTestHelper(const TKikimrSettings& settings) {
TKikimrSettings kikimrSettings(settings);
if (!kikimrSettings.FeatureFlags.HasEnableTieringInColumnShard()) {
kikimrSettings.SetEnableTieringInColumnShard(true);
}
if (!kikimrSettings.FeatureFlags.HasEnableExternalDataSources()) {
kikimrSettings.SetEnableExternalDataSources(true);
}

Kikimr = std::make_unique<TKikimrRunner>(kikimrSettings);
TableClient = std::make_unique<NYdb::NTable::TTableClient>(Kikimr->GetTableClient());
Expand All @@ -64,7 +48,19 @@ namespace NKqp {
}

void TTestHelper::CreateTier(const TString& tierName) {
auto result = GetSession().ExecuteSchemeQuery("CREATE OBJECT " + tierName + " (TYPE TIER) WITH tierConfig = `" + GetConfigProtoWithName(tierName) + "`").GetValueSync();
// auto result = GetSession().ExecuteSchemeQuery(R"(
auto result = Kikimr->GetTableClient(NYdb::NTable::TClientSettings().AuthToken("root@builtin")).GetSession().GetValueSync().GetSession().ExecuteSchemeQuery(R"(
UPSERT OBJECT `accessKey` (TYPE SECRET) WITH (value = `secretAccessKey`);
UPSERT OBJECT `secretKey` (TYPE SECRET) WITH (value = `fakeSecret`);
CREATE EXTERNAL DATA SOURCE `)" + tierName + R"(` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="http://fake.fake/fake",
AUTH_METHOD="AWS",
AWS_ACCESS_KEY_ID_SECRET_NAME="accessKey",
AWS_SECRET_ACCESS_KEY_SECRET_NAME="secretKey",
AWS_REGION="ru-central1"
);
)").GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/ut/common/kqp_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ class TKikimrRunner {

NYdb::TDriverConfig GetDriverConfig() const { return DriverConfig; }

NYdb::NTable::TTableClient GetTableClient() const {
return NYdb::NTable::TTableClient(*Driver, NYdb::NTable::TClientSettings()
.UseQueryCache(false));
NYdb::NTable::TTableClient GetTableClient(
NYdb::NTable::TClientSettings settings = NYdb::NTable::TClientSettings()) const {
return NYdb::NTable::TTableClient(*Driver, settings.UseQueryCache(false));
}

NYdb::NQuery::TQueryClient GetQueryClient(
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/ut/olap/tiering_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class TTestEvictionBase {
UNIT_ASSERT_GT(columnRawBytes, 0);
}

TestHelper->SetTiering("/Root/olapStore/olapTable", "tier1", "timestamp");
TestHelper->SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
csController->WaitActualization(TDuration::Seconds(5));

{
Expand All @@ -82,7 +82,7 @@ class TTestEvictionBase {

auto rows = ExecuteScanQuery(tableClient, selectQuery);
UNIT_ASSERT_VALUES_EQUAL(rows.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "tier1");
UNIT_ASSERT_VALUES_EQUAL(GetUtf8(rows[0].at("TierName")), "/Root/tier1");
UNIT_ASSERT_VALUES_EQUAL_C(GetUint64(rows[0].at("RawBytes")), columnRawBytes,
TStringBuilder() << "RawBytes changed after eviction: before=" << columnRawBytes
<< " after=" << GetUint64(rows[0].at("RawBytes")));
Expand Down Expand Up @@ -121,7 +121,7 @@ class TTestEvictionResetTiering : public TTestEvictionBase {
class TTestEvictionIncreaseDuration : public TTestEvictionBase {
private:
void UnevictAll() {
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P30000D") TO EXTERNAL DATA SOURCE tier1 ON timestamp)";
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P30000D") TO EXTERNAL DATA SOURCE `/Root/tier1` ON timestamp)";
auto result = TestHelper->GetSession().ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
}
Expand Down Expand Up @@ -152,18 +152,18 @@ Y_UNIT_TEST_SUITE(KqpOlapTiering) {
testHelper.CreateTier("tier1");

{
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE tier1 ON unknown_column;)";
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE `/Root/tier1` ON unknown_column;)";
auto result = testHelper.GetSession().ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
}

{
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE tier1 ON uid;)";
const TString query = R"(ALTER TABLE `/Root/olapStore/olapTable` SET TTL Interval("P10D") TO EXTERNAL DATA SOURCE `/Root/tier1` ON uid;)";
auto result = testHelper.GetSession().ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_UNEQUAL(result.GetStatus(), NYdb::EStatus::SUCCESS);
}

testHelper.SetTiering("/Root/olapStore/olapTable", "tier1", "timestamp");
testHelper.SetTiering("/Root/olapStore/olapTable", "/Root/tier1", "timestamp");
}
}

Expand Down
18 changes: 9 additions & 9 deletions ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5441,7 +5441,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
WITH (
STORE = COLUMN,
AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10,
TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE tier1 ON Key
TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE `/Root/tier1` ON Key
);)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
Expand All @@ -5452,12 +5452,12 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(desc.GetTableDescription().GetTtlSettings());
auto ttl = desc.GetTableDescription().GetTtlSettings();
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).StorageName, "tier1");
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).StorageName, "/Root/tier1");
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers()[0].GetApplyAfter(), TDuration::Seconds(10));
}
auto query2 = TStringBuilder() << R"(
--!syntax_v1
ALTER TABLE `)" << tableName << R"(` SET (TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE tier2 ON Key);)";
ALTER TABLE `)" << tableName << R"(` SET (TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE `/Root/tier2` ON Key);)";
result = session.ExecuteSchemeQuery(query2).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

Expand All @@ -5468,7 +5468,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(desc.GetTableDescription().GetTtlSettings());
auto ttl = desc.GetTableDescription().GetTtlSettings();
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).StorageName, "tier2");
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).StorageName, "/Root/tier2");
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers()[0].GetApplyAfter(), TDuration::Seconds(10));
}

Expand All @@ -5488,7 +5488,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {

auto query4 = TStringBuilder() << R"(
--!syntax_v1
ALTER TABLE `)" << tableName << R"(` SET (TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE tier1 ON Key);)";
ALTER TABLE `)" << tableName << R"(` SET (TTL = Interval("PT10S") TO EXTERNAL DATA SOURCE `/Root/tier1` ON Key);)";
result = session.ExecuteSchemeQuery(query4).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());

Expand All @@ -5499,7 +5499,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) {
UNIT_ASSERT(desc.GetTableDescription().GetTtlSettings());
auto ttl = desc.GetTableDescription().GetTtlSettings();
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers().size(), 1);
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).StorageName, "tier1");
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(ttl->GetTiers()[0].GetAction()).StorageName, "/Root/tier1");
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers()[0].GetApplyAfter(), TDuration::Seconds(10));
}

Expand Down Expand Up @@ -8149,7 +8149,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
testHelper.BulkUpsert(testTable, tableInserter);
}

testHelper.SetTiering(tableName, "tier1", "created_at");
testHelper.SetTiering(tableName, "/Root/tier1", "created_at");

while (csController->GetTieringUpdates().Val() == 0) {
Cout << "Wait tiering..." << Endl;
Expand Down Expand Up @@ -8217,7 +8217,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
UNIT_ASSERT_VALUES_EQUAL(description.GetTtlSettings()->GetDateTypeColumn().GetExpireAfter(), TDuration::Hours(1));
}
{
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`SET (TTL = Interval(\"PT10S\") TO EXTERNAL DATA SOURCE tier1, Interval(\"PT1H\") DELETE ON created_at);";
auto alterQuery = TStringBuilder() << "ALTER TABLE `" << testTable.GetName() << "`SET (TTL = Interval(\"PT10S\") TO EXTERNAL DATA SOURCE `/Root/tier1`, Interval(\"PT1H\") DELETE ON created_at);";
auto alterResult = testHelper.GetSession().ExecuteSchemeQuery(alterQuery).GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString());
}
Expand All @@ -8232,7 +8232,7 @@ Y_UNIT_TEST_SUITE(KqpOlapScheme) {
UNIT_ASSERT_VALUES_EQUAL(ttl->GetTiers().size(), 2);
auto evictTier = ttl->GetTiers()[0];
UNIT_ASSERT(std::holds_alternative<TTtlEvictToExternalStorageAction>(evictTier.GetAction()));
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(evictTier.GetAction()).StorageName, "tier1");
UNIT_ASSERT_VALUES_EQUAL(std::get<TTtlEvictToExternalStorageAction>(evictTier.GetAction()).StorageName, "/Root/tier1");
UNIT_ASSERT_VALUES_EQUAL(evictTier.GetApplyAfter(), TDuration::Seconds(10));
auto deleteTier = ttl->GetTiers()[1];
UNIT_ASSERT(std::holds_alternative<TTtlDeleteAction>(deleteTier.GetAction()));
Expand Down
21 changes: 17 additions & 4 deletions ydb/core/tx/columnshard/blobs_action/tier/gc_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ void TGarbageCollectionActor::Handle(NWrappers::NExternalStorage::TEvDeleteObjec
TString errorMessage;
Y_ABORT_UNLESS(ev->Get()->Key);
AFL_VERIFY(TLogoBlobID::Parse(logoBlobId, *ev->Get()->Key, errorMessage))("error", errorMessage);
BlobIdsToRemove.erase(logoBlobId);
CheckFinished();
OnDeleteBlobFinished(logoBlobId, ev->Get()->IsSuccess());
}

void TGarbageCollectionActor::Bootstrap(const TActorContext& ctx) {
Expand All @@ -20,15 +19,21 @@ void TGarbageCollectionActor::Bootstrap(const TActorContext& ctx) {
for (auto&& i : GCTask->GetDraftBlobIds()) {
BlobIdsToRemove.emplace(i.GetLogoBlobId());
}
TBase::Bootstrap(ctx);
Become(&TGarbageCollectionActor::StateWork);
if (!GCTask->GetExternalStorageOperator()) {
for (auto&& i : BlobIdsToRemove) {
OnDeleteBlobFinished(i, false, "storage operator is uninitialized for tier: " + GCTask->GetStorageId());
}
return;
}
for (auto&& i : BlobIdsToRemove) {
auto awsRequest = Aws::S3::Model::DeleteObjectRequest().WithKey(i.ToString());
auto request = std::make_unique<NWrappers::NExternalStorage::TEvDeleteObjectRequest>(awsRequest);
auto hRequest = std::make_unique<IEventHandle>(NActors::TActorId(), TActorContext::AsActorContext().SelfID, request.release());
TAutoPtr<TEventHandle<NWrappers::NExternalStorage::TEvDeleteObjectRequest>> evPtr((TEventHandle<NWrappers::NExternalStorage::TEvDeleteObjectRequest>*)hRequest.release());
GCTask->GetExternalStorageOperator()->Execute(evPtr);
}
TBase::Bootstrap(ctx);
Become(&TGarbageCollectionActor::StateWork);
}

void TGarbageCollectionActor::CheckFinished() {
Expand All @@ -39,4 +44,12 @@ void TGarbageCollectionActor::CheckFinished() {
}
}

void TGarbageCollectionActor::OnDeleteBlobFinished(const TLogoBlobID& blobId, bool success, const TString& errorMessage) {
if (success) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_BLOBS_TIER)("actor", "TGarbageCollectionActor")("event", "delete_object_failed")(
"reason", errorMessage);
}
BlobIdsToRemove.erase(blobId);
CheckFinished();
}
}
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/blobs_action/tier/gc_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class TGarbageCollectionActor: public TSharedBlobsCollectionActor<TGarbageCollec
THashSet<TLogoBlobID> BlobIdsToRemove;
void Handle(NWrappers::NExternalStorage::TEvDeleteObjectResponse::TPtr& ev);
void CheckFinished();
void OnDeleteBlobFinished(const TLogoBlobID& blobId, bool success, const TString& errorMessage = "");

virtual void DoOnSharedRemovingFinished() override {
CheckFinished();
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/tier/read.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
#include "read.h"

#include <ydb/core/tx/columnshard/blob_cache.h>

namespace NKikimr::NOlap::NBlobOperations::NTier {

void TReadingAction::DoStartReading(THashSet<TBlobRange>&& ranges) {
if (!ExternalStorageOperator) {
for (const auto& range : ranges) {
auto response = std::make_unique<NBlobCache::TEvBlobCache::TEvReadBlobRangeResult>(range, NKikimrProto::EReplyStatus::ERROR,
"storage operator is uninitialized for tier: " + GetStorageId(), false, GetStorageId());
TActorContext::AsActorContext().Send(TActorContext::AsActorContext().SelfID, response.release());
}
return;
}

for (auto&& r : ranges) {
auto awsRequest = Aws::S3::Model::GetObjectRequest()
.WithKey(r.BlobId.GetLogoBlobId().ToString())
Expand Down
20 changes: 9 additions & 11 deletions ydb/core/tx/columnshard/blobs_action/tier/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,21 @@ void TOperator::DoStartGCAction(const std::shared_ptr<IBlobsGCAction>& action) c
}

void TOperator::InitNewExternalOperator(const NColumnShard::NTiers::TManager* tierManager) {
NKikimrSchemeOp::TS3Settings settings;
if (tierManager) {
settings = tierManager->GetS3Settings();
} else {
settings.SetEndpoint("nowhere");
if (!tierManager || !tierManager->IsReady()) {
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
CurrentS3Settings.reset();
ExternalStorageOperator = nullptr;
return;
}

NKikimrSchemeOp::TS3Settings settings = tierManager->GetS3Settings();
{
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
if (CurrentS3Settings && CurrentS3Settings->SerializeAsString() == settings.SerializeAsString()) {
return;
}
}

auto extStorageConfig = NWrappers::NExternalStorage::IExternalStorageConfig::Construct(settings);
AFL_VERIFY(extStorageConfig);
auto extStorageOperator = extStorageConfig->ConstructStorageOperator(false);
Expand Down Expand Up @@ -103,12 +106,7 @@ TOperator::TOperator(const TString& storageId, const TActorId& shardActorId, con

void TOperator::DoOnTieringModified(const std::shared_ptr<NColumnShard::ITiersManager>& tiers) {
auto* tierManager = tiers->GetManagerOptional(TBase::GetStorageId());
if (tierManager) {
InitNewExternalOperator(tierManager);
} else {
TGuard<TSpinLock> changeLock(ChangeOperatorLock);
ExternalStorageOperator = nullptr;
}
InitNewExternalOperator(tierManager);
}

bool TOperator::DoLoad(IBlobManagerDb& dbBlobs) {
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/tier/write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
namespace NKikimr::NOlap::NBlobOperations::NTier {

void TWriteAction::DoSendWriteBlobRequest(const TString& data, const TUnifiedBlobId& blobId) {
if (!ExternalStorageOperator) {
auto response = std::make_unique<TEvBlobStorage::TEvPutResult>(NKikimrProto::EReplyStatus::ERROR, blobId.GetLogoBlobId(), 0, TGroupId::FromValue(Max<ui32>()), 0, GetStorageId());
response->ErrorReason = "storage operator is uninitialized for tier: " + GetStorageId();
TActorContext::AsActorContext().Send(TActorContext::AsActorContext().SelfID, response.release());
return;
}

auto awsRequest = Aws::S3::Model::PutObjectRequest().WithKey(blobId.GetLogoBlobId().ToString());

TString moveData = data;
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
ctx.Send(selfActorId, new TEvPrivate::TEvTieringModified);
});
Tiers->Start(Tiers);
if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) {
Tiers->TakeConfigs(NYDBTest::TControllers::GetColumnShardController()->GetFallbackTiersSnapshot(), nullptr);
if (!NYDBTest::TControllers::GetColumnShardController()->GetOverrideTierConfigs().empty()) {
Send(SelfId(), new TEvPrivate::TEvTieringModified());
}
BackgroundSessionsManager = std::make_shared<NOlap::NBackground::TSessionsManager>(
std::make_shared<NBackground::TAdapter>(selfActorId, (NOlap::TTabletId)TabletID(), *this));
Expand All @@ -120,6 +120,12 @@ void TColumnShard::OnActivateExecutor(const TActorContext& ctx) {
}

void TColumnShard::Handle(TEvPrivate::TEvTieringModified::TPtr& /*ev*/, const TActorContext& /*ctx*/) {
if (const auto& tiersSnapshot = NYDBTest::TControllers::GetColumnShardController()->GetOverrideTierConfigs(); !tiersSnapshot.empty()) {
for (const auto& [id, tier] : tiersSnapshot) {
Tiers->UpdateTierConfig(tier, id, false);
}
}

OnTieringModified();
NYDBTest::TControllers::GetColumnShardController()->OnTieringModified(Tiers);
}
Expand Down
Loading

0 comments on commit 15ba1d6

Please sign in to comment.