Skip to content

Commit

Permalink
Merge e144c69 into c9a38da
Browse files Browse the repository at this point in the history
  • Loading branch information
FloatingCrowbar authored Mar 5, 2024
2 parents c9a38da + e144c69 commit 46da84a
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 28 deletions.
25 changes: 10 additions & 15 deletions ydb/core/persqueue/writer/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace NKikimr::NPQ {
#define INFO(message) LOG_INFO_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);
#define ERROR(message) LOG_ERROR_S(*NActors::TlsActivationContext, NKikimrServices::PQ_WRITE_PROXY, LOG_PREFIX << message);

static const ui64 WRITE_BLOCK_SIZE = 4_KB;
static const ui64 WRITE_BLOCK_SIZE = 4_KB;

TString TEvPartitionWriter::TEvInitResult::TSuccess::ToString() const {
auto out = TStringBuilder() << "Success {"
Expand Down Expand Up @@ -106,7 +106,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
using EErrorCode = TEvPartitionWriter::TEvWriteResponse::EErrorCode;

static constexpr size_t MAX_QUOTA_INFLIGHT = 3;

static void FillHeader(NKikimrClient::TPersQueuePartitionRequest& request,
ui32 partitionId, const TActorId& pipeClient)
{
Expand Down Expand Up @@ -272,12 +272,9 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
void GetOwnership() {
auto ev = MakeRequest(PartitionId, PipeClient);

auto& cmd = *ev->Record.MutablePartitionRequest()->MutableCmdGetOwnership();
if (Opts.UseDeduplication) {
cmd.SetOwner(SourceId);
} else {
cmd.SetOwner(CreateGuidAsString());
}
auto& request = *ev->Record.MutablePartitionRequest();
auto& cmd = *request.MutableCmdGetOwnership();
cmd.SetOwner(SourceId);
cmd.SetForce(true);

NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
Expand Down Expand Up @@ -720,17 +717,15 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
case EWakeupTag::RlAllowed:
ReceivedQuota.insert(ReceivedQuota.end(), PendingQuota.begin(), PendingQuota.end());
PendingQuota.clear();

ProcessQuotaAndWrite();

ProcessQuotaAndWrite();
break;

case EWakeupTag::RlNoResource:
// Re-requesting the quota. We do this until we get a quota.
// Re-requesting the quota. We do this until we get a quota.
// We do not request a quota with a long waiting time because the writer may already be a destroyer, and the quota will still be waiting to be received.
RequestDataQuota(PendingQuotaAmount, ctx);
break;

default:
Y_VERIFY_DEBUG_S(false, "Unsupported tag: " << static_cast<ui64>(tag));
}
Expand All @@ -751,7 +746,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
, TabletId(tabletId)
, PartitionId(partitionId)
, ExpectedGeneration(opts.ExpectedGeneration)
, SourceId(opts.SourceId)
, SourceId(opts.UseDeduplication ? opts.SourceId : CreateGuidAsString())
, Opts(opts)
{
if (Opts.MeteringMode) {
Expand Down Expand Up @@ -837,7 +832,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
IActor* CreatePartitionWriter(const TActorId& client,
// const NKikimrSchemeOp::TPersQueueGroupDescription& config,
ui64 tabletId,
ui32 partitionId,
ui32 partitionId,
const TPartitionWriterOpts& opts) {
return new TPartitionWriter(client, tabletId, partitionId, opts);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,6 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
};

THandleResult OnErrorImpl(NYdb::TPlainStatus&& status); // true - should Start(), false - should Close(), empty - no action

public:
TWriteSessionImpl(const TWriteSessionSettings& settings,
std::shared_ptr<TTopicClient::TImpl> client,
Expand Down
26 changes: 23 additions & 3 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/basic_usage_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -697,9 +697,29 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
UNIT_ASSERT_VALUES_EQUAL(stats->GetEndOffset(), count);

}
} // Y_UNIT_TEST_SUITE(BasicUsage)

Y_UNIT_TEST_SUITE(TSettingsValidation) {
Y_UNIT_TEST(TWriteSessionProducerSettings) {
TTopicSdkTestSetup setup(TEST_CASE_NAME);
TTopicClient client = setup.MakeClient();

{
auto writeSettings = TWriteSessionSettings()
.Path(TEST_TOPIC)
.ProducerId("something")
.DeduplicationEnabled(false);
try {
auto writeSession = client.CreateWriteSession(writeSettings);
auto event = writeSession->GetEvent(true);
UNIT_ASSERT(event.Defined());
auto* closedEvent = std::get_if<TSessionClosedEvent>(&event.GetRef());
UNIT_ASSERT(closedEvent);
} catch (NYdb::TContractViolation&) {
//pass
}
}
}
} // Y_UNIT_TEST_SUITE(TSettingsValidation)

}

}
} // namespace
24 changes: 15 additions & 9 deletions ydb/services/persqueue_v1/actors/write_session_actor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,12 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
// 1.2. non-empty partition_id (explicit partitioning)
// 1.3. non-empty partition_with_generation (explicit partitioning && direct write to partition host)
// 2. Empty producer id (no deduplication, partition is selected using round-robin).
bool isScenarioSupported =
bool isScenarioSupported =
!InitRequest.producer_id().empty() && (
InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() ||
InitRequest.has_message_group_id() && InitRequest.message_group_id() == InitRequest.producer_id() ||
InitRequest.has_partition_id() ||
InitRequest.has_partition_with_generation()) ||
InitRequest.has_partition_with_generation())
||
InitRequest.producer_id().empty();

