Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce DescribeReplication #4904

Merged
merged 1 commit into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
#include <ydb/services/persqueue_v1/persqueue.h>
#include <ydb/services/persqueue_v1/topic.h>
#include <ydb/services/rate_limiter/grpc_service.h>
#include <ydb/services/replication/grpc_service.h>
#include <ydb/services/ydb/ydb_clickhouse_internal.h>
#include <ydb/services/ydb/ydb_dummy.h>
#include <ydb/services/ydb/ydb_export.h>
Expand Down Expand Up @@ -590,6 +591,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
names["query_service"] = &hasQueryService;
TServiceCfg hasKeyValue = services.empty();
names["keyvalue"] = &hasKeyValue;
TServiceCfg hasReplication = services.empty();
names["replication"] = &hasReplication;

std::unordered_set<TString> enabled;
for (const auto& name : services) {
Expand Down Expand Up @@ -871,6 +874,11 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
grpcRequestProxies[0]));
}

if (hasReplication) {
server.AddService(new NGRpcService::TGRpcReplicationService(ActorSystem.Get(), Counters,
grpcRequestProxies[0], hasReplication.IsRlAllowed()));
}

if (ModuleFactories) {
for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxies[0])) {
server.AddService(service);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ PEERDIR(
ydb/services/deprecated/persqueue_v0
ydb/services/persqueue_v1
ydb/services/rate_limiter
ydb/services/replication
ydb/services/ydb
)

Expand Down
191 changes: 191 additions & 0 deletions ydb/core/grpc_services/rpc_replication.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
#include "rpc_scheme_base.h"
#include "service_replication.h"

#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/tx/replication/controller/public_events.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/public/api/protos/draft/ydb_replication.pb.h>

namespace NKikimr::NGRpcService {

using namespace Ydb;

using TEvDescribeReplication = TGrpcRequestOperationCall<Replication::DescribeReplicationRequest, Replication::DescribeReplicationResponse>;

class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicationRPC, TEvDescribeReplication> {
using TBase = TRpcSchemeRequestActor<TDescribeReplicationRPC, TEvDescribeReplication>;

public:
using TBase::TBase;

void Bootstrap() {
DescribeScheme();
}

void PassAway() override {
if (ControllerPipeClient) {
NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeClient);
}

TBase::PassAway();
}

private:
void DescribeScheme() {
auto ev = std::make_unique<TEvTxUserProxy::TEvNavigate>();
SetAuthToken(ev, *Request_);
SetDatabase(ev.get(), *Request_);
ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path());

Send(MakeTxProxyID(), ev.release());
Become(&TDescribeReplicationRPC::StateDescribeScheme);
}

STATEFN(StateDescribeScheme) {
switch (ev->GetTypeRewrite()) {
HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
default:
return TBase::StateWork(ev);
}
}

void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->GetRecord();
const auto& desc = record.GetPathDescription();

if (record.HasReason()) {
auto issue = NYql::TIssue(record.GetReason());
Request_->RaiseIssue(issue);
}

switch (record.GetStatus()) {
case NKikimrScheme::StatusSuccess:
if (desc.GetSelf().GetPathType() != NKikimrSchemeOp::EPathTypeReplication) {
auto issue = NYql::TIssue("Is not a replication");
Request_->RaiseIssue(issue);
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

ConvertDirectoryEntry(desc.GetSelf(), Result.mutable_self(), true);
return DescribeReplication(desc.GetReplicationDescription().GetControllerId(),
PathIdFromPathId(desc.GetReplicationDescription().GetPathId()));

case NKikimrScheme::StatusPathDoesNotExist:
case NKikimrScheme::StatusSchemeError:
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);

case NKikimrScheme::StatusAccessDenied:
return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx);

case NKikimrScheme::StatusNotAvailable:
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);

default: {
return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
}
}
}
Comment on lines +55 to +90
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Выглядит так, что мы таскаем эту копипасту из функции в функцию, в данном ревью не критично, но стоит обобщить.


void DescribeReplication(ui64 tabletId, const TPathId& pathId) {
if (!ControllerPipeClient) {
NTabletPipe::TClientConfig config;
config.RetryPolicy = {
.RetryLimitCount = 3,
};
ControllerPipeClient = Register(NTabletPipe::CreateClient(SelfId(), tabletId, config));
}

auto ev = std::make_unique<NReplication::TEvController::TEvDescribeReplication>();
PathIdFromPathId(pathId, ev->Record.MutablePathId());

NTabletPipe::SendData(SelfId(), ControllerPipeClient, ev.release());
Become(&TDescribeReplicationRPC::StateDescribeReplication);
}

