Skip to content

Commit

Permalink
Merge e1d86cb into 0ef6729
Browse files Browse the repository at this point in the history
  • Loading branch information
ildar-khisambeev authored Apr 12, 2024
2 parents 0ef6729 + e1d86cb commit fd2423d
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 43 deletions.
30 changes: 8 additions & 22 deletions ydb/library/persqueue/topic_parser/topic_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ bool TDiscoveryConverter::BuildFromFederationPath(const TString& rootPrefix) {
auto res = topic.TrySplit("/", fst, snd);
CHECK_SET_VALID(res, TStringBuilder() << "Could not split federation path: " << OriginalTopic, return false);
Account_ = fst;

if (!ParseModernPath(snd))
return false;
if (!BuildFromShortModernName()) {
Expand All @@ -341,7 +341,7 @@ bool TDiscoveryConverter::BuildFromFederationPath(const TString& rootPrefix) {

bool TDiscoveryConverter::TryParseModernMirroredPath(TStringBuf path) {
if (!path.Contains("-mirrored-from-")) {
CHECK_SET_VALID(!path.Contains("mirrored-from"), "Federation topics cannot contain 'mirrored-from' in name unless this is a mirrored topic", return false);
CHECK_SET_VALID(!path.Contains("mirrored-from"), "Federation topics cannot contain 'mirrored-from' in name unless this is a mirrored topic", return false);
return false;
}
TStringBuf fst, snd;
Expand Down Expand Up @@ -447,7 +447,7 @@ bool TDiscoveryConverter::BuildFromLegacyName(const TString& rootPrefix, bool fo
return false);
}
if (Dc.empty() && !hasDcInName) {
CHECK_SET_VALID(!FstClass, TStringBuilder() << "Internal error: FirstClass mode enabled, but trying to parse Legacy-style name: "
CHECK_SET_VALID(!FstClass, TStringBuilder() << "Internal error: FirstClass mode enabled, but trying to parse Legacy-style name: "
<< OriginalTopic, return false;);
CHECK_SET_VALID(!LocalDc.empty(),
"Cannot determine DC: should specify either in topic name, Dc option or LocalDc option",
Expand All @@ -466,7 +466,7 @@ bool TDiscoveryConverter::BuildFromLegacyName(const TString& rootPrefix, bool fo
Dc = fst;
topic = snd;
} else {
CHECK_SET_VALID(!Dc.empty(), TStringBuilder() << "Internal error: Could not determine DC (despite beleiving the name contins one) for topic "
CHECK_SET_VALID(!Dc.empty(), TStringBuilder() << "Internal error: Could not determine DC (despite beleiving the name contins one) for topic "
<< OriginalTopic, return false;);
TStringBuilder builder;
builder << "rt3." << Dc << "--" << topic;
Expand Down Expand Up @@ -513,7 +513,7 @@ bool TDiscoveryConverter::BuildFromLegacyName(const TString& rootPrefix, bool fo
topicName = topic;
}
modernName << topicName;
CHECK_SET_VALID(!Dc.empty(), TStringBuilder() << "Internal error: Could not determine DC for topic: "
CHECK_SET_VALID(!Dc.empty(), TStringBuilder() << "Internal error: Could not determine DC for topic: "
<< OriginalTopic, return false);

bool isMirrored = (!LocalDc.empty() && Dc != LocalDc);
Expand All @@ -522,7 +522,7 @@ bool TDiscoveryConverter::BuildFromLegacyName(const TString& rootPrefix, bool fo
} else {
fullModernName << topicName;
}
CHECK_SET_VALID(!fullLegacyName.empty(), TStringBuilder() << "Could not form a full legacy name for topic: "
CHECK_SET_VALID(!fullLegacyName.empty(), TStringBuilder() << "Could not form a full legacy name for topic: "
<< OriginalTopic, return false);

ShortLegacyName = shortLegacyName;
Expand Down Expand Up @@ -669,21 +669,7 @@ TTopicConverterPtr TTopicNameConverter::ForFederation(
if (!res->IsValid()) {
return res;
}
if (parsed) {
Y_ABORT_UNLESS(!res->Dc.empty());
if (!localDc.empty() && localDc == res->Dc) {
res->Valid = false;
res->Reason = TStringBuilder() << "Topic in modern mirrored-like style: " << schemeName
<< " cannot be created in the same cluster " << res->Dc;
return res;
}
}
if (isLocal) {
if(parsed) {
res->Valid = false;
res->Reason = TStringBuilder() << "Topic in modern mirrored-like style: " << schemeName << ", created as local";
return res;
}
if (localDc.empty()) {
res->Valid = false;
res->Reason = "Local DC option is mandatory when creating local modern-style topic";
Expand All @@ -694,7 +680,8 @@ TTopicConverterPtr TTopicNameConverter::ForFederation(
if (!ok) {
return res;
}
} else {
}
else {
if (!parsed) {
res->Valid = false;
res->Reason = TStringBuilder() << "Topic in modern style with non-mirrored-name: " << schemeName
Expand Down Expand Up @@ -943,4 +930,3 @@ TConverterFactoryPtr TTopicsListController::GetConverterFactory() const {
};

} // namespace NPersQueue

Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ Y_UNIT_TEST_SUITE(DiscoveryConverterTest) {
);
UNIT_ASSERT_VALUES_EQUAL(converter->GetFullModernName(), "account/account");
UNIT_ASSERT_VALUES_EQUAL(converter->GetSecondaryPath("account"), "/account/account/account");

converter = converterFactory.MakeDiscoveryConverter(
"account/", {}, "dc1", "account"
);
Expand Down Expand Up @@ -431,10 +431,11 @@ Y_UNIT_TEST_SUITE(TopicNameConverterForCPTest) {
);
UNIT_ASSERT(!converter->IsValid());

// this is valid, corresponding check relaxed
converter = TTopicNameConverter::ForFederation(
"", "", "topic-mirrored-from-sas", "/LbCommunal/account", "/LbCommunal/account", true, "sas", "account"
);
UNIT_ASSERT(!converter->IsValid());
UNIT_ASSERT(converter->IsValid());

converter = TTopicNameConverter::ForFederation(
"", "", "topic-mirrored-from-", "/LbCommunal/account", "/LbCommunal/account", true, "sas", "account"
Expand Down
30 changes: 13 additions & 17 deletions ydb/services/persqueue_v1/actors/schema_actors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,6 @@ void TCreateTopicActor::FillProposeRequest(TEvTxUserProxy::TEvProposeTransaction
<< "' instead of " << LocalCluster, Ydb::PersQueue::ErrorCode::BAD_REQUEST));
return RespondWithCode(Ydb::StatusIds::BAD_REQUEST);
}
if (Count(Clusters, config.GetDC()) == 0 && !Clusters.empty()) {
Request_->RaiseIssue(FillIssue(TStringBuilder() << "Unknown cluster '" << config.GetDC() << "'", Ydb::PersQueue::ErrorCode::BAD_REQUEST));
return RespondWithCode(Ydb::StatusIds::BAD_REQUEST);
}
}


Expand Down Expand Up @@ -596,7 +592,7 @@ void TDescribeTopicActorImpl::Handle(TEvPQProxy::TEvRequestTablet::TPtr& ev, con
Y_ABORT_UNLESS(RequestsInfly > 0);
--RequestsInfly;
}

RequestTablet(tabletInfo, ctx);
}

Expand Down Expand Up @@ -635,7 +631,7 @@ void TDescribeTopicActorImpl::RequestBalancer(const TActorContext& ctx) {
GotLocation = true;
}

if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribeConsumer && Settings.RequireStats) {
if (Settings.Mode == TDescribeTopicActorSettings::EMode::DescribeConsumer && Settings.RequireStats) {
if (!GotReadSessions) {
RequestReadSessionsInfo(ctx);
}
Expand Down Expand Up @@ -671,7 +667,7 @@ void TDescribeTopicActorImpl::RequestPartitionsLocation(const TActorContext& ctx
return RaiseError(
TStringBuilder() << "No partition " << Settings.Partitions[0] << " in topic",
Ydb::PersQueue::ErrorCode::BAD_REQUEST, Ydb::StatusIds::BAD_REQUEST, ctx
);
);
}
auto res = partIds.insert(p);
if (res.second) {
Expand Down Expand Up @@ -705,7 +701,7 @@ void TDescribeTopicActorImpl::Handle(NKikimr::TEvPersQueue::TEvStatusResponse::T

auto& record = ev->Get()->Record;
bool doRestart = (record.PartResultSize() == 0);

for (auto& partResult : record.GetPartResult()) {
if (partResult.GetStatus() == NKikimrPQ::TStatusResponse::STATUS_INITIALIZING ||
partResult.GetStatus() == NKikimrPQ::TStatusResponse::STATUS_UNKNOWN) {
Expand Down Expand Up @@ -917,7 +913,7 @@ bool TDescribeTopicActor::ApplyResponse(
}
return true;
}



void TDescribeTopicActor::Reply(const TActorContext& ctx) {
Expand Down Expand Up @@ -1030,7 +1026,7 @@ bool TDescribeConsumerActor::ApplyResponse(
}
return true;
}


bool FillConsumerProto(Ydb::Topic::Consumer *rr, const NKikimrPQ::TPQTabletConfig& config, ui32 i,
const NActors::TActorContext& ctx, Ydb::StatusIds::StatusCode& status, TString& error)
Expand Down Expand Up @@ -1272,11 +1268,11 @@ bool TDescribeTopicActorImpl::ProcessTablets(
Tablets[pi.GetTabletId()].Partitions.push_back(pi.GetPartitionId());
Tablets[pi.GetTabletId()].TabletId = pi.GetTabletId();
}

for (auto& pair : Tablets) {
RequestTablet(pair.second, ctx);
}

if (RequestsInfly == 0) {
Reply(ctx);
return false;
Expand Down Expand Up @@ -1332,7 +1328,7 @@ void TDescribePartitionActor::Bootstrap(const NActors::TActorContext& ctx)

void TDescribePartitionActor::StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
default:
default:
if (!TDescribeTopicActorImpl::StateWork(ev, ActorContext())) {
TBase::StateWork(ev);
};
Expand All @@ -1359,12 +1355,12 @@ void TDescribePartitionActor::ApplyResponse(TTabletInfo&, NKikimr::TEvPersQueue:

void TDescribePartitionActor::ApplyResponse(TTabletInfo& tabletInfo, NKikimr::TEvPersQueue::TEvStatusResponse::TPtr& ev, const TActorContext&) {
auto* partResult = Result.mutable_partition();

const auto& record = ev->Get()->Record;
for (auto partData : record.GetPartResult()) {
if ((ui32)partData.GetPartition() != Settings.Partitions[0])
continue;

Y_ABORT_UNLESS((ui32)(partData.GetPartition()) == Settings.Partitions[0]);
partResult->set_partition_id(partData.GetPartition());
partResult->set_active(true);
Expand Down Expand Up @@ -1411,7 +1407,7 @@ void TDescribePartitionActor::Reply(const TActorContext& ctx) {

using namespace NIcNodeCache;

TPartitionsLocationActor::TPartitionsLocationActor(const TGetPartitionsLocationRequest& request, const TActorId& requester)
TPartitionsLocationActor::TPartitionsLocationActor(const TGetPartitionsLocationRequest& request, const TActorId& requester)
: TBase(request, requester)
, TDescribeTopicActorImpl(TDescribeTopicActorSettings::GetPartitionsLocation(request.PartitionIds))
{
Expand All @@ -1429,7 +1425,7 @@ void TPartitionsLocationActor::Bootstrap(const NActors::TActorContext&)
void TPartitionsLocationActor::StateWork(TAutoPtr<IEventHandle>& ev) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvICNodesInfoCache::TEvGetAllNodesInfoResponse, Handle);
default:
default:
if (!TDescribeTopicActorImpl::StateWork(ev, ActorContext())) {
TBase::StateWork(ev);
};
Expand Down
6 changes: 4 additions & 2 deletions ydb/services/persqueue_v1/persqueue_compat_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,15 @@ Y_UNIT_TEST_SUITE(TPQCompatTest) {
// Local topic with client write disabled
testServer.CreateTopic("/Root/LbCommunal/some-topic", "account", false, true);
// Non-local topic with client write enabled
testServer.CreateTopic("/Root/LbCommunal/some-topic-mirrored-from-dc2", "account", true, true);
testServer.CreateTopic("/Root/LbCommunal/.some-topic/mirrored-from-dc2", "account", true, true);
// this validation was relaxed
testServer.CreateTopic("/Root/LbCommunal/some-topic-mirrored-from-dc2", "account", true, false);
// No account
testServer.CreateTopic("/Root/LbCommunal/some-topic", "", true, true);
// Mirrored-from local
testServer.CreateTopic("/Root/LbCommunal/.some-topic/mirrored-from-dc1", "account", false, true);
testServer.CreateTopic("/Root/LbCommunal/some-topic-mirrored-from-dc1", "account", false, true);
// this validation was relaxed
testServer.CreateTopic("/Root/LbCommunal/some-topic-mirrored-from-dc1", "account", false, false);
// Bad mirrored names
testServer.CreateTopic("/Root/LbCommunal/.some-topic/some-topic", "account", false, true);
// Mirrored-from non-existing
Expand Down

0 comments on commit fd2423d

Please sign in to comment.