Skip to content

Commit

Permalink
Introduce DescribeReplication
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL committed May 27, 2024
1 parent f6fa669 commit 5c6d0c2
Show file tree
Hide file tree
Showing 27 changed files with 924 additions and 2 deletions.
8 changes: 8 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
#include <ydb/services/persqueue_v1/persqueue.h>
#include <ydb/services/persqueue_v1/topic.h>
#include <ydb/services/rate_limiter/grpc_service.h>
#include <ydb/services/replication/grpc_service.h>
#include <ydb/services/ydb/ydb_clickhouse_internal.h>
#include <ydb/services/ydb/ydb_dummy.h>
#include <ydb/services/ydb/ydb_export.h>
Expand Down Expand Up @@ -590,6 +591,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
names["query_service"] = &hasQueryService;
TServiceCfg hasKeyValue = services.empty();
names["keyvalue"] = &hasKeyValue;
TServiceCfg hasReplication = services.empty();
names["replication"] = &hasReplication;

std::unordered_set<TString> enabled;
for (const auto& name : services) {
Expand Down Expand Up @@ -871,6 +874,11 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
grpcRequestProxies[0]));
}

if (hasReplication) {
server.AddService(new NGRpcService::TGRpcReplicationService(ActorSystem.Get(), Counters,
grpcRequestProxies[0], hasReplication.IsRlAllowed()));
}

if (ModuleFactories) {
for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxies[0])) {
server.AddService(service);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ PEERDIR(
ydb/services/deprecated/persqueue_v0
ydb/services/persqueue_v1
ydb/services/rate_limiter
ydb/services/replication
ydb/services/ydb
)

Expand Down
191 changes: 191 additions & 0 deletions ydb/core/grpc_services/rpc_replication.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
#include "rpc_scheme_base.h"
#include "service_replication.h"

#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/tx/replication/controller/public_events.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/public/api/protos/draft/ydb_replication.pb.h>

namespace NKikimr::NGRpcService {

using namespace Ydb;

using TEvDescribeReplication = TGrpcRequestOperationCall<Replication::DescribeReplicationRequest, Replication::DescribeReplicationResponse>;

class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicationRPC, TEvDescribeReplication> {
using TBase = TRpcSchemeRequestActor<TDescribeReplicationRPC, TEvDescribeReplication>;

public:
using TBase::TBase;

void Bootstrap() {
DescribeScheme();
}

void PassAway() override {
if (ControllerPipeClient) {
NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeClient);
}

TBase::PassAway();
}

private:
void DescribeScheme() {
auto ev = std::make_unique<TEvTxUserProxy::TEvNavigate>();
SetAuthToken(ev, *Request_);
SetDatabase(ev.get(), *Request_);
ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path());

Send(MakeTxProxyID(), ev.release());
Become(&TDescribeReplicationRPC::StateDescribeScheme);
}

STATEFN(StateDescribeScheme) {
switch (ev->GetTypeRewrite()) {
HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
default:
return TBase::StateWork(ev);
}
}

void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->GetRecord();
const auto& desc = record.GetPathDescription();

if (record.HasReason()) {
auto issue = NYql::TIssue(record.GetReason());
Request_->RaiseIssue(issue);
}

switch (record.GetStatus()) {
case NKikimrScheme::StatusSuccess:
if (desc.GetSelf().GetPathType() != NKikimrSchemeOp::EPathTypeReplication) {
auto issue = NYql::TIssue("Is not a replication");
Request_->RaiseIssue(issue);
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

ConvertDirectoryEntry(desc.GetSelf(), Result.mutable_self(), true);
return DescribeReplication(desc.GetReplicationDescription().GetControllerId(),
PathIdFromPathId(desc.GetReplicationDescription().GetPathId()));

case NKikimrScheme::StatusPathDoesNotExist:
case NKikimrScheme::StatusSchemeError:
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);

case NKikimrScheme::StatusAccessDenied:
return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx);

case NKikimrScheme::StatusNotAvailable:
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);

default: {
return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
}
}
}

