Skip to content

Commit

Permalink
Merge 6239e78 into aab3037
Browse files Browse the repository at this point in the history
  • Loading branch information
CyberROFL authored Jun 14, 2024
2 parents aab3037 + 6239e78 commit 6a3da97
Show file tree
Hide file tree
Showing 22 changed files with 312 additions and 36 deletions.
6 changes: 6 additions & 0 deletions ydb/core/grpc_services/rpc_replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include <ydb/library/actors/core/hfunc.h>
#include <ydb/public/api/protos/draft/ydb_replication.pb.h>

#include <google/protobuf/util/time_util.h>

namespace NKikimr::NGRpcService {

using namespace Ydb;
Expand Down Expand Up @@ -171,6 +173,10 @@ class TDescribeReplicationRPC: public TRpcSchemeRequestActor<TDescribeReplicatio
switch (from.GetStateCase()) {
case NKikimrReplication::TReplicationState::kStandBy:
to.mutable_running();
if (from.GetStandBy().HasLagMilliSeconds()) {
*to.mutable_running()->mutable_lag() = google::protobuf::util::TimeUtil::MillisecondsToDuration(
from.GetStandBy().GetLagMilliSeconds());
}
break;
case NKikimrReplication::TReplicationState::kError:
*to.mutable_error()->mutable_issues() = std::move(*from.MutableError()->MutableIssues());
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/protos/replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ message TReplicationConfig {

message TReplicationState {
message TStandBy {
optional uint32 LagMilliSeconds = 1;
}

message TPaused {
Expand Down Expand Up @@ -223,10 +224,12 @@ message TEvWorkerStatus {
REASON_UNSPECIFIED = 0;
REASON_ACK = 1;
REASON_ERROR = 2;
REASON_INFO = 3;
}

optional TWorkerIdentity Worker = 1;
optional EStatus Status = 2;
optional EReason Reason = 3;
optional string ErrorDescription = 4;
optional uint32 LagMilliSeconds = 5;
}
16 changes: 16 additions & 0 deletions ydb/core/tx/replication/controller/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,8 @@ void TController::Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorCont
case NKikimrReplication::TEvWorkerStatus::STATUS_RUNNING:
if (!session.HasWorker(id)) {
StopQueue.emplace(id, nodeId);
} else if (record.GetReason() == NKikimrReplication::TEvWorkerStatus::REASON_INFO) {
UpdateLag(id, TDuration::MilliSeconds(record.GetLagMilliSeconds()));
}
break;
case NKikimrReplication::TEvWorkerStatus::STATUS_STOPPED:
Expand Down Expand Up @@ -368,6 +370,20 @@ void TController::Handle(TEvService::TEvWorkerStatus::TPtr& ev, const TActorCont
ScheduleProcessQueues();
}

void TController::UpdateLag(const TWorkerId& id, TDuration lag) {
auto replication = Find(id.ReplicationId());
if (!replication) {
return;
}

auto* target = replication->FindTarget(id.TargetId());
if (!target) {
return;
}

target->UpdateLag(id.WorkerId(), lag);
}

void TController::Handle(TEvService::TEvRunWorker::TPtr& ev, const TActorContext& ctx) {
CLOG_T(ctx, "Handle " << ev->Get()->ToString());

Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/controller/controller_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class TController
void StopWorker(ui32 nodeId, const TWorkerId& id);
void RemoveWorker(const TWorkerId& id, const TActorContext& ctx);
bool MaybeRemoveWorker(const TWorkerId& id, const TActorContext& ctx);
void UpdateLag(const TWorkerId& id, TDuration lag);

// local transactions
class TTxInitSchema;
Expand Down
46 changes: 46 additions & 0 deletions ydb/core/tx/replication/controller/lag_provider.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#include "lag_provider.h"

namespace NKikimr::NReplication::NController {

void TLagProvider::AddPendingLag(ui64 childId) {
Pending.insert(childId);
}

bool TLagProvider::UpdateLag(TItemWithLag& child, ui64 childId, TDuration lag) {
bool updated = false;

if (const auto& prevLag = child.Lag) {
auto it = ChildrenByLag.find(*prevLag);
Y_ABORT_UNLESS(it != ChildrenByLag.end());

it->second.erase(childId);
if (it->second.empty()) {
updated = true;
ChildrenByLag.erase(it);
}
}

child.Lag = lag;
ChildrenByLag[lag].insert(childId);

const bool pending = !Pending.empty();
Pending.erase(childId);

if (Pending) {
return false;
} else if (pending) {
return true;
}

return updated;
}

const TMaybe<TDuration> TLagProvider::GetLag() const {
if (ChildrenByLag.empty() || !Pending.empty()) {
return Nothing();
}

return ChildrenByLag.rbegin()->first;
}

}
25 changes: 25 additions & 0 deletions ydb/core/tx/replication/controller/lag_provider.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <util/datetime/base.h>
#include <util/generic/hash_set.h>
#include <util/generic/map.h>
#include <util/generic/maybe.h>

namespace NKikimr::NReplication::NController {

struct TItemWithLag {
TMaybe<TDuration> Lag;
};

class TLagProvider {
public:
void AddPendingLag(ui64 childId);
bool UpdateLag(TItemWithLag& child, ui64 childId, TDuration lag);
const TMaybe<TDuration> GetLag() const;

private:
TMap<TDuration, THashSet<ui64>> ChildrenByLag;
THashSet<ui64> Pending;
};

}
38 changes: 33 additions & 5 deletions ydb/core/tx/replication/controller/replication.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "lag_provider.h"
#include "private_events.h"
#include "replication.h"
#include "secret_resolver.h"
Expand All @@ -17,9 +18,18 @@

namespace NKikimr::NReplication::NController {

class TReplication::TImpl {
class TReplication::TImpl: public TLagProvider {
friend class TReplication;

struct TTarget: public TItemWithLag {
THolder<ITarget> Ptr;

explicit TTarget(ITarget* iface)
: Ptr(iface)
{
}
};

void ResolveSecret(const TString& secretName, const TActorContext& ctx) {
if (SecretResolver) {
return;
Expand Down Expand Up @@ -62,7 +72,7 @@ class TReplication::TImpl {

void ProgressTargets(const TActorContext& ctx) {
for (auto& [_, target] : Targets) {
target->Progress(ctx);
target.Ptr->Progress(ctx);
}
}

Expand All @@ -79,6 +89,7 @@ class TReplication::TImpl {
ui64 AddTarget(TReplication* self, ui64 id, ETargetKind kind, Args&&... args) {
const auto res = Targets.emplace(id, CreateTarget(self, id, kind, std::forward<Args>(args)...));
Y_VERIFY_S(res.second, "Duplicate target: " << id);
TLagProvider::AddPendingLag(id);
return id;
}

Expand All @@ -90,7 +101,7 @@ class TReplication::TImpl {
ITarget* FindTarget(ui64 id) {
auto it = Targets.find(id);
return it != Targets.end()
? it->second.Get()
? it->second.Ptr.Get()
: nullptr;
}

Expand Down Expand Up @@ -151,7 +162,7 @@ class TReplication::TImpl {

void Shutdown(const TActorContext& ctx) {
for (auto& [_, target] : Targets) {
target->Shutdown(ctx);
target.Ptr->Shutdown(ctx);
}

for (auto* x : TVector<TActorId*>{&SecretResolver, &TargetDiscoverer, &TenantResolver, &YdbProxy}) {
Expand All @@ -174,6 +185,15 @@ class TReplication::TImpl {
SetState(EState::Error, issue);
}

void UpdateLag(ui64 targetId, TDuration lag) {
auto it = Targets.find(targetId);
if (it == Targets.end()) {
return;
}

TLagProvider::UpdateLag(it->second, targetId, lag);
}

private:
const ui64 ReplicationId;
const TPathId PathId;
Expand All @@ -183,7 +203,7 @@ class TReplication::TImpl {
EState State = EState::Ready;
TString Issue;
ui64 NextTargetId = 1;
THashMap<ui64, THolder<ITarget>> Targets;
THashMap<ui64, TTarget> Targets;
THashSet<ui64> PendingAlterTargets;
TActorId SecretResolver;
TActorId YdbProxy;
Expand Down Expand Up @@ -329,6 +349,14 @@ bool TReplication::CheckAlterDone() const {
return Impl->State == EState::Ready && Impl->PendingAlterTargets.empty();
}

void TReplication::UpdateLag(ui64 targetId, TDuration lag) {
Impl->UpdateLag(targetId, lag);
}

const TMaybe<TDuration> TReplication::GetLag() const {
return Impl->GetLag();
}

}

Y_DECLARE_OUT_SPEC(, NKikimrReplication::TReplicationConfig::TargetCase, stream, value) {
Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/replication/controller/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
#include <ydb/core/base/defs.h>
#include <ydb/core/scheme/scheme_pathid.h>

#include <util/datetime/base.h>
#include <util/generic/hash_set.h>
#include <util/generic/maybe.h>
#include <util/generic/ptr.h>

#include <memory>
Expand Down Expand Up @@ -76,7 +78,7 @@ class TReplication: public TSimpleRefCount<TReplication> {

virtual void AddWorker(ui64 id) = 0;
virtual void RemoveWorker(ui64 id) = 0;
virtual const THashSet<ui64>& GetWorkers() const = 0;
virtual void UpdateLag(ui64 workerId, TDuration lag) = 0;

virtual void Progress(const TActorContext& ctx) = 0;
virtual void Shutdown(const TActorContext& ctx) = 0;
Expand All @@ -88,6 +90,7 @@ class TReplication: public TSimpleRefCount<TReplication> {
friend class TTargetBase;
void AddPendingAlterTarget(ui64 id);
void RemovePendingAlterTarget(ui64 id);
void UpdateLag(ui64 targetId, TDuration lag);

struct TDropOp {
TActorId Sender;
Expand Down Expand Up @@ -117,6 +120,7 @@ class TReplication: public TSimpleRefCount<TReplication> {
void SetState(EState state, TString issue = {});
EState GetState() const;
const TString& GetIssue() const;
const TMaybe<TDuration> GetLag() const;

void SetNextTargetId(ui64 value);
ui64 GetNextTargetId() const;
Expand Down
18 changes: 15 additions & 3 deletions ydb/core/tx/replication/controller/target_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,26 +89,38 @@ void TTargetBase::SetIssue(const TString& value) {
}

void TTargetBase::AddWorker(ui64 id) {
Workers.insert(id);
Workers.emplace(id, TWorker{});
TLagProvider::AddPendingLag(id);
}

void TTargetBase::RemoveWorker(ui64 id) {
Workers.erase(id);
}

const THashSet<ui64>& TTargetBase::GetWorkers() const {
const THashMap<ui64, TTargetBase::TWorker>& TTargetBase::GetWorkers() const {
return Workers;
}

void TTargetBase::RemoveWorkers(const TActorContext& ctx) {
if (!PendingRemoveWorkers) {
PendingRemoveWorkers = true;
for (const auto& id : Workers) {
for (const auto& [id, _] : Workers) {
ctx.Send(ctx.SelfID, new TEvPrivate::TEvRemoveWorker(Replication->GetId(), Id, id));
}
}
}

void TTargetBase::UpdateLag(ui64 workerId, TDuration lag) {
auto it = Workers.find(workerId);
if (it == Workers.end()) {
return;
}

if (TLagProvider::UpdateLag(it->second, workerId, lag)) {
Replication->UpdateLag(GetId(), GetLag().GetRef());
}
}

void TTargetBase::Progress(const TActorContext& ctx) {
switch (DstState) {
case EDstState::Creating:
Expand Down
12 changes: 9 additions & 3 deletions ydb/core/tx/replication/controller/target_base.h
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
#pragma once

#include "lag_provider.h"
#include "replication.h"

namespace NKikimr::NReplication::NController {

class TTargetBase: public TReplication::ITarget {
class TTargetBase
: public TReplication::ITarget
, public TLagProvider
{
protected:
using ETargetKind = TReplication::ETargetKind;
using EDstState = TReplication::EDstState;
using EStreamState = TReplication::EStreamState;
struct TWorker: public TItemWithLag {};

inline TReplication* GetReplication() const {
return Replication;
}

const THashMap<ui64, TWorker>& GetWorkers() const;
void RemoveWorkers(const TActorContext& ctx);

public:
Expand Down Expand Up @@ -43,7 +49,7 @@ class TTargetBase: public TReplication::ITarget {

void AddWorker(ui64 id) override;
void RemoveWorker(ui64 id) override;
const THashSet<ui64>& GetWorkers() const override;
void UpdateLag(ui64 workerId, TDuration lag) override;

void Progress(const TActorContext& ctx) override;
void Shutdown(const TActorContext& ctx) override;
Expand All @@ -65,7 +71,7 @@ class TTargetBase: public TReplication::ITarget {
TActorId DstAlterer;
TActorId DstRemover;
TActorId WorkerRegistar;
THashSet<ui64> Workers;
THashMap<ui64, TWorker> Workers;
bool PendingRemoveWorkers = false;

}; // TTargetBase
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ class TController::TTxDescribeReplication: public TTxBase {
case TReplication::EState::Ready:
case TReplication::EState::Removing:
state.MutableStandBy();
if (const auto lag = replication->GetLag()) {
state.MutableStandBy()->SetLagMilliSeconds(lag->MilliSeconds());
}
break;
case TReplication::EState::Done:
state.MutableDone();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/replication/controller/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ SRCS(
dst_alterer.cpp
dst_creator.cpp
dst_remover.cpp
lag_provider.cpp
logging.cpp
nodes_manager.cpp
private_events.cpp
Expand Down
Loading

0 comments on commit 6a3da97

Please sign in to comment.