diff --git a/ydb/core/grpc_services/rpc_export_base.h b/ydb/core/grpc_services/rpc_export_base.h index 0950aa815270..d22086d4b0d7 100644 --- a/ydb/core/grpc_services/rpc_export_base.h +++ b/ydb/core/grpc_services/rpc_export_base.h @@ -1,7 +1,8 @@ #pragma once +#include "rpc_operation_conv_base.h" + #include -#include #include #include @@ -10,7 +11,7 @@ namespace NKikimr { namespace NGRpcService { -struct TExportConv { +struct TExportConv: public TOperationConv { static Ydb::TOperationId MakeOperationId(const ui64 id, NKikimrExport::TExport::SettingsCase kind) { Ydb::TOperationId operationId; operationId.SetKind(Ydb::TOperationId::EXPORT); @@ -31,38 +32,24 @@ struct TExportConv { return operationId; } - static Ydb::Operations::Operation ToOperation(const NKikimrExport::TExport& exprt) { - Ydb::Operations::Operation operation; + static Operation ToOperation(const NKikimrExport::TExport& in) { + auto operation = TOperationConv::ToOperation(in); - operation.set_id(NOperationId::ProtoToString(MakeOperationId(exprt.GetId(), exprt.GetSettingsCase()))); - operation.set_status(exprt.GetStatus()); if (operation.status() == Ydb::StatusIds::SUCCESS) { - operation.set_ready(exprt.GetProgress() == Ydb::Export::ExportProgress::PROGRESS_DONE); - } else { - operation.set_ready(true); - } - if (exprt.IssuesSize()) { - operation.mutable_issues()->CopyFrom(exprt.GetIssues()); + operation.set_ready(in.GetProgress() == Ydb::Export::ExportProgress::PROGRESS_DONE); + } else if (operation.status() != Ydb::StatusIds::CANCELLED) { + return operation; } - if (exprt.HasStartTime()) { - *operation.mutable_create_time() = exprt.GetStartTime(); - } - if (exprt.HasEndTime()) { - *operation.mutable_end_time() = exprt.GetEndTime(); - } - - if (exprt.HasUserSID()) { - operation.set_created_by(exprt.GetUserSID()); - } + operation.set_id(NOperationId::ProtoToString(MakeOperationId(in.GetId(), in.GetSettingsCase()))); using namespace Ydb::Export; - switch (exprt.GetSettingsCase()) { + switch (in.GetSettingsCase()) { case NKikimrExport::TExport::kExportToYtSettings: - Fill(operation, exprt, exprt.GetExportToYtSettings()); + Fill(operation, in, in.GetExportToYtSettings()); break; case NKikimrExport::TExport::kExportToS3Settings: - Fill(operation, exprt, exprt.GetExportToS3Settings()); + Fill(operation, in, in.GetExportToS3Settings()); break; default: Y_DEBUG_ABORT("Unknown export kind"); @@ -72,22 +59,6 @@ struct TExportConv { return operation; } -private: - template - static void Fill( - Ydb::Operations::Operation& operation, - const NKikimrExport::TExport& exprt, - const TSettings& settings) { - TMetadata metadata; - metadata.mutable_settings()->CopyFrom(settings); - metadata.set_progress(exprt.GetProgress()); - metadata.mutable_items_progress()->CopyFrom(exprt.GetItemsProgress()); - operation.mutable_metadata()->PackFrom(metadata); - - TResult result; - operation.mutable_result()->PackFrom(result); - } - }; // TExportConv } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_import_base.h b/ydb/core/grpc_services/rpc_import_base.h index be700af4f026..e18253907346 100644 --- a/ydb/core/grpc_services/rpc_import_base.h +++ b/ydb/core/grpc_services/rpc_import_base.h @@ -1,7 +1,8 @@ #pragma once +#include "rpc_operation_conv_base.h" + #include -#include #include #include @@ -10,7 +11,7 @@ namespace NKikimr { namespace NGRpcService { -struct TImportConv { +struct TImportConv: public TOperationConv { static Ydb::TOperationId MakeOperationId(const ui64 id, NKikimrImport::TImport::SettingsCase kind) { Ydb::TOperationId operationId; operationId.SetKind(Ydb::TOperationId::IMPORT); @@ -28,35 +29,21 @@ struct TImportConv { return operationId; } - static Ydb::Operations::Operation ToOperation(const NKikimrImport::TImport& import) { - Ydb::Operations::Operation operation; + static Operation ToOperation(const NKikimrImport::TImport& in) { + auto operation = TOperationConv::ToOperation(in); - operation.set_id(NOperationId::ProtoToString(MakeOperationId(import.GetId(), import.GetSettingsCase()))); - operation.set_status(import.GetStatus()); if (operation.status() == Ydb::StatusIds::SUCCESS) { - operation.set_ready(import.GetProgress() == Ydb::Import::ImportProgress::PROGRESS_DONE); - } else { - operation.set_ready(true); - } - if (import.IssuesSize()) { - operation.mutable_issues()->CopyFrom(import.GetIssues()); + operation.set_ready(in.GetProgress() == Ydb::Import::ImportProgress::PROGRESS_DONE); + } else if (operation.status() != Ydb::StatusIds::CANCELLED) { + return operation; } - if (import.HasStartTime()) { - *operation.mutable_create_time() = import.GetStartTime(); - } - if (import.HasEndTime()) { - *operation.mutable_end_time() = import.GetEndTime(); - } - - if (import.HasUserSID()) { - operation.set_created_by(import.GetUserSID()); - } + operation.set_id(NOperationId::ProtoToString(MakeOperationId(in.GetId(), in.GetSettingsCase()))); using namespace Ydb::Import; - switch (import.GetSettingsCase()) { + switch (in.GetSettingsCase()) { case NKikimrImport::TImport::kImportFromS3Settings: - Fill(operation, import, import.GetImportFromS3Settings()); + Fill(operation, in, in.GetImportFromS3Settings()); break; default: Y_DEBUG_ABORT("Unknown import kind"); @@ -66,22 +53,6 @@ struct TImportConv { return operation; } -private: - template - static void Fill( - Ydb::Operations::Operation& operation, - const NKikimrImport::TImport& import, - const TSettings& settings) { - TMetadata metadata; - metadata.mutable_settings()->CopyFrom(settings); - metadata.set_progress(import.GetProgress()); - metadata.mutable_items_progress()->CopyFrom(import.GetItemsProgress()); - operation.mutable_metadata()->PackFrom(metadata); - - TResult result; - operation.mutable_result()->PackFrom(result); - } - }; // TImportConv } // namespace NGRpcService diff --git a/ydb/core/grpc_services/rpc_operation_conv_base.h b/ydb/core/grpc_services/rpc_operation_conv_base.h new file mode 100644 index 000000000000..8451c7c0d02d --- /dev/null +++ b/ydb/core/grpc_services/rpc_operation_conv_base.h @@ -0,0 +1,53 @@ +#pragma once + +#include + +namespace NKikimr { +namespace NGRpcService { + +template +struct TOperationConv { + using Operation = Ydb::Operations::Operation; + + static Operation ToOperation(const TOperation& in) { + Operation operation; + + operation.set_ready(true); + operation.set_status(in.GetStatus()); + + if (in.IssuesSize()) { + operation.mutable_issues()->CopyFrom(in.GetIssues()); + } + + if (in.HasStartTime()) { + *operation.mutable_create_time() = in.GetStartTime(); + } + + if (in.HasEndTime()) { + *operation.mutable_end_time() = in.GetEndTime(); + } + + if (in.HasUserSID()) { + operation.set_created_by(in.GetUserSID()); + } + + return operation; + } + +protected: + template + static void Fill(Operation& out, const TOperation& in, const TSettings& settings) { + TMetadata metadata; + metadata.mutable_settings()->CopyFrom(settings); + metadata.set_progress(in.GetProgress()); + metadata.mutable_items_progress()->CopyFrom(in.GetItemsProgress()); + out.mutable_metadata()->PackFrom(metadata); + + TResult result; + out.mutable_result()->PackFrom(result); + } + +}; // TOperationConv + +} // namespace NGRpcService +} // namespace NKikimr diff --git a/ydb/services/ydb/ydb_import_ut.cpp b/ydb/services/ydb/ydb_import_ut.cpp index f3315a516e46..0009d21f2aea 100644 --- a/ydb/services/ydb/ydb_import_ut.cpp +++ b/ydb/services/ydb/ydb_import_ut.cpp @@ -127,4 +127,36 @@ Y_UNIT_TEST_SUITE(YdbImport) { } } + Y_UNIT_TEST(ImportFromS3ToExistingTable) { + TKikimrWithGrpcAndRootSchema server; + auto driver = TDriver(TDriverConfig().SetEndpoint(TStringBuilder() + << "localhost:" << server.GetPort())); + + { + NYdb::NTable::TTableClient client(driver); + auto session = client.GetSession().ExtractValueSync().GetSession(); + + auto builder = NYdb::NTable::TTableBuilder() + .AddNullableColumn("Key", EPrimitiveType::Uint64) + .AddNullableColumn("Value", EPrimitiveType::String) + .SetPrimaryKeyColumn("Key"); + + auto result = session.CreateTable("/Root/Table", builder.Build()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + NYdb::NImport::TImportClient client(driver); + auto settings = NYdb::NImport::TImportFromS3Settings() + .AppendItem({.Src = "Fake/S3/Prefix", .Dst = "/Root/Table"}) + .Endpoint("s3.fake.endpoint.net") + .Bucket("fake_bucket") + .AccessKey("fake_access_key") + .SecretKey("fake_secret_key"); + + auto result = client.ImportFromS3(settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.Status().GetStatus(), EStatus::BAD_REQUEST, result.Status().GetIssues().ToString()); + } + } + }