Skip to content

Commit

Permalink
Introduce 24-1-async-replication (#5418)
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Jun 10, 2024
1 parent 9525b0f commit 4002b4c
Show file tree
Hide file tree
Showing 181 changed files with 7,558 additions and 711 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ ydb/core/tx/columnshard/ut_schema TColumnShardTestSchema.RebootForgetAfterFail
ydb/core/tx/columnshard/engines/ut *
ydb/core/tx/coordinator/ut Coordinator.RestoreTenantConfiguration
ydb/core/tx/datashard/ut_change_exchange Cdc.InitialScanDebezium
ydb/core/tx/replication/ydb_proxy/ut YdbProxyTests.ReadTopic
ydb/core/tx/schemeshard/ut_restore TImportTests.ShouldSucceedOnManyTables
ydb/core/tx/schemeshard/ut_split_merge TSchemeShardSplitBySizeTest.Merge1KShards
ydb/core/tx/tx_proxy/ut_ext_tenant TExtSubDomainTest.CreateTableInsideAndAlterDomainAndTable-AlterDatabaseCreateHiveFirst*
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,11 @@ struct TKikimrEvents : TEvents {
ES_TABLE_CREATOR,
ES_PQ_PARTITION_CHOOSER,
ES_GRAPH,
ES_REPLICATION_SERVICE,
ES_REPLICATION_WORKER,
ES_CHANGE_EXCHANGE,
ES_S3_FILE_QUEUE,
ES_NEBIUS_ACCESS_SERVICE,
ES_REPLICATION_SERVICE,
};
};

Expand Down
27 changes: 18 additions & 9 deletions ydb/core/change_exchange/change_sender_common_ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,37 @@ struct TEvChangeExchangePrivate {

static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_PRIVATE));

template <typename TEv, ui32 TEventType>
struct TEvWithPartitionId: public TEventLocal<TEv, TEventType> {
struct TEvReady: public TEventLocal<TEvReady, EvReady> {
ui64 PartitionId;

explicit TEvWithPartitionId(ui64 partiionId)
explicit TEvReady(ui64 partiionId)
: PartitionId(partiionId)
{
}

TString ToString() const override {
return TStringBuilder() << TEventLocal<TEv, TEventType>::ToStringHeader() << " {"
return TStringBuilder() << ToStringHeader() << " {"
<< " PartitionId: " << PartitionId
<< " }";
}
};

struct TEvReady: public TEvWithPartitionId<TEvReady, EvReady> {
using TEvWithPartitionId::TEvWithPartitionId;
};
struct TEvGone: public TEventLocal<TEvGone, EvGone> {
ui64 PartitionId;
bool HardError;

explicit TEvGone(ui64 partitionId, bool hardError = false)
: PartitionId(partitionId)
, HardError(hardError)
{
}

struct TEvGone: public TEvWithPartitionId<TEvGone, EvGone> {
using TEvWithPartitionId::TEvWithPartitionId;
TString ToString() const override {
return TStringBuilder() << ToStringHeader() << " {"
<< " PartitionId: " << PartitionId
<< " HardError: " << HardError
<< " }";
}
};

}; // TEvChangeExchangePrivate
Expand Down
16 changes: 12 additions & 4 deletions ydb/core/driver_lib/run/run.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
#include <ydb/services/persqueue_v1/persqueue.h>
#include <ydb/services/persqueue_v1/topic.h>
#include <ydb/services/rate_limiter/grpc_service.h>
#include <ydb/services/replication/grpc_service.h>
#include <ydb/services/ydb/ydb_clickhouse_internal.h>
#include <ydb/services/ydb/ydb_dummy.h>
#include <ydb/services/ydb/ydb_export.h>
Expand Down Expand Up @@ -583,6 +584,8 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
names["query_service"] = &hasQueryService;
TServiceCfg hasKeyValue = services.empty();
names["keyvalue"] = &hasKeyValue;
TServiceCfg hasReplication = services.empty();
names["replication"] = &hasReplication;

std::unordered_set<TString> enabled;
for (const auto& name : services) {
Expand Down Expand Up @@ -849,6 +852,11 @@ void TKikimrRunner::InitializeGRpc(const TKikimrRunConfig& runConfig) {
grpcRequestProxies[0]));
}

if (hasReplication) {
server.AddService(new NGRpcService::TGRpcReplicationService(ActorSystem.Get(), Counters,
grpcRequestProxies[0], hasReplication.IsRlAllowed()));
}

