Skip to content

Commit

Permalink
(refactoring) Split TEvChangeExchange into two parts: common & DS KIK…
Browse files Browse the repository at this point in the history
…IMR-20673 (#1152)
  • Loading branch information
CyberROFL authored Jan 19, 2024
1 parent d6ad10e commit e7c25d2
Show file tree
Hide file tree
Showing 16 changed files with 317 additions and 286 deletions.
3 changes: 2 additions & 1 deletion ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ struct TKikimrEvents : TEvents {
ES_HEALTH_CHECK,
ES_DQ = NYql::NDq::TDqEvents::ES_DQ_COMPUTE, // 4212
ES_YQ, // 4213
ES_CHANGE_EXCHANGE,
ES_CHANGE_EXCHANGE_DATASHARD,
ES_DATABASE_SERVICE, //4215
ES_SEQUENCESHARD, // 4216
ES_SEQUENCEPROXY, // 4217
Expand Down Expand Up @@ -172,6 +172,7 @@ struct TKikimrEvents : TEvents {
ES_PQ_PARTITION_CHOOSER,
ES_GRAPH,
ES_REPLICATION_SERVICE,
ES_CHANGE_EXCHANGE,
};
};

Expand Down
125 changes: 125 additions & 0 deletions ydb/core/change_exchange/change_exchange.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
#include "change_exchange.h"

#include <util/string/builder.h>
#include <util/string/join.h>

namespace NKikimr::NChangeExchange {

/// TEvEnqueueRecords
TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector<TRecordInfo>& records)
: Records(records)
{
}

TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector<TRecordInfo>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvEnqueueRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize)
: Order(order)
, PathId(pathId)
, BodySize(bodySize)
{
}

void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const {
out << "{"
<< " Order: " << Order
<< " PathId: " << PathId
<< " BodySize: " << BodySize
<< " }";
}

/// TEvRequestRecords
TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector<TRecordInfo>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector<TRecordInfo>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRequestRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize)
: Order(order)
, BodySize(bodySize)
{
}

bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const {
return Order < rhs.Order;
}

void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const {
out << "{"
<< " Order: " << Order
<< " BodySize: " << BodySize
<< " }";
}

/// TEvRemoveRecords
TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector<ui64>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector<ui64>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRemoveRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

/// TEvRecords
TEvChangeExchange::TEvRecords::TEvRecords(const TVector<IChangeRecord::TPtr>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRecords::TEvRecords(TVector<IChangeRecord::TPtr>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

/// TEvForgetRecords
TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector<ui64>& records)
: Records(records)
{
}

TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector<ui64>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvForgetRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

}
101 changes: 101 additions & 0 deletions ydb/core/change_exchange/change_exchange.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#pragma once

#include "change_record.h"

#include <ydb/core/base/defs.h>
#include <ydb/core/base/events.h>
#include <ydb/core/scheme/scheme_pathid.h>

#include <util/generic/vector.h>

namespace NKikimr::NChangeExchange {

struct TEvChangeExchange {
enum EEv {
// Enqueue for sending
EvEnqueueRecords = EventSpaceBegin(TKikimrEvents::ES_CHANGE_EXCHANGE),
// Request change record(s) by id
EvRequestRecords,
// Change record(s)
EvRecords,
// Remove change record(s) from local database
EvRemoveRecords,
// Already removed records that the sender should forget about
EvForgetRecods,

EvEnd,
};

static_assert(EvEnd < EventSpaceEnd(TKikimrEvents::ES_CHANGE_EXCHANGE));

struct TEvEnqueueRecords: public TEventLocal<TEvEnqueueRecords, EvEnqueueRecords> {
struct TRecordInfo {
ui64 Order;
TPathId PathId;
ui64 BodySize;

TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize);

void Out(IOutputStream& out) const;
};

TVector<TRecordInfo> Records;

explicit TEvEnqueueRecords(const TVector<TRecordInfo>& records);
explicit TEvEnqueueRecords(TVector<TRecordInfo>&& records);
TString ToString() const override;
};

struct TEvRequestRecords: public TEventLocal<TEvRequestRecords, EvRequestRecords> {
struct TRecordInfo {
ui64 Order;
ui64 BodySize;

TRecordInfo(ui64 order, ui64 bodySize = 0);

bool operator<(const TRecordInfo& rhs) const;
void Out(IOutputStream& out) const;
};

TVector<TRecordInfo> Records;

explicit TEvRequestRecords(const TVector<TRecordInfo>& records);
explicit TEvRequestRecords(TVector<TRecordInfo>&& records);
TString ToString() const override;
};

struct TEvRemoveRecords: public TEventLocal<TEvRemoveRecords, EvRemoveRecords> {
TVector<ui64> Records;

explicit TEvRemoveRecords(const TVector<ui64>& records);
explicit TEvRemoveRecords(TVector<ui64>&& records);
TString ToString() const override;
};

struct TEvRecords: public TEventLocal<TEvRecords, EvRecords> {
TVector<IChangeRecord::TPtr> Records;

explicit TEvRecords(const TVector<IChangeRecord::TPtr>& records);
explicit TEvRecords(TVector<IChangeRecord::TPtr>&& records);
TString ToString() const override;
};

struct TEvForgetRecords: public TEventLocal<TEvForgetRecords, EvForgetRecods> {
TVector<ui64> Records;

explicit TEvForgetRecords(const TVector<ui64>& records);
explicit TEvForgetRecords(TVector<ui64>&& records);
TString ToString() const override;
};

}; // TEvChangeExchange

}

Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvEnqueueRecords::TRecordInfo, o, x) {
return x.Out(o);
}

