From 6239e78f26082f24c24d88a41a1acd22bf32d192 Mon Sep 17 00:00:00 2001 From: Ilnaz Nizametdinov Date: Fri, 14 Jun 2024 18:15:51 +0300 Subject: [PATCH] Report & show replication lag --- ydb/core/grpc_services/rpc_replication.cpp | 6 ++ ydb/core/protos/replication.proto | 3 + .../tx/replication/controller/controller.cpp | 16 ++++++ .../replication/controller/controller_impl.h | 1 + .../replication/controller/lag_provider.cpp | 46 +++++++++++++++ .../tx/replication/controller/lag_provider.h | 25 ++++++++ .../tx/replication/controller/replication.cpp | 38 +++++++++++-- .../tx/replication/controller/replication.h | 6 +- .../tx/replication/controller/target_base.cpp | 18 +++++- .../tx/replication/controller/target_base.h | 12 +++- .../controller/tx_describe_replication.cpp | 3 + ydb/core/tx/replication/controller/ya.make | 1 + ydb/core/tx/replication/service/service.cpp | 57 +++++++++++++------ ydb/core/tx/replication/service/service.h | 7 +++ .../tx/replication/service/topic_reader.cpp | 2 +- ydb/core/tx/replication/service/worker.cpp | 52 ++++++++++++++++- ydb/core/tx/replication/service/worker.h | 14 ++++- ydb/core/tx/replication/ydb_proxy/ydb_proxy.h | 3 + .../api/protos/draft/ydb_replication.proto | 3 + .../ydb_cli/commands/ydb_service_scheme.cpp | 5 ++ .../sdk/cpp/client/draft/ydb_replication.cpp | 17 +++++- .../sdk/cpp/client/draft/ydb_replication.h | 13 ++++- 22 files changed, 312 insertions(+), 36 deletions(-) create mode 100644 ydb/core/tx/replication/controller/lag_provider.cpp create mode 100644 ydb/core/tx/replication/controller/lag_provider.h diff --git a/ydb/core/grpc_services/rpc_replication.cpp b/ydb/core/grpc_services/rpc_replication.cpp index f2955f3d6b56..72021a4762e6 100644 --- a/ydb/core/grpc_services/rpc_replication.cpp +++ b/ydb/core/grpc_services/rpc_replication.cpp @@ -9,6 +9,8 @@ #include #include +#include + namespace NKikimr::NGRpcService { using namespace Ydb; @@ -171,6 +173,10 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActormutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration( + from.GetStandBy().GetLagMilliSeconds()); + } break; case NKikimrReplication::TReplicationState::kError: *to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues()); diff --git a/ydb/core/protos/replication.proto b/ydb/core/protos/replication.proto index 3d2ed8cbea8b..6aa742156189 100644 --- a/ydb/core/protos/replication.proto +++ b/ydb/core/protos/replication.proto @@ -58,6 +58,7 @@ message TReplicationConfig { message TReplicationState { message TStandBy { + optional uint32 LagMilliSeconds = 1; } message TPaused { @@ -223,10 +224,12 @@ message TEvWorkerStatus { REASON_UNSPECIFIED = 0; REASON_ACK = 1; REASON_ERROR = 2; + REASON_INFO = 3; } optional TWorkerIdentity Worker = 1; optional EStatus Status = 2; optional EReason Reason = 3; optional string ErrorDescription = 4; + optional uint32 LagMilliSeconds = 5; } diff --git a/ydb/core/tx/replication/controller/controller.cpp b/ydb/core/tx/replication/controller/controller.cpp index 19da0260f75f..e49503a3fe85 100644 --- a/ydb/core/tx/replication/controller/controller.cpp +++ b/ydb/core/tx/replication/controller/controller.cpp @@ -341,6 +341,8 @@ void TController::Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorCont case NKikimrReplication::TEvWorkerStatus::STATUS_RUNNING: if (!session.HasWorker(id)) { StopQueue.emplace(id, nodeId); + } else if (record.GetReason() == NKikimrReplication::TEvWorkerStatus::REASON_INFO) { + UpdateLag(id, TDuration::MilliSeconds(record.GetLagMilliSeconds())); } break; case NKikimrReplication::TEvWorkerStatus::STATUS_STOPPED: @@ -368,6 +370,20 @@ void TController::Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorCont ScheduleProcessQueues(); } +void TController::UpdateLag(const TWorkerId& id, TDuration lag) { + auto replication = Find(id.ReplicationId()); + if (!replication) { + return; + } + + auto* target = replication->FindTarget(id.TargetId()); + if (!target) { + return; + } + + target->UpdateLag(id.WorkerId(), lag); +} + void TController::Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx) { CLOG_T(ctx, "Handle " << ev->Get()->ToString()); diff --git a/ydb/core/tx/replication/controller/controller_impl.h b/ydb/core/tx/replication/controller/controller_impl.h index 9f21ca8ec984..8663690a246d 100644 --- a/ydb/core/tx/replication/controller/controller_impl.h +++ b/ydb/core/tx/replication/controller/controller_impl.h @@ -101,6 +101,7 @@ class TController void StopWorker(ui32 nodeId, const TWorkerId& id); void RemoveWorker(const TWorkerId& id, const TActorContext& ctx); bool MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx); + void UpdateLag(const TWorkerId& id, TDuration lag); // local transactions class TTxInitSchema; diff --git a/ydb/core/tx/replication/controller/lag_provider.cpp b/ydb/core/tx/replication/controller/lag_provider.cpp new file mode 100644 index 000000000000..abbaa0a473c6 --- /dev/null +++ b/ydb/core/tx/replication/controller/lag_provider.cpp @@ -0,0 +1,46 @@ +#include "lag_provider.h" + +namespace NKikimr::NReplication::NController { + +void TLagProvider::AddPendingLag(ui64 childId) { + Pending.insert(childId); +} + +bool TLagProvider::UpdateLag(TItemWithLag& child, ui64 childId, TDuration lag) { + bool updated = false; + + if (const auto& prevLag = child.Lag) { + auto it = ChildrenByLag.find(*prevLag); + Y_ABORT_UNLESS(it != ChildrenByLag.end()); + + it->second.erase(childId); + if (it->second.empty()) { + updated = true; + ChildrenByLag.erase(it); + } + } + + child.Lag = lag; + ChildrenByLag[lag].insert(childId); + + const bool pending = !Pending.empty(); + Pending.erase(childId); + + if (Pending) { + return false; + } else if (pending) { + return true; + } + + return updated; +} + +const TMaybe TLagProvider::GetLag() const { + if (ChildrenByLag.empty() || !Pending.empty()) { + return Nothing(); + } + + return ChildrenByLag.rbegin()->first; +} + +} diff --git a/ydb/core/tx/replication/controller/lag_provider.h b/ydb/core/tx/replication/controller/lag_provider.h new file mode 100644 index 000000000000..790f6da1732e --- /dev/null +++ b/ydb/core/tx/replication/controller/lag_provider.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include +#include +#include + +namespace NKikimr::NReplication::NController { + +struct TItemWithLag { + TMaybe Lag; +}; + +class TLagProvider { +public: + void AddPendingLag(ui64 childId); + bool UpdateLag(TItemWithLag& child, ui64 childId, TDuration lag); + const TMaybe GetLag() const; + +private: + TMap> ChildrenByLag; + THashSet Pending; +}; + +} diff --git a/ydb/core/tx/replication/controller/replication.cpp b/ydb/core/tx/replication/controller/replication.cpp index f73de7a9f407..983f89fa4eb2 100644 --- a/ydb/core/tx/replication/controller/replication.cpp +++ b/ydb/core/tx/replication/controller/replication.cpp @@ -1,3 +1,4 @@ +#include "lag_provider.h" #include "private_events.h" #include "replication.h" #include "secret_resolver.h" @@ -17,9 +18,18 @@ namespace NKikimr::NReplication::NController { -class TReplication::TImpl { +class TReplication::TImpl: public TLagProvider { friend class TReplication; + struct TTarget: public TItemWithLag { + THolder Ptr; + + explicit TTarget(ITarget* iface) + : Ptr(iface) + { + } + }; + void ResolveSecret(const TString& secretName, const TActorContext& ctx) { if (SecretResolver) { return; @@ -62,7 +72,7 @@ class TReplication::TImpl { void ProgressTargets(const TActorContext& ctx) { for (auto& [_, target] : Targets) { - target->Progress(ctx); + target.Ptr->Progress(ctx); } } @@ -79,6 +89,7 @@ class TReplication::TImpl { ui64 AddTarget(TReplication* self, ui64 id, ETargetKind kind, Args&&... args) { const auto res = Targets.emplace(id, CreateTarget(self, id, kind, std::forward(args)...)); Y_VERIFY_S(res.second, "Duplicate target: " << id); + TLagProvider::AddPendingLag(id); return id; } @@ -90,7 +101,7 @@ class TReplication::TImpl { ITarget* FindTarget(ui64 id) { auto it = Targets.find(id); return it != Targets.end() - ? it->second.Get() + ? it->second.Ptr.Get() : nullptr; } @@ -151,7 +162,7 @@ class TReplication::TImpl { void Shutdown(const TActorContext& ctx) { for (auto& [_, target] : Targets) { - target->Shutdown(ctx); + target.Ptr->Shutdown(ctx); } for (auto* x : TVector{&SecretResolver, &TargetDiscoverer, &TenantResolver, &YdbProxy}) { @@ -174,6 +185,15 @@ class TReplication::TImpl { SetState(EState::Error, issue); } + void UpdateLag(ui64 targetId, TDuration lag) { + auto it = Targets.find(targetId); + if (it == Targets.end()) { + return; + } + + TLagProvider::UpdateLag(it->second, targetId, lag); + } + private: const ui64 ReplicationId; const TPathId PathId; @@ -183,7 +203,7 @@ class TReplication::TImpl { EState State = EState::Ready; TString Issue; ui64 NextTargetId = 1; - THashMap> Targets; + THashMap Targets; THashSet PendingAlterTargets; TActorId SecretResolver; TActorId YdbProxy; @@ -329,6 +349,14 @@ bool TReplication::CheckAlterDone() const { return Impl->State == EState::Ready && Impl->PendingAlterTargets.empty(); } +void TReplication::UpdateLag(ui64 targetId, TDuration lag) { + Impl->UpdateLag(targetId, lag); +} + +const TMaybe TReplication::GetLag() const { + return Impl->GetLag(); +} + } Y_DECLARE_OUT_SPEC(, NKikimrReplication::TReplicationConfig::TargetCase, stream, value) { diff --git a/ydb/core/tx/replication/controller/replication.h b/ydb/core/tx/replication/controller/replication.h index 06a14350864e..88ece4eed306 100644 --- a/ydb/core/tx/replication/controller/replication.h +++ b/ydb/core/tx/replication/controller/replication.h @@ -5,7 +5,9 @@ #include #include +#include #include +#include #include #include @@ -76,7 +78,7 @@ class TReplication: public TSimpleRefCount { virtual void AddWorker(ui64 id) = 0; virtual void RemoveWorker(ui64 id) = 0; - virtual const THashSet& GetWorkers() const = 0; + virtual void UpdateLag(ui64 workerId, TDuration lag) = 0; virtual void Progress(const TActorContext& ctx) = 0; virtual void Shutdown(const TActorContext& ctx) = 0; @@ -88,6 +90,7 @@ class TReplication: public TSimpleRefCount { friend class TTargetBase; void AddPendingAlterTarget(ui64 id); void RemovePendingAlterTarget(ui64 id); + void UpdateLag(ui64 targetId, TDuration lag); struct TDropOp { TActorId Sender; @@ -117,6 +120,7 @@ class TReplication: public TSimpleRefCount { void SetState(EState state, TString issue = {}); EState GetState() const; const TString& GetIssue() const; + const TMaybe GetLag() const; void SetNextTargetId(ui64 value); ui64 GetNextTargetId() const; diff --git a/ydb/core/tx/replication/controller/target_base.cpp b/ydb/core/tx/replication/controller/target_base.cpp index 1795119e2727..590cdd393884 100644 --- a/ydb/core/tx/replication/controller/target_base.cpp +++ b/ydb/core/tx/replication/controller/target_base.cpp @@ -89,26 +89,38 @@ void TTargetBase::SetIssue(const TString& value) { } void TTargetBase::AddWorker(ui64 id) { - Workers.insert(id); + Workers.emplace(id, TWorker{}); + TLagProvider::AddPendingLag(id); } void TTargetBase::RemoveWorker(ui64 id) { Workers.erase(id); } -const THashSet& TTargetBase::GetWorkers() const { +const THashMap& TTargetBase::GetWorkers() const { return Workers; } void TTargetBase::RemoveWorkers(const TActorContext& ctx) { if (!PendingRemoveWorkers) { PendingRemoveWorkers = true; - for (const auto& id : Workers) { + for (const auto& [id, _] : Workers) { ctx.Send(ctx.SelfID, new TEvPrivate::TEvRemoveWorker(Replication->GetId(), Id, id)); } } } +void TTargetBase::UpdateLag(ui64 workerId, TDuration lag) { + auto it = Workers.find(workerId); + if (it == Workers.end()) { + return; + } + + if (TLagProvider::UpdateLag(it->second, workerId, lag)) { + Replication->UpdateLag(GetId(), GetLag().GetRef()); + } +} + void TTargetBase::Progress(const TActorContext& ctx) { switch (DstState) { case EDstState::Creating: diff --git a/ydb/core/tx/replication/controller/target_base.h b/ydb/core/tx/replication/controller/target_base.h index 40bc810c0f33..d1b71710c2bc 100644 --- a/ydb/core/tx/replication/controller/target_base.h +++ b/ydb/core/tx/replication/controller/target_base.h @@ -1,19 +1,25 @@ #pragma once +#include "lag_provider.h" #include "replication.h" namespace NKikimr::NReplication::NController { -class TTargetBase: public TReplication::ITarget { +class TTargetBase + : public TReplication::ITarget + , public TLagProvider +{ protected: using ETargetKind = TReplication::ETargetKind; using EDstState = TReplication::EDstState; using EStreamState = TReplication::EStreamState; + struct TWorker: public TItemWithLag {}; inline TReplication* GetReplication() const { return Replication; } + const THashMap& GetWorkers() const; void RemoveWorkers(const TActorContext& ctx); public: @@ -43,7 +49,7 @@ class TTargetBase: public TReplication::ITarget { void AddWorker(ui64 id) override; void RemoveWorker(ui64 id) override; - const THashSet& GetWorkers() const override; + void UpdateLag(ui64 workerId, TDuration lag) override; void Progress(const TActorContext& ctx) override; void Shutdown(const TActorContext& ctx) override; @@ -65,7 +71,7 @@ class TTargetBase: public TReplication::ITarget { TActorId DstAlterer; TActorId DstRemover; TActorId WorkerRegistar; - THashSet Workers; + THashMap Workers; bool PendingRemoveWorkers = false; }; // TTargetBase diff --git a/ydb/core/tx/replication/controller/tx_describe_replication.cpp b/ydb/core/tx/replication/controller/tx_describe_replication.cpp index 5e9e8e0f930b..65b6518c737b 100644 --- a/ydb/core/tx/replication/controller/tx_describe_replication.cpp +++ b/ydb/core/tx/replication/controller/tx_describe_replication.cpp @@ -56,6 +56,9 @@ class TController::TTxDescribeReplication: public TTxBase { case TReplication::EState::Ready: case TReplication::EState::Removing: state.MutableStandBy(); + if (const auto lag = replication->GetLag()) { + state.MutableStandBy()->SetLagMilliSeconds(lag->MilliSeconds()); + } break; case TReplication::EState::Done: state.MutableDone(); diff --git a/ydb/core/tx/replication/controller/ya.make b/ydb/core/tx/replication/controller/ya.make index c3e9d9cbc78d..52bdfbbabdcb 100644 --- a/ydb/core/tx/replication/controller/ya.make +++ b/ydb/core/tx/replication/controller/ya.make @@ -19,6 +19,7 @@ SRCS( dst_alterer.cpp dst_creator.cpp dst_remover.cpp + lag_provider.cpp logging.cpp nodes_manager.cpp private_events.cpp diff --git a/ydb/core/tx/replication/service/service.cpp b/ydb/core/tx/replication/service/service.cpp index 7a5aa3512e04..a8ea5d64ef5f 100644 --- a/ydb/core/tx/replication/service/service.cpp +++ b/ydb/core/tx/replication/service/service.cpp @@ -55,6 +55,12 @@ class TSessionInfo { return it->second; } + TWorkerId GetWorkerId(const TActorId& id) const { + auto it = ActorIdToWorkerId.find(id); + Y_ABORT_UNLESS(it != ActorIdToWorkerId.end()); + return it->second; + } + TActorId RegisterWorker(IActorOps* ops, const TWorkerId& id, IActor* actor) { auto res = Workers.emplace(id, ops->Register(actor)); Y_ABORT_UNLESS(res.second); @@ -329,33 +335,49 @@ class TReplicationService: public TActorBootstrapped { void Handle(TEvWorker::TEvGone::TPtr& ev) { LOG_T("Handle " << ev->Get()->ToString()); - auto wit = WorkerActorIdToSession.find(ev->Sender); - if (wit == WorkerActorIdToSession.end()) { - LOG_W("Unknown worker has gone" - << ": worker# " << ev->Sender); - return; - } - - auto it = Sessions.find(wit->second); - if (it == Sessions.end()) { - LOG_E("Cannot find session" - << ": worker# " << ev->Sender - << ", session# " << wit->second); + auto* session = SessionFromWorker(ev->Sender); + if (!session) { return; } - auto& session = it->second; - if (!session.HasWorker(ev->Sender)) { + if (!session->HasWorker(ev->Sender)) { LOG_E("Cannot find worker" - << ": worker# " << ev->Sender - << ", session# " << wit->second); + << ": worker# " << ev->Sender); return; } LOG_I("Worker has gone" << ": worker# " << ev->Sender); WorkerActorIdToSession.erase(ev->Sender); - session.StopWorker(this, ev->Sender, ToReason(ev->Get()->Status), ev->Get()->ErrorDescription); + session->StopWorker(this, ev->Sender, ToReason(ev->Get()->Status), ev->Get()->ErrorDescription); + } + + void Handle(TEvWorker::TEvStatus::TPtr& ev) { + LOG_T("Handle " << ev->Get()->ToString()); + + auto* session = SessionFromWorker(ev->Sender); + if (session && session->HasWorker(ev->Sender)) { + session->SendWorkerStatus(this, session->GetWorkerId(ev->Sender), ev->Get()->Lag); + } + } + + TSessionInfo* SessionFromWorker(const TActorId& id) { + auto wit = WorkerActorIdToSession.find(id); + if (wit == WorkerActorIdToSession.end()) { + LOG_W("Unknown worker has gone" + << ": worker# " << id); + return nullptr; + } + + auto it = Sessions.find(wit->second); + if (it == Sessions.end()) { + LOG_E("Cannot find session" + << ": worker# " << id + << ", session# " << wit->second); + return nullptr; + } + + return &it->second; } static NKikimrReplication::TEvWorkerStatus::EReason ToReason(TEvWorker::TEvGone::EStatus status) { @@ -399,6 +421,7 @@ class TReplicationService: public TActorBootstrapped { hFunc(TEvService::TEvRunWorker, Handle); hFunc(TEvService::TEvStopWorker, Handle); hFunc(TEvWorker::TEvGone, Handle); + hFunc(TEvWorker::TEvStatus, Handle); sFunc(TEvents::TEvPoison, PassAway); } } diff --git a/ydb/core/tx/replication/service/service.h b/ydb/core/tx/replication/service/service.h index 92a88099d092..c0da22f9af2c 100644 --- a/ydb/core/tx/replication/service/service.h +++ b/ydb/core/tx/replication/service/service.h @@ -63,6 +63,13 @@ struct TEvService { Record.SetReason(reason); Record.SetErrorDescription(errorDescription); } + + explicit TEvWorkerStatus(const TWorkerId& id, TDuration lag) { + id.Serialize(*Record.MutableWorker()); + Record.SetStatus(NKikimrReplication::TEvWorkerStatus::STATUS_RUNNING); + Record.SetReason(NKikimrReplication::TEvWorkerStatus::REASON_INFO); + Record.SetLagMilliSeconds(lag.MilliSeconds()); + } }; }; diff --git a/ydb/core/tx/replication/service/topic_reader.cpp b/ydb/core/tx/replication/service/topic_reader.cpp index 52f2899a6277..96ec39d13972 100644 --- a/ydb/core/tx/replication/service/topic_reader.cpp +++ b/ydb/core/tx/replication/service/topic_reader.cpp @@ -58,7 +58,7 @@ class TRemoteTopicReader: public TActor { for (auto& msg : result.Messages) { Y_ABORT_UNLESS(msg.GetCodec() == NYdb::NTopic::ECodec::RAW); - records.emplace_back(msg.GetOffset(), std::move(msg.GetData())); + records.emplace_back(msg.GetOffset(), std::move(msg.GetData()), msg.GetCreateTime()); } Send(Worker, new TEvWorker::TEvData(ToString(result.PartitionId), std::move(records))); diff --git a/ydb/core/tx/replication/service/worker.cpp b/ydb/core/tx/replication/service/worker.cpp index b51986f894d4..5bbac37f23bf 100644 --- a/ydb/core/tx/replication/service/worker.cpp +++ b/ydb/core/tx/replication/service/worker.cpp @@ -1,6 +1,7 @@ #include "logging.h" #include "worker.h" +#include #include #include #include @@ -11,15 +12,17 @@ namespace NKikimr::NReplication::NService { -TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data) +TEvWorker::TEvData::TRecord::TRecord(ui64 offset, const TString& data, TInstant createTime) : Offset(offset) , Data(data) + , CreateTime(createTime) { } -TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data) +TEvWorker::TEvData::TRecord::TRecord(ui64 offset, TString&& data, TInstant createTime) : Offset(offset) , Data(std::move(data)) + , CreateTime(createTime) { } @@ -39,6 +42,7 @@ void TEvWorker::TEvData::TRecord::Out(IOutputStream& out) const { out << "{" << " Offset: " << Offset << " Data: " << Data.size() << "b" + << " CreateTime: " << CreateTime.ToStringUpToSeconds() << " }"; } @@ -62,6 +66,17 @@ TString TEvWorker::TEvGone::ToString() const { << " }"; } +TEvWorker::TEvStatus::TEvStatus(TDuration lag) + : Lag(lag) +{ +} + +TString TEvWorker::TEvStatus::ToString() const { + return TStringBuilder() << ToStringHeader() << " {" + << " Lag: " << Lag + << " }"; +} + class TWorker: public TActorBootstrapped { class TActorInfo { std::function CreateFn; @@ -147,6 +162,17 @@ class TWorker: public TActorBootstrapped { return; } + if (InFlightData) { + const auto& records = InFlightData->Records; + auto it = MinElementBy(records, [](const auto& record) { + return record.CreateTime; + }); + + if (it != records.end()) { + Lag = TlsActivationContext->Now() - it->CreateTime; + } + } + InFlightData.Reset(); if (Reader) { Send(ev->Forward(Reader)); @@ -208,6 +234,22 @@ class TWorker: public TActorBootstrapped { PassAway(); } + void ScheduleLagReport() { + const auto random = TDuration::MicroSeconds(TAppData::RandomProvider->GenRand64() % LagReportInterval.MicroSeconds()); + Schedule(LagReportInterval + random, new TEvents::TEvWakeup()); + } + + void ReportLag() { + ScheduleLagReport(); + + if (!Reader || !Writer) { + return; + } + + Send(Parent, new TEvWorker::TEvStatus(Lag)); + Lag = TDuration::Zero(); + } + void PassAway() override { for (auto* actor : {&Reader, &Writer}) { Send(*actor, new TEvents::TEvPoison()); @@ -228,6 +270,7 @@ class TWorker: public TActorBootstrapped { : Parent(parent) , Reader(std::move(createReaderFn)) , Writer(std::move(createWriterFn)) + , Lag(TDuration::Zero()) { } @@ -237,6 +280,7 @@ class TWorker: public TActorBootstrapped { } Become(&TThis::StateWork); + ScheduleLagReport(); } STATEFN(StateWork) { @@ -245,17 +289,21 @@ class TWorker: public TActorBootstrapped { hFunc(TEvWorker::TEvPoll, Handle); hFunc(TEvWorker::TEvData, Handle); hFunc(TEvWorker::TEvGone, Handle); + sFunc(TEvents::TEvWakeup, ReportLag); sFunc(TEvents::TEvPoison, PassAway); } } private: static constexpr ui32 MaxAttempts = 3; + static constexpr TDuration LagReportInterval = TDuration::Seconds(7); + const TActorId Parent; mutable TMaybe LogPrefix; TActorInfo Reader; TActorInfo Writer; THolder InFlightData; + TDuration Lag; }; IActor* CreateWorker( diff --git a/ydb/core/tx/replication/service/worker.h b/ydb/core/tx/replication/service/worker.h index c46eee5f3083..ec2fd4197230 100644 --- a/ydb/core/tx/replication/service/worker.h +++ b/ydb/core/tx/replication/service/worker.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -17,6 +18,7 @@ struct TEvWorker { EvPoll, EvData, EvGone, + EvStatus, EvEnd, }; @@ -30,9 +32,10 @@ struct TEvWorker { struct TRecord { ui64 Offset; TString Data; + TInstant CreateTime; - explicit TRecord(ui64 offset, const TString& data); - explicit TRecord(ui64 offset, TString&& data); + explicit TRecord(ui64 offset, const TString& data, TInstant createTime = TInstant::Zero()); + explicit TRecord(ui64 offset, TString&& data, TInstant createTime = TInstant::Zero()); void Out(IOutputStream& out) const; }; @@ -58,6 +61,13 @@ struct TEvWorker { explicit TEvGone(EStatus status, const TString& errorDescription = {}); TString ToString() const override; }; + + struct TEvStatus: public TEventLocal { + TDuration Lag; + + explicit TEvStatus(TDuration lag); + TString ToString() const override; + }; }; IActor* CreateWorker( diff --git a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h index 059449e4180d..8eba5e9a00a2 100644 --- a/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h +++ b/ydb/core/tx/replication/ydb_proxy/ydb_proxy.h @@ -158,6 +158,7 @@ struct TEvYdbProxy { explicit TMessage(const TDataEvent::TMessageBase& msg, ECodec codec) : Offset(msg.GetOffset()) , Data(msg.GetData()) + , CreateTime(msg.GetCreateTime()) , Codec(codec) { } @@ -176,12 +177,14 @@ struct TEvYdbProxy { ui64 GetOffset() const { return Offset; } const TString& GetData() const { return Data; } TString& GetData() { return Data; } + TInstant GetCreateTime() const { return CreateTime; } ECodec GetCodec() const { return Codec; } void Out(IOutputStream& out) const; private: ui64 Offset; TString Data; + TInstant CreateTime; ECodec Codec; }; diff --git a/ydb/public/api/protos/draft/ydb_replication.proto b/ydb/public/api/protos/draft/ydb_replication.proto index 4f28d886b380..08a78ca58621 100644 --- a/ydb/public/api/protos/draft/ydb_replication.proto +++ b/ydb/public/api/protos/draft/ydb_replication.proto @@ -6,6 +6,8 @@ import "ydb/public/api/protos/ydb_issue_message.proto"; import "ydb/public/api/protos/ydb_operation.proto"; import "ydb/public/api/protos/ydb_scheme.proto"; +import "google/protobuf/duration.proto"; + package Ydb.Replication; option java_package = "com.yandex.ydb.replication"; @@ -48,6 +50,7 @@ message DescribeReplicationResult { } message RunningState { + optional google.protobuf.Duration lag = 1; } message ErrorState { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp index b176e5c8c37c..14d9092fc152 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp @@ -407,6 +407,11 @@ int TCommandDescribe::PrintReplicationResponsePretty(const NYdb::NReplication::T Cout << Endl << "State: " << desc.GetState(); switch (desc.GetState()) { + case NReplication::TReplicationDescription::EState::Running: + if (const auto& lag = desc.GetRunningState().GetLag()) { + Cout << Endl << "Lag: " << *lag; + } + break; case NReplication::TReplicationDescription::EState::Error: Cout << Endl << "Issues: " << desc.GetErrorState().GetIssues().ToOneLineString(); break; diff --git a/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp b/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp index a950ba7311aa..acfb0484e077 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_replication.cpp @@ -10,6 +10,7 @@ #include #include +#include #include namespace NYdb { @@ -58,6 +59,15 @@ const TOAuthCredentials& TConnectionParams::GetOAuthCredentials() const { return std::get(Credentials_); } +TRunningState::TRunningState(const std::optional& lag) + : Lag_(lag) +{ +} + +const std::optional& TRunningState::GetLag() const { + return Lag_; +} + class TErrorState::TImpl { public: NYql::TIssues Issues; @@ -77,6 +87,10 @@ const NYql::TIssues& TErrorState::GetIssues() const { return Impl_->Issues; } +TDuration DurationToDuration(const google::protobuf::Duration& value) { + return TDuration::MilliSeconds(google::protobuf::util::TimeUtil::DurationToMilliseconds(value)); +} + template NYql::TIssues IssuesFromMessage(const ::google::protobuf::RepeatedPtrField& message) { NYql::TIssues issues; @@ -100,7 +114,8 @@ TReplicationDescription::TReplicationDescription(const Ydb::Replication::Describ switch (desc.state_case()) { case Ydb::Replication::DescribeReplicationResult::kRunning: - State_ = TRunningState(); + State_ = TRunningState(desc.running().has_lag() + ? std::make_optional(DurationToDuration(desc.running().lag())) : std::nullopt); break; case Ydb::Replication::DescribeReplicationResult::kError: diff --git a/ydb/public/sdk/cpp/client/draft/ydb_replication.h b/ydb/public/sdk/cpp/client/draft/ydb_replication.h index 1a6778125f93..a5b732dedac5 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_replication.h +++ b/ydb/public/sdk/cpp/client/draft/ydb_replication.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -58,7 +59,17 @@ class TConnectionParams: private TCommonClientSettings { > Credentials_; }; -struct TRunningState {}; +struct TRunningState { +public: + TRunningState() = default; + explicit TRunningState(const std::optional& lag); + + const std::optional& GetLag() const; + +private: + std::optional Lag_; +}; + struct TDoneState {}; class TErrorState {