Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

24-2: CDC Initial Scan fixes #5992

Merged
merged 3 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/protos/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,9 @@ message TSchemeShardConfig {
optional uint32 StatsMaxExecuteMs = 3 [default = 10];

repeated TInFlightCounterConfig InFlightCounterConfig = 4;

// number of shards per table
optional uint32 MaxCdcInitialScanShardsInFlight = 5 [default = 10];
}

message TCompactionConfig {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/protos/counters_datashard.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ enum ESimpleCounters {
COUNTER_CHANGE_RECORDS_REQUESTED = 17 [(CounterOpts) = {Name: "ChangeRecordsRequested"}];
COUNTER_CHANGE_DELIVERY_LAG = 18 [(CounterOpts) = {Name: "ChangeDeliveryLag"}];
COUNTER_CHANGE_DATA_LAG = 19 [(CounterOpts) = {Name: "ChangeDataLag"}];
COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY = 20 [(CounterOpts) = {Name: "ChangeQueueReservedCapacity"}];
}

enum ECumulativeCounters {
Expand Down
48 changes: 37 additions & 11 deletions ydb/core/tx/datashard/cdc_stream_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,26 @@
#include "datashard_impl.h"

#include <ydb/core/protos/datashard_config.pb.h>
#include <ydb/core/protos/tx_datashard.pb.h>

#include <util/generic/maybe.h>
#include <util/string/builder.h>

#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan] " << stream)
#define LOG_I(stream) LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan] " << stream)
#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan] " << stream)
#define LOG_D(stream) LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan][" << TabletID() << "] " << stream)
#define LOG_I(stream) LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan][" << TabletID() << "] " << stream)
#define LOG_W(stream) LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD, "[CdcStreamScan][" << TabletID() << "] " << stream)

namespace NKikimr::NDataShard {

using namespace NActors;
using namespace NTable;
using namespace NTabletFlatExecutor;

void TCdcStreamScanManager::TStats::Serialize(NKikimrTxDataShard::TEvCdcStreamScanResponse_TStats& proto) const {
proto.SetRowsProcessed(RowsProcessed);
proto.SetBytesProcessed(BytesProcessed);
}

void TCdcStreamScanManager::Reset() {
Scans.clear();
TxIdToPathId.clear();
Expand Down Expand Up @@ -95,6 +101,7 @@ void TCdcStreamScanManager::Complete(const TPathId& streamPathId) {
return;
}

CompletedScans[streamPathId] = it->second.Stats;
TxIdToPathId.erase(it->second.TxId);
Scans.erase(it);
}
Expand All @@ -104,6 +111,15 @@ void TCdcStreamScanManager::Complete(ui64 txId) {
Complete(TxIdToPathId.at(txId));
}

bool TCdcStreamScanManager::IsCompleted(const TPathId& streamPathId) const {
return CompletedScans.contains(streamPathId);
}

const TCdcStreamScanManager::TStats& TCdcStreamScanManager::GetCompletedStats(const TPathId& streamPathId) const {
Y_ABORT_UNLESS(CompletedScans.contains(streamPathId));
return CompletedScans.at(streamPathId);
}

TCdcStreamScanManager::TScanInfo* TCdcStreamScanManager::Get(const TPathId& streamPathId) {
return Scans.FindPtr(streamPathId);
}
Expand Down Expand Up @@ -203,6 +219,10 @@ class TDataShard::TTxCdcStreamScanProgress
return row;
}

ui64 TabletID() const {
return Self->TabletID();
}

public:
explicit TTxCdcStreamScanProgress(TDataShard* self, TDataShard::TEvPrivate::TEvCdcStreamScanProgress::TPtr ev)
: TBase(self)
Expand Down Expand Up @@ -452,8 +472,7 @@ class TCdcStreamScan: public IActorCallback, public IScan {
PathIdFromPathId(StreamPathId, response->Record.MutableStreamPathId());
response->Record.SetStatus(status);
response->Record.SetErrorDescription(error);
response->Record.MutableStats()->SetRowsProcessed(Stats.RowsProcessed);
response->Record.MutableStats()->SetBytesProcessed(Stats.BytesProcessed);
Stats.Serialize(*response->Record.MutableStats());

Send(ReplyTo, std::move(response));
}
Expand Down Expand Up @@ -568,14 +587,17 @@ class TDataShard::TTxCdcStreamScanRun: public TTransactionBase<TDataShard> {
TEvDataShard::TEvCdcStreamScanRequest::TPtr Request;
THolder<IEventHandle> Response; // response to sender or forward to scanner

THolder<IEventHandle> MakeResponse(const TActorContext& ctx,
NKikimrTxDataShard::TEvCdcStreamScanResponse::EStatus status, const TString& error = {}) const
{
template <typename... Args>
THolder<IEventHandle> MakeResponse(const TActorContext& ctx, Args&&... args) const {
return MakeHolder<IEventHandle>(Request->Sender, ctx.SelfID, new TEvDataShard::TEvCdcStreamScanResponse(
Request->Get()->Record, Self->TabletID(), status, error
Request->Get()->Record, Self->TabletID(), std::forward<Args>(args)...
));
}

ui64 TabletID() const {
return Self->TabletID();
}

public:
explicit TTxCdcStreamScanRun(TDataShard* self, TEvDataShard::TEvCdcStreamScanRequest::TPtr ev)
: TBase(self)
Expand Down Expand Up @@ -635,6 +657,11 @@ class TDataShard::TTxCdcStreamScanRun: public TTransactionBase<TDataShard> {
} else if (info->ScanId) {
return true; // nop, scan actor will report state when it starts
}
} else if (Self->CdcStreamScanManager.IsCompleted(streamPathId)) {
Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE);
Self->CdcStreamScanManager.GetCompletedStats(streamPathId).Serialize(
*Response->Get<TEvDataShard::TEvCdcStreamScanResponse>()->Record.MutableStats());
return true;
} else if (Self->CdcStreamScanManager.Size()) {
Response = MakeResponse(ctx, NKikimrTxDataShard::TEvCdcStreamScanResponse::OVERLOADED);
return true;
Expand Down Expand Up @@ -714,8 +741,7 @@ void TDataShard::Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const T

void TDataShard::Handle(TEvPrivate::TEvCdcStreamScanRegistered::TPtr& ev, const TActorContext& ctx) {
if (!CdcStreamScanManager.Has(ev->Get()->TxId)) {
LOG_W("Unknown cdc stream scan actor registered"
<< ": at: " << TabletID());
LOG_W("Unknown cdc stream scan actor registered");
return;
}

Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/datashard/cdc_stream_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@
#include <util/generic/hash.h>
#include <util/generic/maybe.h>

namespace NKikimrTxDataShard {
class TEvCdcStreamScanResponse_TStats;
}

namespace NKikimr::NDataShard {

class TCdcStreamScanManager {
public:
struct TStats {
ui64 RowsProcessed = 0;
ui64 BytesProcessed = 0;

void Serialize(NKikimrTxDataShard::TEvCdcStreamScanResponse_TStats& proto) const;
};

private:
Expand All @@ -39,6 +45,8 @@ class TCdcStreamScanManager {

void Complete(const TPathId& streamPathId);
void Complete(ui64 txId);
bool IsCompleted(const TPathId& streamPathId) const;
const TStats& GetCompletedStats(const TPathId& streamPathId) const;

TScanInfo* Get(const TPathId& streamPathId);
const TScanInfo* Get(const TPathId& streamPathId) const;
Expand All @@ -57,6 +65,7 @@ class TCdcStreamScanManager {

private:
THashMap<TPathId, TScanInfo> Scans;
THashMap<TPathId, TStats> CompletedScans;
THashMap<ui64, TPathId> TxIdToPathId;
};

Expand Down
9 changes: 9 additions & 0 deletions ydb/core/tx/datashard/datashard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,7 @@ void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {

IncCounter(COUNTER_CHANGE_RECORDS_REMOVED);
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);

CheckChangesQueueNoOverflow();
}
Expand Down Expand Up @@ -992,10 +993,16 @@ void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange
}
}
}

if (auto it = ChangeQueueReservations.find(cookie); it != ChangeQueueReservations.end()) {
ChangeQueueReservedCapacity -= it->second;
ChangeQueueReservedCapacity += records.size();
}

UpdateChangeExchangeLag(now);
IncCounter(COUNTER_CHANGE_RECORDS_ENQUEUED, forward.size());
SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);

Y_ABORT_UNLESS(OutChangeSender);
Send(OutChangeSender, new NChangeExchange::TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
Expand Down Expand Up @@ -1030,6 +1037,8 @@ ui64 TDataShard::ReserveChangeQueueCapacity(ui32 capacity) {
const auto cookie = NextChangeQueueReservationCookie++;
ChangeQueueReservations.emplace(cookie, capacity);
ChangeQueueReservedCapacity += capacity;
SetCounter(COUNTER_CHANGE_QUEUE_RESERVED_CAPACITY, ChangeQueueReservedCapacity);

return cookie;
}

Expand Down
43 changes: 43 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2634,6 +2634,49 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}

Y_UNIT_TEST(InitialScanRacyCompleteAndRequest) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
.SetEnableChangefeedInitialScan(true)
);

auto& runtime = *server->GetRuntime();
const auto edgeActor = runtime.AllocateEdgeActor();

SetupLogging(runtime);
InitRoot(server, edgeActor);
CreateShardedTable(server, edgeActor, "/Root", "Table", SimpleTable());

std::unique_ptr<IEventHandle> doneResponse;
auto blockDone = runtime.AddObserver<TEvDataShard::TEvCdcStreamScanResponse>(
[&](TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev) {
if (ev->Get()->Record.GetStatus() == NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE) {
doneResponse.reset(ev.Release());
}
}
);

WaitTxNotification(server, edgeActor, AsyncAlterAddStream(server, "/Root", "Table",
WithInitialScan(Updates(NKikimrSchemeOp::ECdcStreamFormatJson))));
WaitFor(runtime, [&]{ return bool(doneResponse); }, "doneResponse");
blockDone.Remove();

bool done = false;
auto waitDone = runtime.AddObserver<TEvDataShard::TEvCdcStreamScanResponse>(
[&](TEvDataShard::TEvCdcStreamScanResponse::TPtr& ev) {
if (ev->Get()->Record.GetStatus() == NKikimrTxDataShard::TEvCdcStreamScanResponse::DONE) {
done = true;
}
}
);

const auto& record = doneResponse->Get<TEvDataShard::TEvCdcStreamScanResponse>()->Record;
RebootTablet(runtime, record.GetTablePathId().GetOwnerId(), edgeActor);
WaitFor(runtime, [&]{ return done; }, "done");
}

