Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix initialization of explicit messages groups #8739

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -576,13 +576,15 @@ struct TEvPQ {
};

struct TEvChangePartitionConfig : public TEventLocal<TEvChangePartitionConfig, EvChangePartitionConfig> {
TEvChangePartitionConfig(const NPersQueue::TTopicConverterPtr& topicConverter, const NKikimrPQ::TPQTabletConfig& config)
TEvChangePartitionConfig(const NPersQueue::TTopicConverterPtr& topicConverter, const NKikimrPQ::TPQTabletConfig& config, const NKikimrPQ::TBootstrapConfig& bootstrapConfig)
: TopicConverter(topicConverter)
, Config(config)
, BootstrapConfig(bootstrapConfig)
{}

NPersQueue::TTopicConverterPtr TopicConverter;
NKikimrPQ::TPQTabletConfig Config;
NKikimrPQ::TBootstrapConfig BootstrapConfig;
};

struct TEvPartitionConfigChanged : public TEventLocal<TEvPartitionConfigChanged, EvPartitionConfigChanged> {
Expand Down Expand Up @@ -845,6 +847,7 @@ struct TEvPQ {
ui64 TxId;
NPersQueue::TTopicConverterPtr TopicConverter;
NKikimrPQ::TPQTabletConfig Config;
NKikimrPQ::TBootstrapConfig BootstrapConfig;
};

struct TEvProposePartitionConfigResult : public TEventLocal<TEvProposePartitionConfigResult, EvProposePartitionConfigResult> {
Expand Down
26 changes: 21 additions & 5 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2037,7 +2037,8 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
} else if (t->ProposeConfig) {
Y_ABORT_UNLESS(ChangingConfig);
ChangeConfig = MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
t->ProposeConfig->Config);
t->ProposeConfig->Config,
t->ProposeConfig->BootstrapConfig);
PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config);
SendChangeConfigReply = false;
}
Expand Down Expand Up @@ -2123,7 +2124,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
{
ChangeConfig =
MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
event.Config);
event.Config,
event.BootstrapConfig);
PendingPartitionConfig = GetPartitionConfig(ChangeConfig->Config);

SendChangeConfigReply = false;
Expand Down Expand Up @@ -2360,6 +2362,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)

if (ChangeConfig) {
EndChangePartitionConfig(std::move(ChangeConfig->Config),
std::move(ChangeConfig->BootstrapConfig),
ChangeConfig->TopicConverter,
ctx);
}
Expand Down Expand Up @@ -2426,12 +2429,24 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
}

void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
NKikimrPQ::TBootstrapConfig&& bootstrapConfig,
NPersQueue::TTopicConverterPtr topicConverter,
const TActorContext& ctx)
{
Config = std::move(config);
PartitionConfig = GetPartitionConfig(Config);
PartitionGraph = MakePartitionGraph(Config);

for (const auto& mg : bootstrapConfig.GetExplicitMessageGroups()) {
TMaybe<TPartitionKeyRange> keyRange;
if (mg.HasKeyRange()) {
keyRange = TPartitionKeyRange::Parse(mg.GetKeyRange());
}

TSourceIdInfo sourceId(0, 0, ctx.Now(), std::move(keyRange), false);
SourceIdStorage.RegisterSourceIdInfo(mg.GetId(), std::move(sourceId), true);
}

TopicConverter = topicConverter;
NewPartition = false;

Expand All @@ -2441,14 +2456,15 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
InitSplitMergeSlidingWindow();
}

Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
Send(ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, bootstrapConfig));
Send(WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, bootstrapConfig));
TotalPartitionWriteSpeed = config.GetPartitionConfig().GetWriteSpeedInBytesPerSecond();

if (Config.GetPartitionConfig().HasMirrorFrom()) {
if (Mirrorer) {
ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter,
Config));
Config,
bootstrapConfig));
} else {
CreateMirrorerActor();
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
void OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx);

void EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
NKikimrPQ::TBootstrapConfig&& bootstrapConfig,
NPersQueue::TTopicConverterPtr topicConverter,
const TActorContext& ctx);
TString GetKeyConfig() const;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ void TInitConfigStep::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorCon

if (Partition()->Config.GetVersion() < Partition()->TabletConfig.GetVersion()) {
auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(Partition()->TopicConverter,
Partition()->TabletConfig);
Partition()->TabletConfig,
NKikimrPQ::TBootstrapConfig());
Partition()->PushFrontDistrTx(event.Release());
}
break;
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
ClearNewConfig();

for (auto& p : Partitions) { //change config for already created partitions
ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config));
ctx.Send(p.second.Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter, Config, BootstrapConfigTx ? *BootstrapConfigTx : NKikimrPQ::TBootstrapConfig()));
}
ChangePartitionConfigInflight += Partitions.size();

Expand Down Expand Up @@ -1807,7 +1807,7 @@ void TPersQueue::AddCmdWriteConfig(TEvKeyValue::TEvRequest* request,
keyRange = TPartitionKeyRange::Parse(mg.GetKeyRange());
}

sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange));
sourceIdWriter.RegisterSourceId(mg.GetId(), 0, 0, ctx.Now(), std::move(keyRange), false);
}

for (const auto& partition : cfg.GetPartitions()) {
Expand Down Expand Up @@ -4545,6 +4545,7 @@ void TPersQueue::SendEvProposePartitionConfig(const TActorContext& ctx,

event->TopicConverter = tx.TopicConverter;
event->Config = tx.TabletConfig;
event->BootstrapConfig = tx.BootstrapConfig;

ctx.Send(partition.Actor, std::move(event));
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/sourceid.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class TSourceIdStorage: private THeartbeatProcessor {
void RegisterSourceId(const TString& sourceId, Args&&... args) {
RegisterSourceIdInfo(sourceId, TSourceIdInfo(std::forward<Args>(args)...), false);
}
void RegisterSourceIdInfo(const TString& sourceId, TSourceIdInfo&& sourceIdInfo, bool load);

void DeregisterSourceId(const TString& sourceId);

Expand All @@ -65,7 +66,6 @@ class TSourceIdStorage: private THeartbeatProcessor {
private:
void LoadRawSourceIdInfo(const TString& key, const TString& data, TInstant now);
void LoadProtoSourceIdInfo(const TString& key, const TString& data);
void RegisterSourceIdInfo(const TString& sourceId, TSourceIdInfo&& sourceIdInfo, bool load);

private:
TSourceIdMap InMemorySourceIds;
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/persqueue/ut/partition_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,8 @@ void TPartitionFixture::SendChangePartitionConfig(const TConfigParams& config)
auto event = MakeHolder<TEvPQ::TEvChangePartitionConfig>(TopicConverter, MakeConfig(config.Version,
config.Consumers,
1,
config.MeteringMode));
config.MeteringMode),
NKikimrPQ::TBootstrapConfig());
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
}

Expand Down
11 changes: 10 additions & 1 deletion ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2654,12 +2654,13 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
}

Y_UNIT_TEST(InitialScan) {
void InitialScanTest(bool withTopicSchemeTx) {
TPortManager portManager;
TServer::TPtr server = new TServer(TServerSettings(portManager.GetPort(2134), {}, DefaultPQConfig())
.SetUseRealThreads(false)
.SetDomainName("Root")
.SetEnableChangefeedInitialScan(true)
.SetEnablePQConfigTransactionsAtSchemeShard(withTopicSchemeTx)
);

auto& runtime = *server->GetRuntime();
Expand Down Expand Up @@ -2702,6 +2703,14 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}

Y_UNIT_TEST(InitialScan) {
InitialScanTest(false);
}

Y_UNIT_TEST(InitialScan_WithTopicSchemeTx) {
InitialScanTest(true);
}

Y_UNIT_TEST(InitialScanDebezium) {
TTestTopicEnv env(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatDebeziumJson, "UnusedStream"));
auto& client = env.GetClient();
Expand Down
Loading