STATEFN(StateDescribeReplication) {
switch (ev->GetTypeRewrite()) {
HFunc(NReplication::TEvController::TEvDescribeReplicationResult, Handle);
default:
return TBase::StateWork(ev);
}
}

void Handle(NReplication::TEvController::TEvDescribeReplicationResult::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;

switch (record.GetStatus()) {
case NKikimrReplication::TEvDescribeReplicationResult::SUCCESS:
break;
case NKikimrReplication::TEvDescribeReplicationResult::NOT_FOUND:
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
default:
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
}

ConvertConnectionParams(record.GetConnectionParams(), *Result.mutable_connection_params());
ConvertState(*record.MutableState(), Result);

for (const auto& target : record.GetTargets()) {
ConvertItem(target, *Result.add_items());
}

return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
}

static void ConvertConnectionParams(const NKikimrReplication::TConnectionParams& from, Ydb::Replication::ConnectionParams& to) {
to.set_endpoint(from.GetEndpoint());
to.set_database(from.GetDatabase());

switch (from.GetCredentialsCase()) {
case NKikimrReplication::TConnectionParams::kStaticCredentials:
return ConvertStaticCredentials(from.GetStaticCredentials(), *to.mutable_static_credentials());
case NKikimrReplication::TConnectionParams::kOAuthToken:
return ConvertOAuth(from.GetOAuthToken(), *to.mutable_oauth());
default:
break;
}
}

static void ConvertStaticCredentials(const NKikimrReplication::TStaticCredentials& from, Ydb::Replication::ConnectionParams::StaticCredentials& to) {
to.set_user(from.GetUser());
to.set_password_secret_name(from.GetPasswordSecretName());
}

static void ConvertOAuth(const NKikimrReplication::TOAuthToken& from, Ydb::Replication::ConnectionParams::OAuth& to) {
to.set_token_secret_name(from.GetTokenSecretName());
}

static void ConvertItem(const NKikimrReplication::TReplicationConfig::TTargetSpecific::TTarget& from, Ydb::Replication::DescribeReplicationResult::Item& to) {
to.set_source_path(from.GetSrcPath());
to.set_destination_path(from.GetDstPath());
}

static void ConvertState(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) {
switch (from.GetStateCase()) {
case NKikimrReplication::TReplicationState::kStandBy:
to.mutable_running();
break;
case NKikimrReplication::TReplicationState::kError:
*to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues());
break;
case NKikimrReplication::TReplicationState::kDone:
to.mutable_done();
break;
default:
break;
Comment on lines +177 to +178
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. default скроет ошибку компилятора при добавлении нового кейса
  2. отсутствует логирование на неизвестный кейс.

}
}

private:
Ydb::Replication::DescribeReplicationResult Result;
TActorId ControllerPipeClient;
};

void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
f.RegisterActor(new TDescribeReplicationRPC(p.release()));
}

}
12 changes: 12 additions & 0 deletions ydb/core/grpc_services/service_replication.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#pragma once

#include <memory>

namespace NKikimr::NGRpcService {

class IRequestOpCtx;
class IFacilityProvider;

void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);

}
1 change: 1 addition & 0 deletions ydb/core/grpc_services/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ SRCS(
rpc_read_rows.cpp
rpc_remove_directory.cpp
rpc_rename_tables.cpp
rpc_replication.cpp
rpc_rollback_transaction.cpp
rpc_scheme_base.cpp
rpc_stream_execute_scan_query.cpp
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ enum ETxTypes {
TXTYPE_ALTER_REPLICATION = 10 [(TxTypeOpts) = {Name: "TxAlterReplication"}];
TXTYPE_RESOLVE_SECRET_RESULT = 11 [(TxTypeOpts) = {Name: "TxResolveSecretResult"}];
TXTYPE_ALTER_DST_RESULT = 12 [(TxTypeOpts) = {Name: "TxAlterDstResult"}];
TXTYPE_DESCRIBE_REPLICATION = 13 [(TxTypeOpts) = {Name: "TxDescribeReplication"}];
}
23 changes: 23 additions & 0 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import "ydb/core/scheme/protos/pathid.proto";
import "ydb/public/api/protos/annotations/sensitive.proto";
import "ydb/public/api/protos/ydb_issue_message.proto";

package NKikimrReplication;
option java_package = "ru.yandex.kikimr.proto";
Expand Down Expand Up @@ -70,10 +71,15 @@ message TReplicationState {
optional EFailoverMode FailoverMode = 1;
}

message TError {
repeated Ydb.Issue.IssueMessage Issues = 1;
}

oneof State {
TStandBy StandBy = 1;
TPaused Paused = 2;
TDone Done = 3;
TError Error = 4;
}
}