Y_UNIT_TEST(InitialScanUpdatedRows) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ struct TSchemeShard::TCdcStreamScan::TTxProgress: public TTransactionBase<TSchem
}

while (!streamInfo->PendingShards.empty()) {
if (streamInfo->InProgressShards.size() >= streamInfo->MaxInProgressShards) {
if (streamInfo->InProgressShards.size() >= Self->MaxCdcInitialScanShardsInFlight) {
break;
}

Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4346,6 +4346,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
ConfigureCompactionQueues(appData->CompactionConfig, ctx);
ConfigureStatsBatching(appData->SchemeShardConfig, ctx);
ConfigureStatsOperations(appData->SchemeShardConfig, ctx);
MaxCdcInitialScanShardsInFlight = appData->SchemeShardConfig.GetMaxCdcInitialScanShardsInFlight();

ConfigureBackgroundCleaningQueue(appData->BackgroundCleaningConfig, ctx);

Expand Down Expand Up @@ -6835,6 +6836,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
const auto& schemeShardConfig = appConfig.GetSchemeShardConfig();
ConfigureStatsBatching(schemeShardConfig, ctx);
ConfigureStatsOperations(schemeShardConfig, ctx);
MaxCdcInitialScanShardsInFlight = schemeShardConfig.GetMaxCdcInitialScanShardsInFlight();
}

if (appConfig.HasTableProfilesConfig()) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,9 @@ class TSchemeShard
TActorId SysPartitionStatsCollector;

TActorId TabletMigrator;

TActorId CdcStreamScanFinalizer;
ui32 MaxCdcInitialScanShardsInFlight = 10;

TDuration StatsMaxExecuteTime;
TDuration StatsBatchTimeout;
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2547,8 +2547,6 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
{}
};

static constexpr ui32 MaxInProgressShards = 10;

TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, const TDuration& rt, const TString& awsRegion, EState state)
: AlterVersion(version)
, Mode(mode)
Expand Down
Loading