diff --git a/ydb/core/driver_lib/run/run.cpp b/ydb/core/driver_lib/run/run.cpp index 970b06183a2b..bcea28ffdff5 100644 --- a/ydb/core/driver_lib/run/run.cpp +++ b/ydb/core/driver_lib/run/run.cpp @@ -110,6 +110,7 @@ #include #include #include +#include #include #include #include @@ -590,6 +591,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { names["query_service"] = &hasQueryService; TServiceCfg hasKeyValue = services.empty(); names["keyvalue"] = &hasKeyValue; + TServiceCfg hasReplication = services.empty(); + names["replication"] = &hasReplication; std::unordered_set enabled; for (const auto& name : services) { @@ -871,6 +874,11 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) { grpcRequestProxies[0])); } + if (hasReplication) { + server.AddService(new NGRpcService::TGRpcReplicationService(ActorSystem.Get(), Counters, + grpcRequestProxies[0], hasReplication.IsRlAllowed())); + } + if (ModuleFactories) { for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxies[0])) { server.AddService(service); diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index 1bd7d39f8610..78e4eda1b6c8 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -162,6 +162,7 @@ PEERDIR( ydb/services/deprecated/persqueue_v0 ydb/services/persqueue_v1 ydb/services/rate_limiter + ydb/services/replication ydb/services/ydb ) diff --git a/ydb/core/grpc_services/rpc_replication.cpp b/ydb/core/grpc_services/rpc_replication.cpp new file mode 100644 index 000000000000..9be788db7725 --- /dev/null +++ b/ydb/core/grpc_services/rpc_replication.cpp @@ -0,0 +1,191 @@ +#include "rpc_scheme_base.h" +#include "service_replication.h" + +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NGRpcService { + +using namespace Ydb; + +using TEvDescribeReplication = TGrpcRequestOperationCall; + +class TDescribeReplicationRPC: public TRpcSchemeRequestActor { + using TBase = TRpcSchemeRequestActor; + +public: + using TBase::TBase; + + void Bootstrap() { + DescribeScheme(); + } + + void PassAway() override { + if (ControllerPipeClient) { + NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeClient); + } + + TBase::PassAway(); + } + +private: + void DescribeScheme() { + auto ev = std::make_unique(); + SetAuthToken(ev, *Request_); + SetDatabase(ev.get(), *Request_); + ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path()); + + Send(MakeTxProxyID(), ev.release()); + Become(&TDescribeReplicationRPC::StateDescribeScheme); + } + + STATEFN(StateDescribeScheme) { + switch (ev->GetTypeRewrite()) { + HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle); + default: + return TBase::StateWork(ev); + } + } + + void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) { + const auto& record = ev->Get()->GetRecord(); + const auto& desc = record.GetPathDescription(); + + if (record.HasReason()) { + auto issue = NYql::TIssue(record.GetReason()); + Request_->RaiseIssue(issue); + } + + switch (record.GetStatus()) { + case NKikimrScheme::StatusSuccess: + if (desc.GetSelf().GetPathType() != NKikimrSchemeOp::EPathTypeReplication) { + auto issue = NYql::TIssue("Is not a replication"); + Request_->RaiseIssue(issue); + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + } + + ConvertDirectoryEntry(desc.GetSelf(), Result.mutable_self(), true); + return DescribeReplication(desc.GetReplicationDescription().GetControllerId(), + PathIdFromPathId(desc.GetReplicationDescription().GetPathId())); + + case NKikimrScheme::StatusPathDoesNotExist: + case NKikimrScheme::StatusSchemeError: + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + + case NKikimrScheme::StatusAccessDenied: + return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx); + + case NKikimrScheme::StatusNotAvailable: + return Reply(Ydb::StatusIds::UNAVAILABLE, ctx); + + default: { + return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx); + } + } + } + + void DescribeReplication(ui64 tabletId, const TPathId& pathId) { + if (!ControllerPipeClient) { + NTabletPipe::TClientConfig config; + config.RetryPolicy = { + .RetryLimitCount = 3, + }; + ControllerPipeClient = Register(NTabletPipe::CreateClient(SelfId(), tabletId, config)); + } + + auto ev = std::make_unique(); + PathIdFromPathId(pathId, ev->Record.MutablePathId()); + + NTabletPipe::SendData(SelfId(), ControllerPipeClient, ev.release()); + Become(&TDescribeReplicationRPC::StateDescribeReplication); + } + + STATEFN(StateDescribeReplication) { + switch (ev->GetTypeRewrite()) { + HFunc(NReplication::TEvController::TEvDescribeReplicationResult, Handle); + default: + return TBase::StateWork(ev); + } + } + + void Handle(NReplication::TEvController::TEvDescribeReplicationResult::TPtr& ev, const TActorContext& ctx) { + auto& record = ev->Get()->Record; + + switch (record.GetStatus()) { + case NKikimrReplication::TEvDescribeReplicationResult::SUCCESS: + break; + case NKikimrReplication::TEvDescribeReplicationResult::NOT_FOUND: + return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx); + default: + return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx); + } + + ConvertConnectionParams(record.GetConnectionParams(), *Result.mutable_connection_params()); + ConvertState(*record.MutableState(), Result); + + for (const auto& target : record.GetTargets()) { + ConvertItem(target, *Result.add_items()); + } + + return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx); + } + + static void ConvertConnectionParams(const NKikimrReplication::TConnectionParams& from, Ydb::Replication::ConnectionParams& to) { + to.set_endpoint(from.GetEndpoint()); + to.set_database(from.GetDatabase()); + + switch (from.GetCredentialsCase()) { + case NKikimrReplication::TConnectionParams::kStaticCredentials: + return ConvertStaticCredentials(from.GetStaticCredentials(), *to.mutable_static_credentials()); + case NKikimrReplication::TConnectionParams::kOAuthToken: + return ConvertOAuth(from.GetOAuthToken(), *to.mutable_oauth()); + default: + break; + } + } + + static void ConvertStaticCredentials(const NKikimrReplication::TStaticCredentials& from, Ydb::Replication::ConnectionParams::StaticCredentials& to) { + to.set_user(from.GetUser()); + to.set_password_secret_name(from.GetPasswordSecretName()); + } + + static void ConvertOAuth(const NKikimrReplication::TOAuthToken& from, Ydb::Replication::ConnectionParams::OAuth& to) { + to.set_token_secret_name(from.GetTokenSecretName()); + } + + static void ConvertItem(const NKikimrReplication::TReplicationConfig::TTargetSpecific::TTarget& from, Ydb::Replication::DescribeReplicationResult::Item& to) { + to.set_source_path(from.GetSrcPath()); + to.set_destination_path(from.GetDstPath()); + } + + static void ConvertState(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) { + switch (from.GetStateCase()) { + case NKikimrReplication::TReplicationState::kStandBy: + to.mutable_running(); + break; + case NKikimrReplication::TReplicationState::kError: + *to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues()); + break; + case NKikimrReplication::TReplicationState::kDone: + to.mutable_done(); + break; + default: + break; + } + } + +private: + Ydb::Replication::DescribeReplicationResult Result; + TActorId ControllerPipeClient; +}; + +void DoDescribeReplication(std::unique_ptr p, const IFacilityProvider& f) { + f.RegisterActor(new TDescribeReplicationRPC(p.release())); +} + +} diff --git a/ydb/core/grpc_services/service_replication.h b/ydb/core/grpc_services/service_replication.h new file mode 100644 index 000000000000..a52e050886f8 --- /dev/null +++ b/ydb/core/grpc_services/service_replication.h @@ -0,0 +1,12 @@ +#pragma once + +#include + +namespace NKikimr::NGRpcService { + +class IRequestOpCtx; +class IFacilityProvider; + +void DoDescribeReplication(std::unique_ptr p, const IFacilityProvider& f); + +} diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index 99de02b5e795..e9cd44fdd7fb 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -67,6 +67,7 @@ SRCS( rpc_read_rows.cpp rpc_remove_directory.cpp rpc_rename_tables.cpp + rpc_replication.cpp rpc_rollback_transaction.cpp rpc_scheme_base.cpp rpc_stream_execute_scan_query.cpp diff --git a/ydb/core/protos/counters_replication.proto b/ydb/core/protos/counters_replication.proto index f1fdf9963ede..2bd8e30881cf 100644 --- a/ydb/core/protos/counters_replication.proto +++ b/ydb/core/protos/counters_replication.proto @@ -36,4 +36,5 @@ enum ETxTypes { TXTYPE_ALTER_REPLICATION = 10 [(TxTypeOpts) = {Name: "TxAlterReplication"}]; TXTYPE_RESOLVE_SECRET_RESULT = 11 [(TxTypeOpts) = {Name: "TxResolveSecretResult"}]; TXTYPE_ALTER_DST_RESULT = 12 [(TxTypeOpts) = {Name: "TxAlterDstResult"}]; + TXTYPE_DESCRIBE_REPLICATION = 13 [(TxTypeOpts) = {Name: "TxDescribeReplication"}]; } diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto index 17227e475f5e..f6d5d10bcbb1 100644 --- a/ydb/core/protos/replication.proto +++ b/ydb/core/protos/replication.proto @@ -1,5 +1,6 @@ import "ydb/core/scheme/protos/pathid.proto"; import "ydb/public/api/protos/annotations/sensitive.proto"; +import "ydb/public/api/protos/ydb_issue_message.proto"; package NKikimrReplication; option java_package = "ru.yandex.kikimr.proto"; @@ -70,10 +71,15 @@ message TReplicationState { optional EFailoverMode FailoverMode = 1; } + message TError { + repeated Ydb.Issue.IssueMessage Issues = 1; + } + oneof State { TStandBy StandBy = 1; TPaused Paused = 2; TDone Done = 3; + TError Error = 4; } } @@ -135,6 +141,23 @@ message TEvDropReplicationResult { optional EStatus Status = 3; } +message TEvDescribeReplication { + optional NKikimrProto.TPathID PathId = 1; +} + +message TEvDescribeReplicationResult { + enum EStatus { + UNKNOWN = 0; + SUCCESS = 1; + NOT_FOUND = 2; + } + + optional EStatus Status = 1; + optional TConnectionParams ConnectionParams = 2; + repeated TReplicationConfig.TTargetSpecific.TTarget Targets = 3; + optional TReplicationState State = 4; +} + message TControllerIdentity { optional uint64 TabletId = 1; optional uint64 Generation = 2; diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index 994e9b5ce20f..076c5234edcb 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -45,6 +45,7 @@ STFUNC(TController::StateWork) { HFunc(TEvController::TEvCreateReplication, Handle); HFunc(TEvController::TEvAlterReplication, Handle); HFunc(TEvController::TEvDropReplication, Handle); + HFunc(TEvController::TEvDescribeReplication, Handle); HFunc(TEvPrivate::TEvDropReplication, Handle); HFunc(TEvPrivate::TEvDiscoveryTargetsResult, Handle); HFunc(TEvPrivate::TEvAssignStreamName, Handle); @@ -124,6 +125,11 @@ void TController::Handle(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorC RunTxDropReplication(ev, ctx); } +void TController::Handle(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx) { + CLOG_T(ctx, "Handle " << ev->Get()->ToString()); + RunTxDescribeReplication(ev, ctx); +} + void TController::Handle(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx) { CLOG_T(ctx, "Handle " << ev->Get()->ToString()); RunTxDiscoveryTargetsResult(ev, ctx); diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index ddc966f95735..ab7c294c26dc 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -68,6 +68,7 @@ class TController void Handle(TEvController::TEvCreateReplication::TPtr& ev, const TActorContext& ctx); void Handle(TEvController::TEvAlterReplication::TPtr& ev, const TActorContext& ctx); void Handle(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx); + void Handle(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx); void Handle(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx); @@ -99,6 +100,7 @@ class TController class TTxCreateReplication; class TTxAlterReplication; class TTxDropReplication; + class TTxDescribeReplication; class TTxDiscoveryTargetsResult; class TTxAssignStreamName; class TTxCreateStreamResult; @@ -115,6 +117,7 @@ class TController void RunTxAlterReplication(TEvController::TEvAlterReplication::TPtr& ev, const TActorContext& ctx); void RunTxDropReplication(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx); void RunTxDropReplication(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx); + void RunTxDescribeReplication(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx); void RunTxDiscoveryTargetsResult(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx); void RunTxAssignStreamName(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx); void RunTxCreateStreamResult(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx); diff --git a/ydb/core/tx/replication/controller/public_events.h b/ydb/core/tx/replication/controller/public_events.h index 06cb18cb2c12..dd35a3d60a84 100644 --- a/ydb/core/tx/replication/controller/public_events.h +++ b/ydb/core/tx/replication/controller/public_events.h @@ -15,6 +15,8 @@ struct TEvController { EvAlterReplicationResult, EvDropReplication, EvDropReplicationResult, + EvDescribeReplication, + EvDescribeReplicationResult, EvEnd, }; @@ -46,6 +48,14 @@ struct TEvController { : public TEventPB {}; + struct TEvDescribeReplication + : public TEventPB + {}; + + struct TEvDescribeReplicationResult + : public TEventPB + {}; + }; // TEvController } diff --git a/ydb/core/tx/replication/controller/tx_describe_replication.cpp b/ydb/core/tx/replication/controller/tx_describe_replication.cpp new file mode 100644 index 000000000000..bff9461ad1f3 --- /dev/null +++ b/ydb/core/tx/replication/controller/tx_describe_replication.cpp @@ -0,0 +1,82 @@ +#include "controller_impl.h" + +#include + +namespace NKikimr::NReplication::NController { + +class TController::TTxDescribeReplication: public TTxBase { + TEvController::TEvDescribeReplication::TPtr Ev; + THolder Result; + +public: + explicit TTxDescribeReplication(TController* self, TEvController::TEvDescribeReplication::TPtr& ev) + : TTxBase("TxDescribeReplication", self) + , Ev(ev) + { + } + + TTxType GetTxType() const override { + return TXTYPE_DESCRIBE_REPLICATION; + } + + bool Execute(TTransactionContext&, const TActorContext& ctx) override { + CLOG_D(ctx, "Execute: " << Ev->Get()->ToString()); + + const auto& record = Ev->Get()->Record; + Result = MakeHolder(); + + const auto pathId = PathIdFromPathId(record.GetPathId()); + auto replication = Self->Find(pathId); + + if (!replication) { + Result->Record.SetStatus(NKikimrReplication::TEvDescribeReplicationResult::NOT_FOUND); + return true; + } + + Result->Record.SetStatus(NKikimrReplication::TEvDescribeReplicationResult::SUCCESS); + Result->Record.MutableConnectionParams()->CopyFrom(replication->GetConfig().GetSrcConnectionParams()); + + for (ui64 tid = 0; tid < replication->GetNextTargetId(); ++tid) { + auto* target = replication->FindTarget(tid); + if (!target) { + continue; + } + + auto& item = *Result->Record.AddTargets(); + item.SetSrcPath(target->GetSrcPath()); + item.SetDstPath(target->GetDstPath()); + } + + auto& state = *Result->Record.MutableState(); + switch (replication->GetState()) { + case TReplication::EState::Ready: + case TReplication::EState::Removing: + state.MutableStandBy(); + break; + case TReplication::EState::Error: + if (auto issue = state.MutableError()->AddIssues()) { + issue->set_severity(NYql::TSeverityIds::S_ERROR); + issue->set_message(replication->GetIssue()); + } + break; + // TODO: 'done' state + } + + return true; + } + + void Complete(const TActorContext& ctx) override { + CLOG_D(ctx, "Complete"); + + if (Result) { + ctx.Send(Ev->Sender, Result.Release(), 0, Ev->Cookie); + } + } + +}; // TTxDescribeReplication + +void TController::RunTxDescribeReplication(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx) { + Execute(new TTxDescribeReplication(this, ev), ctx); +} + +} diff --git a/ydb/core/tx/replication/controller/ya.make b/ydb/core/tx/replication/controller/ya.make index 4f8bcffaf3de..33343863fa3c 100644 --- a/ydb/core/tx/replication/controller/ya.make +++ b/ydb/core/tx/replication/controller/ya.make @@ -39,6 +39,7 @@ SRCS( tx_create_dst_result.cpp tx_create_replication.cpp tx_create_stream_result.cpp + tx_describe_replication.cpp tx_discovery_targets_result.cpp tx_drop_dst_result.cpp tx_drop_replication.cpp diff --git a/ydb/public/api/grpc/draft/ya.make b/ydb/public/api/grpc/draft/ya.make index d6c539409114..0c38fefe52a2 100644 --- a/ydb/public/api/grpc/draft/ya.make +++ b/ydb/public/api/grpc/draft/ya.make @@ -15,6 +15,7 @@ SRCS( ydb_maintenance_v1.proto ydb_persqueue_v1.proto ydb_object_storage_v1.proto + ydb_replication_v1.proto ) PEERDIR( diff --git a/ydb/public/api/grpc/draft/ydb_replication_v1.proto b/ydb/public/api/grpc/draft/ydb_replication_v1.proto new file mode 100644 index 000000000000..cc363a0eee87 --- /dev/null +++ b/ydb/public/api/grpc/draft/ydb_replication_v1.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; + +import "ydb/public/api/protos/draft/ydb_replication.proto"; + +package Ydb.Replication.V1; +option java_package = "com.yandex.ydb.replication.v1"; + +service ReplicationService { + rpc DescribeReplication(Replication.DescribeReplicationRequest) returns (Replication.DescribeReplicationResponse); +} diff --git a/ydb/public/api/protos/draft/ydb_replication.proto b/ydb/public/api/protos/draft/ydb_replication.proto new file mode 100644 index 000000000000..e4a978f9d5b4 --- /dev/null +++ b/ydb/public/api/protos/draft/ydb_replication.proto @@ -0,0 +1,68 @@ +syntax = "proto3"; +option cc_enable_arenas = true; + +import "ydb/public/api/protos/annotations/validation.proto"; +import "ydb/public/api/protos/ydb_issue_message.proto"; +import "ydb/public/api/protos/ydb_operation.proto"; +import "ydb/public/api/protos/ydb_scheme.proto"; + +package Ydb.Replication; +option java_package = "com.yandex.ydb.replication"; + +message DescribeReplicationRequest { + Ydb.Operations.OperationParams operation_params = 1; + // Replication path. + string path = 2 [(required) = true]; +} + +message DescribeReplicationResponse { + // Result of request will be inside operation. + Ydb.Operations.Operation operation = 1; +} + +message ConnectionParams { + message StaticCredentials { + string user = 1; + string password_secret_name = 2; + } + + message OAuth { + string token_secret_name = 1; + } + + string endpoint = 1; + string database = 2; + + oneof credentials { + StaticCredentials static_credentials = 3; + OAuth oauth = 4; + } +} + +message DescribeReplicationResult { + message Item { + string source_path = 1; + string destination_path = 2; + } + + message RunningState { + } + + message ErrorState { + repeated Ydb.Issue.IssueMessage issues = 1; + } + + message DoneState { + } + + // Description of scheme object. + Ydb.Scheme.Entry self = 1; + ConnectionParams connection_params = 2; + repeated Item items = 3; + oneof state { + RunningState running = 4; + ErrorState error = 5; + DoneState done = 6; + } +} + diff --git a/ydb/public/api/protos/ya.make b/ydb/public/api/protos/ya.make index 35b19a053a0b..033b535b5bbe 100644 --- a/ydb/public/api/protos/ya.make +++ b/ydb/public/api/protos/ya.make @@ -16,6 +16,7 @@ SRCS( draft/ydb_logstore.proto draft/ydb_maintenance.proto draft/ydb_object_storage.proto + draft/ydb_replication.proto ydb_federation_discovery.proto persqueue_error_codes_v1.proto ydb_auth.proto diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp index b5d028c1dcb6..7144f954cbfa 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp @@ -177,6 +177,8 @@ int TCommandDescribe::PrintPathResponse(TDriver& driver, const NScheme::TDescrib return DescribeTopic(driver); case NScheme::ESchemeEntryType::CoordinationNode: return DescribeCoordinationNode(driver); + case NScheme::ESchemeEntryType::Replication: + return DescribeReplication(driver); default: return DescribeEntryDefault(entry); } @@ -425,6 +427,86 @@ int TCommandDescribe::DescribeCoordinationNode(const TDriver& driver) { return PrintCoordinationNodeResponse(description); } +int TCommandDescribe::PrintReplicationResponsePretty(const NYdb::NReplication::TDescribeReplicationResult& result) const { + const auto& desc = result.GetReplicationDescription(); + + Cout << Endl << "State: " << desc.GetState(); + switch (desc.GetState()) { + case NReplication::TReplicationDescription::EState::Error: + Cout << Endl << "Issues: " << desc.GetErrorState().GetIssues().ToOneLineString(); + break; + default: + break; + } + + const auto& connParams = desc.GetConnectionParams(); + Cout << Endl << "Endpoint: " << connParams.GetDiscoveryEndpoint(); + Cout << Endl << "Database: " << connParams.GetDatabase(); + + switch (connParams.GetCredentials()) { + case NReplication::TConnectionParams::ECredentials::Static: + Cout << Endl << "User: " << connParams.GetStaticCredentials().User; + Cout << Endl << "PasswordSecretName: " << connParams.GetStaticCredentials().PasswordSecretName; + break; + case NReplication::TConnectionParams::ECredentials::OAuth: + Cout << Endl << "TokenSecretName: " << connParams.GetOAuthCredentials().TokenSecretName; + break; + } + + if (const auto& items = desc.GetItems()) { + Cout << Endl << "Items (source => destination):"; + for (const auto& item : items) { + Cout << Endl << " " << item.SrcPath << " => " << item.DstPath; + } + } + + Cout << Endl; + return EXIT_SUCCESS; +} + +int TCommandDescribe::PrintReplicationResponseProtoJsonBase64(const NYdb::NReplication::TDescribeReplicationResult& result) const { + TString json; + google::protobuf::util::JsonPrintOptions jsonOpts; + jsonOpts.preserve_proto_field_names = true; + auto convertStatus = google::protobuf::util::MessageToJsonString( + NYdb::TProtoAccessor::GetProto(result), + &json, + jsonOpts + ); + if (convertStatus.ok()) { + Cout << json << Endl; + } else { + Cerr << "Error occurred while converting result proto to json" << Endl; + return EXIT_FAILURE; + } + return EXIT_SUCCESS; +} + +int TCommandDescribe::PrintReplicationResponse(const NYdb::NReplication::TDescribeReplicationResult& result) const { + switch (OutputFormat) { + case EOutputFormat::Default: + case EOutputFormat::Pretty: + PrintReplicationResponsePretty(result); + break; + case EOutputFormat::Json: + Cerr << "Warning! Option --json is deprecated and will be removed soon. " + << "Use \"--format proto-json-base64\" option instead." << Endl; + [[fallthrough]]; + case EOutputFormat::ProtoJsonBase64: + return PrintReplicationResponseProtoJsonBase64(result); + default: + throw TMisuseException() << "This command doesn't support " << OutputFormat << " output format"; + } + return EXIT_SUCCESS; +} + +int TCommandDescribe::DescribeReplication(const TDriver& driver) { + NReplication::TReplicationClient client(driver); + auto result = client.DescribeReplication(Path).ExtractValueSync(); + ThrowOnError(result); + return PrintReplicationResponse(result); +} + namespace { void PrintColumns(const NTable::TTableDescription& tableDescription) { if (!tableDescription.GetTableColumns().size()) { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h index fe3a089305f3..5a31a75a54b7 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -71,8 +72,13 @@ class TCommandDescribe : public TYdbOperationCommand, public TCommandWithPath, p int PrintCoordinationNodeResponsePretty(const NYdb::NCoordination::TNodeDescription& result) const; int PrintCoordinationNodeResponseProtoJsonBase64(const NYdb::NCoordination::TNodeDescription& result) const; + int DescribeReplication(const TDriver& driver); + int PrintReplicationResponse(const NYdb::NReplication::TDescribeReplicationResult& result) const; + int PrintReplicationResponsePretty(const NYdb::NReplication::TDescribeReplicationResult& result) const; + int PrintReplicationResponseProtoJsonBase64(const NYdb::NReplication::TDescribeReplicationResult& result) const; + template - void PrintPermissionsIfNeeded(const TDescriptionType& description) { + void PrintPermissionsIfNeeded(const TDescriptionType& description) const { if (ShowPermissions) { Cout << Endl; PrintAllPermissions( diff --git a/ydb/public/lib/ydb_cli/common/print_utils.cpp b/ydb/public/lib/ydb_cli/common/print_utils.cpp index 833b6062673b..c3c548588daf 100644 --- a/ydb/public/lib/ydb_cli/common/print_utils.cpp +++ b/ydb/public/lib/ydb_cli/common/print_utils.cpp @@ -104,9 +104,10 @@ TString EntryTypeToString(NScheme::ESchemeEntryType entry) { return "external-table"; case NScheme::ESchemeEntryType::View: return "view"; + case NScheme::ESchemeEntryType::Replication: + return "replication"; case NScheme::ESchemeEntryType::Unknown: case NScheme::ESchemeEntryType::Sequence: - case NScheme::ESchemeEntryType::Replication: return "unknown"; } } diff --git a/ydb/public/sdk/cpp/client/draft/ya.make b/ydb/public/sdk/cpp/client/draft/ya.make index 7d4f206e71be..6a7803c48991 100644 --- a/ydb/public/sdk/cpp/client/draft/ya.make +++ b/ydb/public/sdk/cpp/client/draft/ya.make @@ -2,10 +2,14 @@ LIBRARY() SRCS( ydb_dynamic_config.cpp + ydb_replication.cpp ydb_scripting.cpp ) +GENERATE_ENUM_SERIALIZATION(ydb_replication.h) + PEERDIR( + ydb/library/yql/public/issue ydb/public/api/grpc/draft ydb/public/sdk/cpp/client/ydb_table ydb/public/sdk/cpp/client/ydb_types/operation diff --git a/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp b/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp new file mode 100644 index 000000000000..bf4de195a967 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp @@ -0,0 +1,204 @@ +#include "ydb_replication.h" + +#define INCLUDE_YDB_INTERNAL_H +#include +#undef INCLUDE_YDB_INTERNAL_H + +#include +#include +#include +#include +#include + +#include + +namespace NYdb { +namespace NReplication { + +TConnectionParams::TConnectionParams(const Ydb::Replication::ConnectionParams& params) { + DiscoveryEndpoint(params.endpoint()); + Database(params.database()); + + switch (params.credentials_case()) { + case Ydb::Replication::ConnectionParams::kStaticCredentials: + Credentials_ = TStaticCredentials( + params.static_credentials().user(), + params.static_credentials().password_secret_name()); + break; + + case Ydb::Replication::ConnectionParams::kOauth: + Credentials_ = TOAuthCredentials(params.oauth().token_secret_name()); + break; + + default: + break; + } +} + +const TString& TConnectionParams::GetDiscoveryEndpoint() const { + return *DiscoveryEndpoint_; +} + +const TString& TConnectionParams::GetDatabase() const { + return *Database_; +} + +TConnectionParams::ECredentials TConnectionParams::GetCredentials() const { + return static_cast(Credentials_.index()); +} + +const TStaticCredentials& TConnectionParams::GetStaticCredentials() const { + return std::get(Credentials_); +} + +const TOAuthCredentials& TConnectionParams::GetOAuthCredentials() const { + return std::get(Credentials_); +} + +class TErrorState::TImpl { +public: + NYql::TIssues Issues; + + explicit TImpl(NYql::TIssues&& issues) + : Issues(std::move(issues)) + { + } +}; + +TErrorState::TErrorState(NYql::TIssues&& issues) + : Impl_(std::make_shared(std::move(issues))) +{ +} + +const NYql::TIssues& TErrorState::GetIssues() const { + return Impl_->Issues; +} + +template +NYql::TIssues IssuesFromMessage(const ::google::protobuf::RepeatedPtrField& message) { + NYql::TIssues issues; + NYql::IssuesFromMessage(message, issues); + return issues; +} + +TReplicationDescription::TReplicationDescription(const Ydb::Replication::DescribeReplicationResult& desc) + : ConnectionParams_(desc.connection_params()) +{ + Items_.reserve(desc.items_size()); + for (const auto& item : desc.items()) { + Items_.emplace_back(item.source_path(), item.destination_path()); + } + + switch (desc.state_case()) { + case Ydb::Replication::DescribeReplicationResult::kRunning: + State_ = TRunningState(); + break; + + case Ydb::Replication::DescribeReplicationResult::kError: + State_ = TErrorState(IssuesFromMessage(desc.error().issues())); + break; + + case Ydb::Replication::DescribeReplicationResult::kDone: + State_ = TDoneState(); + break; + + default: + break; + } +} + +const TConnectionParams& TReplicationDescription::GetConnectionParams() const { + return ConnectionParams_; +} + +const TVector TReplicationDescription::GetItems() const { + return Items_; +} + +TReplicationDescription::EState TReplicationDescription::GetState() const { + return static_cast(State_.index()); +} + +const TRunningState& TReplicationDescription::GetRunningState() const { + return std::get(State_); +} + +const TErrorState& TReplicationDescription::GetErrorState() const { + return std::get(State_); +} + +const TDoneState& TReplicationDescription::GetDoneState() const { + return std::get(State_); +} + +TDescribeReplicationResult::TDescribeReplicationResult(TStatus&& status, Ydb::Replication::DescribeReplicationResult&& desc) + : NScheme::TDescribePathResult(std::move(status), desc.self()) + , ReplicationDescription_(desc) + , Proto_(std::make_unique()) +{ + *Proto_ = std::move(desc); +} + +const TReplicationDescription& TDescribeReplicationResult::GetReplicationDescription() const { + return ReplicationDescription_; +} + +const Ydb::Replication::DescribeReplicationResult& TDescribeReplicationResult::GetProto() const { + return *Proto_; +} + +class TReplicationClient::TImpl: public TClientImplCommon { +public: + TImpl(std::shared_ptr&& connections, const TCommonClientSettings& settings) + : TClientImplCommon(std::move(connections), settings) + { + } + + TAsyncDescribeReplicationResult DescribeReplication(const TString& path, const TDescribeReplicationSettings& settings) { + using namespace Ydb::Replication; + + auto request = MakeOperationRequest(settings); + request.set_path(path); + + auto promise = NThreading::NewPromise(); + + auto extractor = [promise] + (google::protobuf::Any* any, TPlainStatus status) mutable { + DescribeReplicationResult result; + if (any) { + any->UnpackTo(&result); + } + + TDescribeReplicationResult val(TStatus(std::move(status)), std::move(result)); + promise.SetValue(std::move(val)); + }; + + Connections_->RunDeferred( + std::move(request), + extractor, + &V1::ReplicationService::Stub::AsyncDescribeReplication, + DbDriverState_, + INITIAL_DEFERRED_CALL_DELAY, + TRpcRequestSettings::Make(settings)); + + return promise.GetFuture(); + } + +}; + +TReplicationClient::TReplicationClient(const TDriver& driver, const TCommonClientSettings& settings) + : Impl_(std::make_shared(CreateInternalInterface(driver), settings)) +{ +} + +TAsyncDescribeReplicationResult TReplicationClient::DescribeReplication(const TString& path, const TDescribeReplicationSettings& settings) { + return Impl_->DescribeReplication(path, settings); +} + +} // NReplication + +const Ydb::Replication::DescribeReplicationResult& TProtoAccessor::GetProto(const NReplication::TDescribeReplicationResult& result) { + return result.GetProto(); +} + +} // NYdb diff --git a/ydb/public/sdk/cpp/client/draft/ydb_replication.h b/ydb/public/sdk/cpp/client/draft/ydb_replication.h new file mode 100644 index 000000000000..794c1b15f5d8 --- /dev/null +++ b/ydb/public/sdk/cpp/client/draft/ydb_replication.h @@ -0,0 +1,133 @@ +#pragma once + +#include +#include + +#include + +namespace Ydb::Replication { + class ConnectionParams; + class DescribeReplicationResult; +} + +namespace NYdb { + class TProtoAccessor; +} + +namespace NYql { + class TIssues; +} + +namespace NYdb::NReplication { + +class TDescribeReplicationResult; +using TAsyncDescribeReplicationResult = NThreading::TFuture; +struct TDescribeReplicationSettings: public TOperationRequestSettings {}; + +struct TStaticCredentials { + TString User; + TString PasswordSecretName; +}; + +struct TOAuthCredentials { + TString TokenSecretName; +}; + +class TConnectionParams: private TCommonClientSettings { +public: + enum class ECredentials { + Static, + OAuth, + }; + + explicit TConnectionParams(const Ydb::Replication::ConnectionParams& params); + + const TString& GetDiscoveryEndpoint() const; + const TString& GetDatabase() const; + + ECredentials GetCredentials() const; + const TStaticCredentials& GetStaticCredentials() const; + const TOAuthCredentials& GetOAuthCredentials() const; + +private: + std::variant< + TStaticCredentials, + TOAuthCredentials + > Credentials_; +}; + +struct TRunningState {}; +struct TDoneState {}; + +class TErrorState { + class TImpl; + +public: + explicit TErrorState(NYql::TIssues&& issues); + + const NYql::TIssues& GetIssues() const; + +private: + std::shared_ptr Impl_; +}; + +class TReplicationDescription { +public: + struct TItem { + TString SrcPath; + TString DstPath; + }; + + enum class EState { + Running, + Error, + Done, + }; + + explicit TReplicationDescription(const Ydb::Replication::DescribeReplicationResult& desc); + + const TConnectionParams& GetConnectionParams() const; + const TVector GetItems() const; + + EState GetState() const; + const TRunningState& GetRunningState() const; + const TErrorState& GetErrorState() const; + const TDoneState& GetDoneState() const; + +private: + TConnectionParams ConnectionParams_; + TVector Items_; + std::variant< + TRunningState, + TErrorState, + TDoneState + > State_; +}; + +class TDescribeReplicationResult: public NScheme::TDescribePathResult { + friend class NYdb::TProtoAccessor; + const Ydb::Replication::DescribeReplicationResult& GetProto() const; + +public: + TDescribeReplicationResult(TStatus&& status, Ydb::Replication::DescribeReplicationResult&& desc); + const TReplicationDescription& GetReplicationDescription() const; + +private: + TReplicationDescription ReplicationDescription_; + std::unique_ptr Proto_; +}; + +class TReplicationClient { + class TImpl; + +public: + TReplicationClient(const TDriver& driver, const TCommonClientSettings& settings = TCommonClientSettings()); + + TAsyncDescribeReplicationResult DescribeReplication(const TString& path, + const TDescribeReplicationSettings& settings = TDescribeReplicationSettings()); + +private: + std::shared_ptr Impl_; +}; + +} // namespace NYdb::NReplication diff --git a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h index e2dda78f2b31..b69f26e3d2ff 100644 --- a/ydb/public/sdk/cpp/client/ydb_proto/accessor.h +++ b/ydb/public/sdk/cpp/client/ydb_proto/accessor.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include #include @@ -9,6 +10,7 @@ #include #include +#include #include #include #include @@ -45,6 +47,7 @@ class TProtoAccessor { static const Ydb::Topic::DescribeConsumerResult& GetProto(const NYdb::NTopic::TConsumerDescription& consumerDescription); static const Ydb::Monitoring::SelfCheckResult& GetProto(const NYdb::NMonitoring::TSelfCheckResult& selfCheckResult); static const Ydb::Coordination::DescribeNodeResult& GetProto(const NYdb::NCoordination::TNodeDescription &describeNodeResult); + static const Ydb::Replication::DescribeReplicationResult& GetProto(const NYdb::NReplication::TDescribeReplicationResult& desc); static NTable::TQueryStats FromProto(const Ydb::TableStats::QueryStats& queryStats); static NTable::TTableDescription FromProto(const Ydb::Table::CreateTableRequest& request); diff --git a/ydb/services/replication/grpc_service.cpp b/ydb/services/replication/grpc_service.cpp new file mode 100644 index 000000000000..418d5a57ca95 --- /dev/null +++ b/ydb/services/replication/grpc_service.cpp @@ -0,0 +1,37 @@ +#include "grpc_service.h" + +#include +#include +#include +#include +#include + +namespace NKikimr::NGRpcService { + +void TGRpcReplicationService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) { + Y_UNUSED(logger); + + auto getCounterBlock = CreateCounterCb(Counters_, ActorSystem_); + using namespace Ydb; + +#ifdef ADD_REQUEST +#error ADD_REQUEST macro already defined +#endif + +#define ADD_REQUEST(NAME, REQUEST, RESPONSE, CB) \ + MakeIntrusive> \ + (this, &Service_, CQ_, \ + [this](NYdbGrpc::IRequestContextBase *ctx) { \ + NGRpcService::ReportGrpcReqToMon(*ActorSystem_, ctx->GetPeer()); \ + ActorSystem_->Send(GRpcRequestProxyId_, \ + new TGrpcRequestOperationCall \ + (ctx, &CB, TRequestAuxSettings{RLSWITCH(TRateLimiterMode::Rps), nullptr})); \ + }, &Replication::V1::ReplicationService::AsyncService::Request ## NAME, \ + #NAME, logger, getCounterBlock("replication", #NAME))->Run(); + + ADD_REQUEST(DescribeReplication, DescribeReplicationRequest, DescribeReplicationResponse, DoDescribeReplication); + +#undef ADD_REQUEST +} + +} diff --git a/ydb/services/replication/grpc_service.h b/ydb/services/replication/grpc_service.h new file mode 100644 index 000000000000..e85422b2ed6b --- /dev/null +++ b/ydb/services/replication/grpc_service.h @@ -0,0 +1,18 @@ +#pragma once + +#include +#include + +#include +#include + +namespace NKikimr::NGRpcService { + +class TGRpcReplicationService: public TGrpcServiceBase { +public: + using TGrpcServiceBase::TGrpcServiceBase; +private: + void SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger); +}; + +} diff --git a/ydb/services/replication/ya.make b/ydb/services/replication/ya.make new file mode 100644 index 000000000000..4c749f7533ee --- /dev/null +++ b/ydb/services/replication/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + grpc_service.cpp +) + +PEERDIR( + ydb/core/grpc_services + ydb/library/actors/core + ydb/library/grpc/server + ydb/public/api/grpc +) + +END() diff --git a/ydb/services/ya.make b/ydb/services/ya.make index a8369269df3d..8e76965f0ccc 100644 --- a/ydb/services/ya.make +++ b/ydb/services/ya.make @@ -19,5 +19,6 @@ RECURSE( persqueue_cluster_discovery persqueue_v1 rate_limiter + replication ydb )