Expand Down Expand Up @@ -135,6 +141,23 @@ message TEvDropReplicationResult {
optional EStatus Status = 3;
}

message TEvDescribeReplication {
optional NKikimrProto.TPathID PathId = 1;
}

message TEvDescribeReplicationResult {
enum EStatus {
UNKNOWN = 0;
SUCCESS = 1;
NOT_FOUND = 2;
}

optional EStatus Status = 1;
optional TConnectionParams ConnectionParams = 2;
repeated TReplicationConfig.TTargetSpecific.TTarget Targets = 3;
optional TReplicationState State = 4;
}

message TControllerIdentity {
optional uint64 TabletId = 1;
optional uint64 Generation = 2;
Expand Down
6 changes: 6 additions & 0 deletions ydb/core/tx/replication/controller/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ STFUNC(TController::StateWork) {
HFunc(TEvController::TEvCreateReplication, Handle);
HFunc(TEvController::TEvAlterReplication, Handle);
HFunc(TEvController::TEvDropReplication, Handle);
HFunc(TEvController::TEvDescribeReplication, Handle);
HFunc(TEvPrivate::TEvDropReplication, Handle);
HFunc(TEvPrivate::TEvDiscoveryTargetsResult, Handle);
HFunc(TEvPrivate::TEvAssignStreamName, Handle);
Expand Down Expand Up @@ -124,6 +125,11 @@ void TController::Handle(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorC
RunTxDropReplication(ev, ctx);
}

void TController::Handle(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
RunTxDescribeReplication(ev, ctx);
}

void TController::Handle(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());
RunTxDiscoveryTargetsResult(ev, ctx);
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/replication/controller/controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class TController
void Handle(TEvController::TEvCreateReplication::TPtr& ev, const TActorContext& ctx);
void Handle(TEvController::TEvAlterReplication::TPtr& ev, const TActorContext& ctx);
void Handle(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
void Handle(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx);
Expand Down Expand Up @@ -99,6 +100,7 @@ class TController
class TTxCreateReplication;
class TTxAlterReplication;
class TTxDropReplication;
class TTxDescribeReplication;
class TTxDiscoveryTargetsResult;
class TTxAssignStreamName;
class TTxCreateStreamResult;
Expand All @@ -115,6 +117,7 @@ class TController
void RunTxAlterReplication(TEvController::TEvAlterReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDropReplication(TEvController::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDropReplication(TEvPrivate::TEvDropReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDescribeReplication(TEvController::TEvDescribeReplication::TPtr& ev, const TActorContext& ctx);
void RunTxDiscoveryTargetsResult(TEvPrivate::TEvDiscoveryTargetsResult::TPtr& ev, const TActorContext& ctx);
void RunTxAssignStreamName(TEvPrivate::TEvAssignStreamName::TPtr& ev, const TActorContext& ctx);
void RunTxCreateStreamResult(TEvPrivate::TEvCreateStreamResult::TPtr& ev, const TActorContext& ctx);
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/replication/controller/public_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct TEvController {
EvAlterReplicationResult,
EvDropReplication,
EvDropReplicationResult,
EvDescribeReplication,
EvDescribeReplicationResult,

EvEnd,
};
Expand Down Expand Up @@ -46,6 +48,14 @@ struct TEvController {
: public TEventPB<TEvDropReplicationResult, NKikimrReplication::TEvDropReplicationResult, EvDropReplicationResult>
{};

struct TEvDescribeReplication
: public TEventPB<TEvDescribeReplication, NKikimrReplication::TEvDescribeReplication, EvDescribeReplication>
{};

struct TEvDescribeReplicationResult
: public TEventPB<TEvDescribeReplicationResult, NKikimrReplication::TEvDescribeReplicationResult, EvDescribeReplicationResult>
{};

}; // TEvController

}
Loading
Loading