if (ModuleFactories) {
for (const auto& service : ModuleFactories->GrpcServiceFactory.Create(enabled, disabled, ActorSystem.Get(), Counters, grpcRequestProxies[0])) {
server.AddService(service);
Expand Down Expand Up @@ -1705,10 +1713,6 @@ void TKikimrRunner::KikimrStop(bool graceful) {
SqsHttp->Shutdown();
}

if (YdbDriver) {
YdbDriver->Stop(true);
}

if (Monitoring) {
Monitoring->Stop();
}
Expand Down Expand Up @@ -1749,6 +1753,10 @@ void TKikimrRunner::KikimrStop(bool graceful) {
ModuleFactories->DataShardExportFactory->Shutdown();
}
}

if (YdbDriver) {
YdbDriver->Stop(true);
}
}

void TKikimrRunner::BusyLoop() {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ PEERDIR(
ydb/services/deprecated/persqueue_v0
ydb/services/persqueue_v1
ydb/services/rate_limiter
ydb/services/replication
ydb/services/ydb
)

Expand Down
195 changes: 195 additions & 0 deletions ydb/core/grpc_services/rpc_replication.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#include "rpc_scheme_base.h"
#include "service_replication.h"

#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/tx/replication/controller/public_events.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/public/api/protos/draft/ydb_replication.pb.h>

namespace NKikimr::NGRpcService {

using namespace Ydb;

using TEvDescribeReplication = TGrpcRequestOperationCall<Replication::DescribeReplicationRequest, Replication::DescribeReplicationResponse>;

class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicationRPC, TEvDescribeReplication> {
using TBase = TRpcSchemeRequestActor<TDescribeReplicationRPC, TEvDescribeReplication>;

public:
using TBase::TBase;

void Bootstrap() {
DescribeScheme();
}

void PassAway() override {
if (ControllerPipeClient) {
NTabletPipe::CloseAndForgetClient(SelfId(), ControllerPipeClient);
}

TBase::PassAway();
}

private:
void DescribeScheme() {
auto ev = std::make_unique<TEvTxUserProxy::TEvNavigate>();
SetAuthToken(ev, *Request_);
SetDatabase(ev.get(), *Request_);
ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path());

Send(MakeTxProxyID(), ev.release());
Become(&TDescribeReplicationRPC::StateDescribeScheme);
}

STATEFN(StateDescribeScheme) {
switch (ev->GetTypeRewrite()) {
HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
default:
return TBase::StateWork(ev);
}
}

void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->GetRecord();
const auto& desc = record.GetPathDescription();

if (record.HasReason()) {
auto issue = NYql::TIssue(record.GetReason());
Request_->RaiseIssue(issue);
}

switch (record.GetStatus()) {
case NKikimrScheme::StatusSuccess:
if (desc.GetSelf().GetPathType() != NKikimrSchemeOp::EPathTypeReplication) {
auto issue = NYql::TIssue("Is not a replication");
Request_->RaiseIssue(issue);
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

ConvertDirectoryEntry(desc.GetSelf(), Result.mutable_self(), true);
return DescribeReplication(desc.GetReplicationDescription().GetControllerId(),
PathIdFromPathId(desc.GetReplicationDescription().GetPathId()));

case NKikimrScheme::StatusPathDoesNotExist:
case NKikimrScheme::StatusSchemeError:
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);

case NKikimrScheme::StatusAccessDenied:
return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx);

case NKikimrScheme::StatusNotAvailable:
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);

default: {
return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
}
}
}

void DescribeReplication(ui64 tabletId, const TPathId& pathId) {
if (!ControllerPipeClient) {
NTabletPipe::TClientConfig config;
config.RetryPolicy = {
.RetryLimitCount = 3,
};
ControllerPipeClient = Register(NTabletPipe::CreateClient(SelfId(), tabletId, config));
}

auto ev = std::make_unique<NReplication::TEvController::TEvDescribeReplication>();
PathIdFromPathId(pathId, ev->Record.MutablePathId());

NTabletPipe::SendData(SelfId(), ControllerPipeClient, ev.release());
Become(&TDescribeReplicationRPC::StateDescribeReplication);
}

STATEFN(StateDescribeReplication) {
switch (ev->GetTypeRewrite()) {
HFunc(NReplication::TEvController::TEvDescribeReplicationResult, Handle);
default:
return TBase::StateWork(ev);
}
}