if (!isScenarioSupported) {
Expand Down Expand Up @@ -424,7 +425,6 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWriteInit::TPt
return InitRequest.has_message_group_id() ? InitRequest.message_group_id() : InitRequest.producer_id();
}
}();

LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << " " << InitRequest.ShortDebugString() << " from " << PeerName);
if (!UseDeduplication) {
LOG_DEBUG_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session request cookie: " << Cookie << ". Disable deduplication for empty producer id");
Expand Down Expand Up @@ -467,8 +467,9 @@ template<bool UseMigrationProtocol>
void TWriteSessionActor<UseMigrationProtocol>::InitAfterDiscovery(const TActorContext& ctx) {
Y_UNUSED(ctx);

if (SourceId.empty()) {
Y_ABORT_UNLESS(!UseDeduplication);
if (SourceId.empty() && UseDeduplication) {
CloseSession("Internal server error: got empty SourceId with enabled deduplication", PersQueue::ErrorCode::ERROR, ctx);
return;
}

InitMeta = GetInitialDataChunk(InitRequest, FullConverter->GetClientsideName(), PeerName); // ToDo[migration] - check?
Expand Down Expand Up @@ -835,9 +836,14 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
OwnerCookie = result.GetResult().OwnerCookie;

const auto& maxSeqNo = result.GetResult().SourceIdInfo.GetSeqNo();
if (!UseDeduplication) {
Y_ABORT_UNLESS(maxSeqNo == 0);
}

// ToDo: uncomment after fixing KIKIMR-21124
// if (!UseDeduplication) {
// if (maxSeqNo != 0) {
// return CloseSession("Internal server error: have maxSeqNo != with deduplication disabled",
// PersQueue::ErrorCode::ERROR, ctx);
// }
// }

OwnerCookie = result.GetResult().OwnerCookie;
MakeAndSentInitResponse(maxSeqNo, ctx);
Expand Down
49 changes: 49 additions & 0 deletions ydb/services/persqueue_v1/persqueue_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6694,6 +6694,55 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
}
}

Y_UNIT_TEST(DisableWrongSettings) {
NPersQueue::TTestServer server;
server.EnableLogs({NKikimrServices::PQ_READ_PROXY, NKikimrServices::BLACKBOX_VALIDATOR });
server.EnableLogs({NKikimrServices::PERSQUEUE}, NActors::NLog::EPriority::PRI_INFO);
TString topicFullName = "rt3.dc1--acc--topic1";
auto driver = SetupTestAndGetDriver(server, topicFullName, 3);

std::shared_ptr<grpc::Channel> Channel_;
std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> TopicStubP_;
{
Channel_ = grpc::CreateChannel("localhost:" + ToString(server.GrpcPort), grpc::InsecureChannelCredentials());
TopicStubP_ = Ydb::Topic::V1::TopicService::NewStub(Channel_);
}

{
grpc::ClientContext rcontext1;
auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1);
UNIT_ASSERT(writeStream1);
Ydb::Topic::StreamWriteMessage::FromClient req;
Ydb::Topic::StreamWriteMessage::FromServer resp;

req.mutable_init_request()->set_path("acc/topic1");
req.mutable_init_request()->set_message_group_id("some-group");
if (!writeStream1->Write(req)) {
ythrow yexception() << "write fail";
}
UNIT_ASSERT(writeStream1->Read(&resp));
Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
UNIT_ASSERT(resp.status() == Ydb::StatusIds::SUCCESS);
}
{
grpc::ClientContext rcontext1;
auto writeStream1 = TopicStubP_->StreamWrite(&rcontext1);
UNIT_ASSERT(writeStream1);
Ydb::Topic::StreamWriteMessage::FromClient req;
Ydb::Topic::StreamWriteMessage::FromServer resp;

req.mutable_init_request()->set_path("acc/topic1");
req.mutable_init_request()->set_message_group_id("some-group");
req.mutable_init_request()->set_producer_id("producer");
if (!writeStream1->Write(req)) {
ythrow yexception() << "write fail";
}
UNIT_ASSERT(writeStream1->Read(&resp));
Cerr << "===Got response: " << resp.ShortDebugString() << Endl;
UNIT_ASSERT(resp.status() == Ydb::StatusIds::BAD_REQUEST);
}
}

Y_UNIT_TEST(DisableDeduplication) {
NPersQueue::TTestServer server;
TString topicFullName = "rt3.dc1--topic1";
Expand Down

0 comments on commit 46da84a

Please sign in to comment.