Skip to content

Commit

Permalink
Safe operation convertations
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed Jan 15, 2025
1 parent 1402e4e commit df5ac68
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 81 deletions.
53 changes: 12 additions & 41 deletions ydb/core/grpc_services/rpc_export_base.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include "rpc_operation_conv_base.h"

#include <ydb/core/protos/export.pb.h>
#include <ydb/public/api/protos/ydb_operation.pb.h>
#include <ydb/public/api/protos/ydb_export.pb.h>
#include <ydb/public/lib/operation_id/operation_id.h>

Expand All @@ -10,7 +11,7 @@
namespace NKikimr {
namespace NGRpcService {

struct TExportConv {
struct TExportConv: public TOperationConv<NKikimrExport::TExport> {
static Ydb::TOperationId MakeOperationId(const ui64 id, NKikimrExport::TExport::SettingsCase kind) {
Ydb::TOperationId operationId;
operationId.SetKind(Ydb::TOperationId::EXPORT);
Expand All @@ -31,38 +32,24 @@ struct TExportConv {
return operationId;
}

static Ydb::Operations::Operation ToOperation(const NKikimrExport::TExport& exprt) {
Ydb::Operations::Operation operation;
static Operation ToOperation(const NKikimrExport::TExport& in) {
auto operation = TOperationConv::ToOperation(in);

operation.set_id(NOperationId::ProtoToString(MakeOperationId(exprt.GetId(), exprt.GetSettingsCase())));
operation.set_status(exprt.GetStatus());
if (operation.status() == Ydb::StatusIds::SUCCESS) {
operation.set_ready(exprt.GetProgress() == Ydb::Export::ExportProgress::PROGRESS_DONE);
} else {
operation.set_ready(true);
}
if (exprt.IssuesSize()) {
operation.mutable_issues()->CopyFrom(exprt.GetIssues());
operation.set_ready(in.GetProgress() == Ydb::Export::ExportProgress::PROGRESS_DONE);
} else if (operation.status() != Ydb::StatusIds::CANCELLED) {
return operation;
}

if (exprt.HasStartTime()) {
*operation.mutable_create_time() = exprt.GetStartTime();
}
if (exprt.HasEndTime()) {
*operation.mutable_end_time() = exprt.GetEndTime();
}

if (exprt.HasUserSID()) {
operation.set_created_by(exprt.GetUserSID());
}
operation.set_id(NOperationId::ProtoToString(MakeOperationId(in.GetId(), in.GetSettingsCase())));

using namespace Ydb::Export;
switch (exprt.GetSettingsCase()) {
switch (in.GetSettingsCase()) {
case NKikimrExport::TExport::kExportToYtSettings:
Fill<ExportToYtMetadata, ExportToYtResult>(operation, exprt, exprt.GetExportToYtSettings());
Fill<ExportToYtMetadata, ExportToYtResult>(operation, in, in.GetExportToYtSettings());
break;
case NKikimrExport::TExport::kExportToS3Settings:
Fill<ExportToS3Metadata, ExportToS3Result>(operation, exprt, exprt.GetExportToS3Settings());
Fill<ExportToS3Metadata, ExportToS3Result>(operation, in, in.GetExportToS3Settings());
break;
default:
Y_DEBUG_ABORT("Unknown export kind");
Expand All @@ -72,22 +59,6 @@ struct TExportConv {
return operation;
}

private:
template <typename TMetadata, typename TResult, typename TSettings>
static void Fill(
Ydb::Operations::Operation& operation,
const NKikimrExport::TExport& exprt,
const TSettings& settings) {
TMetadata metadata;
metadata.mutable_settings()->CopyFrom(settings);
metadata.set_progress(exprt.GetProgress());
metadata.mutable_items_progress()->CopyFrom(exprt.GetItemsProgress());
operation.mutable_metadata()->PackFrom(metadata);

TResult result;
operation.mutable_result()->PackFrom(result);
}

}; // TExportConv

} // namespace NGRpcService
Expand Down
51 changes: 11 additions & 40 deletions ydb/core/grpc_services/rpc_import_base.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include "rpc_operation_conv_base.h"

#include <ydb/core/protos/import.pb.h>
#include <ydb/public/api/protos/ydb_operation.pb.h>
#include <ydb/public/api/protos/ydb_import.pb.h>
#include <ydb/public/lib/operation_id/operation_id.h>

Expand All @@ -10,7 +11,7 @@
namespace NKikimr {
namespace NGRpcService {

struct TImportConv {
struct TImportConv: public TOperationConv<NKikimrImport::TImport> {
static Ydb::TOperationId MakeOperationId(const ui64 id, NKikimrImport::TImport::SettingsCase kind) {
Ydb::TOperationId operationId;
operationId.SetKind(Ydb::TOperationId::IMPORT);
Expand All @@ -28,35 +29,21 @@ struct TImportConv {
return operationId;
}

static Ydb::Operations::Operation ToOperation(const NKikimrImport::TImport& import) {
Ydb::Operations::Operation operation;
static Operation ToOperation(const NKikimrImport::TImport& in) {
auto operation = TOperationConv::ToOperation(in);

operation.set_id(NOperationId::ProtoToString(MakeOperationId(import.GetId(), import.GetSettingsCase())));
operation.set_status(import.GetStatus());
if (operation.status() == Ydb::StatusIds::SUCCESS) {
operation.set_ready(import.GetProgress() == Ydb::Import::ImportProgress::PROGRESS_DONE);
} else {
operation.set_ready(true);
}
if (import.IssuesSize()) {
operation.mutable_issues()->CopyFrom(import.GetIssues());
operation.set_ready(in.GetProgress() == Ydb::Import::ImportProgress::PROGRESS_DONE);
} else if (operation.status() != Ydb::StatusIds::CANCELLED) {
return operation;
}

if (import.HasStartTime()) {
*operation.mutable_create_time() = import.GetStartTime();
}
if (import.HasEndTime()) {
*operation.mutable_end_time() = import.GetEndTime();
}

if (import.HasUserSID()) {
operation.set_created_by(import.GetUserSID());
}
operation.set_id(NOperationId::ProtoToString(MakeOperationId(in.GetId(), in.GetSettingsCase())));

using namespace Ydb::Import;
switch (import.GetSettingsCase()) {
switch (in.GetSettingsCase()) {
case NKikimrImport::TImport::kImportFromS3Settings:
Fill<ImportFromS3Metadata, ImportFromS3Result>(operation, import, import.GetImportFromS3Settings());
Fill<ImportFromS3Metadata, ImportFromS3Result>(operation, in, in.GetImportFromS3Settings());
break;
default:
Y_DEBUG_ABORT("Unknown import kind");
Expand All @@ -66,22 +53,6 @@ struct TImportConv {
return operation;
}

private:
template <typename TMetadata, typename TResult, typename TSettings>
static void Fill(
Ydb::Operations::Operation& operation,
const NKikimrImport::TImport& import,
const TSettings& settings) {
TMetadata metadata;
metadata.mutable_settings()->CopyFrom(settings);
metadata.set_progress(import.GetProgress());
metadata.mutable_items_progress()->CopyFrom(import.GetItemsProgress());
operation.mutable_metadata()->PackFrom(metadata);

TResult result;
operation.mutable_result()->PackFrom(result);
}

}; // TImportConv

} // namespace NGRpcService
Expand Down
53 changes: 53 additions & 0 deletions ydb/core/grpc_services/rpc_operation_conv_base.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#pragma once

#include <ydb/public/api/protos/ydb_operation.pb.h>

namespace NKikimr {
namespace NGRpcService {

template <typename TOperation>
struct TOperationConv {
using Operation = Ydb::Operations::Operation;

static Operation ToOperation(const TOperation& in) {
Operation operation;

operation.set_ready(true);
operation.set_status(in.GetStatus());

if (in.IssuesSize()) {
operation.mutable_issues()->CopyFrom(in.GetIssues());
}

if (in.HasStartTime()) {
*operation.mutable_create_time() = in.GetStartTime();
}

if (in.HasEndTime()) {
*operation.mutable_end_time() = in.GetEndTime();
}

if (in.HasUserSID()) {
operation.set_created_by(in.GetUserSID());
}

return operation;
}

protected:
template <typename TMetadata, typename TResult, typename TSettings>
static void Fill(Operation& out, const TOperation& in, const TSettings& settings) {
TMetadata metadata;
metadata.mutable_settings()->CopyFrom(settings);
metadata.set_progress(in.GetProgress());
metadata.mutable_items_progress()->CopyFrom(in.GetItemsProgress());
out.mutable_metadata()->PackFrom(metadata);

TResult result;
out.mutable_result()->PackFrom(result);
}

}; // TOperationConv

} // namespace NGRpcService
} // namespace NKikimr
32 changes: 32 additions & 0 deletions ydb/services/ydb/ydb_import_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,36 @@ Y_UNIT_TEST_SUITE(YdbImport) {
}
}

Y_UNIT_TEST(ImportFromS3ToExistingTable) {
TKikimrWithGrpcAndRootSchema server;
auto driver = TDriver(TDriverConfig().SetEndpoint(TStringBuilder()
<< "localhost:" << server.GetPort()));

{
NYdb::NTable::TTableClient client(driver);
auto session = client.GetSession().ExtractValueSync().GetSession();

auto builder = NYdb::NTable::TTableBuilder()
.AddNullableColumn("Key", EPrimitiveType::Uint64)
.AddNullableColumn("Value", EPrimitiveType::String)
.SetPrimaryKeyColumn("Key");

auto result = session.CreateTable("/Root/Table", builder.Build()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}

{
NYdb::NImport::TImportClient client(driver);
auto settings = NYdb::NImport::TImportFromS3Settings()
.AppendItem({.Src = "Fake/S3/Prefix", .Dst = "/Root/Table"})
.Endpoint("s3.fake.endpoint.net")
.Bucket("fake_bucket")
.AccessKey("fake_access_key")
.SecretKey("fake_secret_key");

auto result = client.ImportFromS3(settings).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.Status().GetStatus(), EStatus::BAD_REQUEST, result.Status().GetIssues().ToString());
}
}

}

0 comments on commit df5ac68

Please sign in to comment.