void Handle(NReplication::TEvController::TEvDescribeReplicationResult::TPtr& ev, const TActorContext& ctx) {
auto& record = ev->Get()->Record;

switch (record.GetStatus()) {
case NKikimrReplication::TEvDescribeReplicationResult::SUCCESS:
break;
case NKikimrReplication::TEvDescribeReplicationResult::NOT_FOUND:
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
default:
return Reply(Ydb::StatusIds::INTERNAL_ERROR, ctx);
}

ConvertConnectionParams(record.GetConnectionParams(), *Result.mutable_connection_params());
ConvertState(*record.MutableState(), Result);

for (const auto& target : record.GetTargets()) {
ConvertItem(target, *Result.add_items());
}

return ReplyWithResult(Ydb::StatusIds::SUCCESS, Result, ctx);
}

static void ConvertConnectionParams(const NKikimrReplication::TConnectionParams& from, Ydb::Replication::ConnectionParams& to) {
to.set_endpoint(from.GetEndpoint());
to.set_database(from.GetDatabase());

switch (from.GetCredentialsCase()) {
case NKikimrReplication::TConnectionParams::kStaticCredentials:
return ConvertStaticCredentials(from.GetStaticCredentials(), *to.mutable_static_credentials());
case NKikimrReplication::TConnectionParams::kOAuthToken:
return ConvertOAuth(from.GetOAuthToken(), *to.mutable_oauth());
default:
break;
}
}

static void ConvertStaticCredentials(const NKikimrReplication::TStaticCredentials& from, Ydb::Replication::ConnectionParams::StaticCredentials& to) {
to.set_user(from.GetUser());
to.set_password_secret_name(from.GetPasswordSecretName());
}

static void ConvertOAuth(const NKikimrReplication::TOAuthToken& from, Ydb::Replication::ConnectionParams::OAuth& to) {
to.set_token_secret_name(from.GetTokenSecretName());
}

static void ConvertItem(const NKikimrReplication::TReplicationConfig::TTargetSpecific::TTarget& from, Ydb::Replication::DescribeReplicationResult::Item& to) {
to.set_id(from.GetId());
to.set_source_path(from.GetSrcPath());
to.set_destination_path(from.GetDstPath());
if (from.HasSrcStreamName()) {
to.set_source_changefeed_name(from.GetSrcStreamName());
}
}

static void ConvertState(NKikimrReplication::TReplicationState& from, Ydb::Replication::DescribeReplicationResult& to) {
switch (from.GetStateCase()) {
case NKikimrReplication::TReplicationState::kStandBy:
to.mutable_running();
break;
case NKikimrReplication::TReplicationState::kError:
*to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues());
break;
case NKikimrReplication::TReplicationState::kDone:
to.mutable_done();
break;
default:
break;
}
}

private:
Ydb::Replication::DescribeReplicationResult Result;
TActorId ControllerPipeClient;
};

void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
f.RegisterActor(new TDescribeReplicationRPC(p.release()));
}

}
12 changes: 12 additions & 0 deletions ydb/core/grpc_services/service_replication.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#pragma once

#include <memory>

namespace NKikimr::NGRpcService {

class IRequestOpCtx;
class IFacilityProvider;

void DoDescribeReplication(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);

}
1 change: 1 addition & 0 deletions ydb/core/grpc_services/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ SRCS(
rpc_read_rows.cpp
rpc_remove_directory.cpp
rpc_rename_tables.cpp
rpc_replication.cpp
rpc_rollback_transaction.cpp
rpc_scheme_base.cpp
rpc_stream_execute_scan_query.cpp
Expand Down
18 changes: 18 additions & 0 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,24 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
break;
}

case NKqpProto::TKqpSchemeOperation::kCreateReplication: {
const auto& modifyScheme = schemeOp.GetCreateReplication();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kAlterReplication: {
const auto& modifyScheme = schemeOp.GetAlterReplication();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kDropReplication: {
const auto& modifyScheme = schemeOp.GetDropReplication();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

default:
InternalError(TStringBuilder() << "Unexpected scheme operation: "
<< (ui32) schemeOp.GetOperationCase());
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/kqp/gateway/kqp_ic_gateway.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -994,6 +994,18 @@ class TKikimrIcGateway : public IKqpGateway {
}
}

TFuture<TGenericResult> CreateReplication(const TString&, const NYql::TCreateReplicationSettings&) override {
return NotImplemented<TGenericResult>();
}

TFuture<TGenericResult> AlterReplication(const TString&, const NYql::TAlterReplicationSettings&) override {
return NotImplemented<TGenericResult>();
}

TFuture<TGenericResult> DropReplication(const TString&, const NYql::TDropReplicationSettings&) override {
return NotImplemented<TGenericResult>();
}

TFuture<TGenericResult> AlterColumnTable(const TString& cluster,
const NYql::TAlterColumnTableSettings& settings) override {
Y_UNUSED(cluster);
Expand Down
Loading

0 comments on commit 4002b4c

Please sign in to comment.