void DescribeReplication(ui64 tabletId, const TPathId& pathId) {
if (!ControllerPipeClient) {
NTabletPipe::TClientConfig config;
config.RetryPolicy = {
.RetryLimitCount = 3,
};
ControllerPipeClient = Register(NTabletPipe::CreateClient(SelfId(), tabletId, config));
}

auto ev = std::make_unique<NReplication::TEvController::TEvDescribeReplication>();
PathIdFromPathId(pathId, ev->Record.MutablePathId());

NTabletPipe::SendData(SelfId(), ControllerPipeClient, ev.release());
Become(&TDescribeReplicationRPC::StateDescribeReplication);
}

STATEFN(StateDescribeReplication) {
switch (ev->GetTypeRewrite()) {
HFunc(NReplication::TEvController::TEvDescribeReplicationResult, Handle);
default:
return TBase::StateWork(ev);
}
}

void Handle(NReplication::TEvController::TEvDescribeReplicationResult::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;

switch (record.GetStatus()) {
case NKikimrReplication::TEvDescribeReplicationResult::SUCCESS:
break;
case NKikimrReplication::TEvDescribeReplicationResult::NOT_FOUND:
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
default:
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
}

ConvertConnectionParams(record.GetConnectionParams(), *Result.mutable_connection_params());
ConvertState(*record.MutableState(), Result);

for (const auto& target : record.GetTargets()) {
ConvertItem(target, *Result.add_items());
}

return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
}

static void ConvertConnectionParams(const NKikimrReplication::TConnectionParams& from, Ydb::Replication::ConnectionParams& to) {
to.set_endpoint(from.GetEndpoint());
to.set_database(from.GetDatabase());

switch (from.GetCredentialsCase()) {
case NKikimrReplication::TConnectionParams::kStaticCredentials:
return ConvertStaticCredentials(from.GetStaticCredentials(), *to.mutable_static_credentials());
case NKikimrReplication::TConnectionParams::kOAuthToken:
return ConvertOAuth(from.GetOAuthToken(), *to.mutable_oauth());
default:
break;
}
}

static void ConvertStaticCredentials(const NKikimrReplication::TStaticCredentials& from, Ydb::Replication::ConnectionParams::StaticCredentials& to) {
to.set_user(from.GetUser());
to.set_password_secret_name(from.GetPasswordSecretName());
}

static void ConvertOAuth(const NKikimrReplication::TOAuthToken& from, Ydb::Replication::ConnectionParams::OAuth& to) {
to.set_token_secret_name(from.GetTokenSecretName());
}

static void ConvertItem(const NKikimrReplication::TReplicationConfig::TTargetSpecific::TTarget& from, Ydb::Replication::DescribeReplicationResult::Item& to) {
to.set_source_path(from.GetSrcPath());
to.set_destination_path(from.GetDstPath());
}

static void ConvertState(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) {
switch (from.GetStateCase()) {
case NKikimrReplication::TReplicationState::kStandBy:
to.mutable_running();
break;
case NKikimrReplication::TReplicationState::kError:
*to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues());
break;
case NKikimrReplication::TReplicationState::kDone:
to.mutable_done();
break;
default:
break;
}
}

private:
Ydb::Replication::DescribeReplicationResult Result;
TActorId ControllerPipeClient;
};

void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
f.RegisterActor(new TDescribeReplicationRPC(p.release()));
}

}
12 changes: 12 additions & 0 deletions ydb/core/grpc_services/service_replication.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#pragma once

#include <memory>