Y_DECLARE_OUT_SPEC(inline, NKikimr::NChangeExchange::TEvChangeExchange::TEvRequestRecords::TRecordInfo, o, x) {
return x.Out(o);
}
6 changes: 6 additions & 0 deletions ydb/core/change_exchange/ya.make
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
LIBRARY()

SRCS(
change_exchange.cpp
change_record.cpp
)

GENERATE_ENUM_SERIALIZATION(change_record.h)

PEERDIR(
ydb/core/base
ydb/core/scheme
)

YQL_LAST_ABI_VERSION()

END()
124 changes: 2 additions & 122 deletions ydb/core/tx/datashard/change_exchange.cpp
Original file line number Diff line number Diff line change
@@ -1,127 +1,8 @@
#include "change_exchange.h"

#include <util/string/builder.h>
#include <util/string/join.h>

namespace NKikimr {
namespace NDataShard {

/// TEvEnqueueRecords
TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(const TVector<TRecordInfo>& records)
: Records(records)
{
}

TEvChangeExchange::TEvEnqueueRecords::TEvEnqueueRecords(TVector<TRecordInfo>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvEnqueueRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::TRecordInfo(ui64 order, const TPathId& pathId, ui64 bodySize)
: Order(order)
, PathId(pathId)
, BodySize(bodySize)
{
}

void TEvChangeExchange::TEvEnqueueRecords::TRecordInfo::Out(IOutputStream& out) const {
out << "{"
<< " Order: " << Order
<< " PathId: " << PathId
<< " BodySize: " << BodySize
<< " }";
}

/// TEvRequestRecords
TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(const TVector<TRecordInfo>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRequestRecords::TEvRequestRecords(TVector<TRecordInfo>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRequestRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

TEvChangeExchange::TEvRequestRecords::TRecordInfo::TRecordInfo(ui64 order, ui64 bodySize)
: Order(order)
, BodySize(bodySize)
{
}

bool TEvChangeExchange::TEvRequestRecords::TRecordInfo::operator<(const TRecordInfo& rhs) const {
return Order < rhs.Order;
}

void TEvChangeExchange::TEvRequestRecords::TRecordInfo::Out(IOutputStream& out) const {
out << "{"
<< " Order: " << Order
<< " BodySize: " << BodySize
<< " }";
}

/// TEvRemoveRecords
TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(const TVector<ui64>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRemoveRecords::TEvRemoveRecords(TVector<ui64>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRemoveRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

/// TEvRecords
TEvChangeExchange::TEvRecords::TEvRecords(const TVector<NChangeExchange::IChangeRecord::TPtr>& records)
: Records(records)
{
}

TEvChangeExchange::TEvRecords::TEvRecords(TVector<NChangeExchange::IChangeRecord::TPtr>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}

/// TEvForgetRecords
TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(const TVector<ui64>& records)
: Records(records)
{
}

TEvChangeExchange::TEvForgetRecords::TEvForgetRecords(TVector<ui64>&& records)
: Records(std::move(records))
{
}

TString TEvChangeExchange::TEvForgetRecords::ToString() const {
return TStringBuilder() << ToStringHeader() << " {"
<< " Records [" << JoinSeq(",", Records) << "]"
<< " }";
}
namespace NKikimr::NDataShard {

/// TEvAddSender
TEvChangeExchange::TEvAddSender::TEvAddSender(const TTableId& userTableId, TEvChangeExchange::ESenderType type, const TPathId& pathId)
Expand Down Expand Up @@ -151,5 +32,4 @@ TString TEvChangeExchange::TEvRemoveSender::ToString() const {
<< " }";
}

} // NDataShard
} // NKikimr
}
Loading

0 comments on commit e7c25d2

Please sign in to comment.