namespace NKikimr::NGRpcService {

class IRequestOpCtx;
class IFacilityProvider;

void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);

}
1 change: 1 addition & 0 deletions ydb/core/grpc_services/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ SRCS(
rpc_read_rows.cpp
rpc_remove_directory.cpp
rpc_rename_tables.cpp
rpc_replication.cpp
rpc_rollback_transaction.cpp
rpc_scheme_base.cpp
rpc_stream_execute_scan_query.cpp
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ enum ETxTypes {
TXTYPE_ALTER_REPLICATION = 10 [(TxTypeOpts) = {Name: "TxAlterReplication"}];
TXTYPE_RESOLVE_SECRET_RESULT = 11 [(TxTypeOpts) = {Name: "TxResolveSecretResult"}];
TXTYPE_ALTER_DST_RESULT = 12 [(TxTypeOpts) = {Name: "TxAlterDstResult"}];
TXTYPE_DESCRIBE_REPLICATION = 13 [(TxTypeOpts) = {Name: "TxDescribeReplication"}];
}
23 changes: 23 additions & 0 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import "ydb/core/scheme/protos/pathid.proto";
import "ydb/public/api/protos/annotations/sensitive.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";

package NKikimrReplication;
option java_package = "ru.yandex.kikimr.proto";
Expand Down Expand Up @@ -70,10 +71,15 @@ message TReplicationState {
optional EFailoverMode FailoverMode = 1;
}

message TError {
repeated Ydb.Issue.IssueMessage Issues = 1;
}

oneof State {
TStandBy StandBy = 1;
TPaused Paused = 2;
TDone Done = 3;
TError Error = 4;
}
}

Expand Down Expand Up @@ -135,6 +141,23 @@ message TEvDropReplicationResult {
optional EStatus Status = 3;
}

message TEvDescribeReplication {
optional NKikimrProto.TPathID PathId = 1;
}

message TEvDescribeReplicationResult {
enum EStatus {
UNKNOWN = 0;
SUCCESS = 1;
NOT_FOUND = 2;
}

optional EStatus Status = 1;
optional TConnectionParams ConnectionParams = 2;
repeated TReplicationConfig.TTargetSpecific.TTarget Targets = 3;
optional TReplicationState State = 4;
}

message TControllerIdentity {
optional uint64 TabletId = 1;
optional uint64 Generation = 2;
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/replication/controller/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ STFUNC(TController::StateWork) {
HFunc(TEvController::TEvCreateReplication, Handle);
HFunc(TEvController::TEvAlterReplication, Handle);
HFunc(TEvController::TEvDropReplication, Handle);
HFunc(TEvController::TEvDescribeReplication, Handle);
HFunc(TEvPrivate::TEvDropReplication, Handle);
HFunc(TEvPrivate::TEvDiscoveryTargetsResult, Handle);
HFunc(TEvPrivate::TEvAssignStreamName, Handle);
Expand Down Expand Up @@ -124,6 +125,11 @@ void TController::Handle(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorC
RunTxDropReplication(ev, ctx);
}

void TController::Handle(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
RunTxDescribeReplication(ev, ctx);
}

void TController::Handle(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
RunTxDiscoveryTargetsResult(ev, ctx);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/replication/controller/controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class TController
void Handle(TEvController::TEvCreateReplication::TPtr& ev, const TActorContext& ctx);
void Handle(TEvController::TEvAlterReplication::TPtr& ev, const TActorContext& ctx);
void Handle(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
void Handle(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -99,6 +100,7 @@ class TController
class TTxCreateReplication;
class TTxAlterReplication;
class TTxDropReplication;
class TTxDescribeReplication;
class TTxDiscoveryTargetsResult;
class TTxAssignStreamName;
class TTxCreateStreamResult;
Expand All @@ -115,6 +117,7 @@ class TController
void RunTxAlterReplication(TEvController::TEvAlterReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDropReplication(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDropReplication(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDescribeReplication(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDiscoveryTargetsResult(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx);
void RunTxAssignStreamName(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx);
void RunTxCreateStreamResult(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx);
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/replication/controller/public_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct TEvController {
EvAlterReplicationResult,
EvDropReplication,
EvDropReplicationResult,
EvDescribeReplication,
EvDescribeReplicationResult,

EvEnd,
};
Expand Down Expand Up @@ -46,6 +48,14 @@ struct TEvController {
: public TEventPB<TEvDropReplicationResult, NKikimrReplication::TEvDropReplicationResult, EvDropReplicationResult>
{};

struct TEvDescribeReplication
: public TEventPB<TEvDescribeReplication, NKikimrReplication::TEvDescribeReplication, EvDescribeReplication>
{};

struct TEvDescribeReplicationResult
: public TEventPB<TEvDescribeReplicationResult, NKikimrReplication::TEvDescribeReplicationResult, EvDescribeReplicationResult>
{};

}; // TEvController

}
Loading

0 comments on commit 5c6d0c2

Please sign in to comment.