Skip to content

Commit

Permalink
Fallback to "indirect" write if the server doesn't support direct write
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq committed Jul 16, 2024
1 parent 3de9c00 commit 3afab8f
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
28 changes: 22 additions & 6 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ void TWriteSessionImpl::Start(const TDuration& delay) {
with_lock (Lock) {
++ConnectionAttemptsDone;
Started = true;
if (Settings.DirectWriteToPartition_ && (Settings.PartitionId_.Defined() || DirectWriteToPartitionId.Defined())) {
if (DirectWriteEnabledImpl() && (Settings.PartitionId_.Defined() || DirectWriteToPartitionId.Defined())) {
PreferredPartitionLocation = {};
ConnectToPreferredPartitionLocation(delay);
return;
Expand Down Expand Up @@ -214,7 +214,7 @@ TString FullTopicPath(TStringBuf dbPath, TStringBuf topic) {
void TWriteSessionImpl::ConnectToPreferredPartitionLocation(const TDuration& delay)
{
Y_ABORT_UNLESS(Lock.IsLocked());
Y_ABORT_UNLESS(Settings.DirectWriteToPartition_ && (Settings.PartitionId_.Defined() || DirectWriteToPartitionId.Defined()));
Y_ABORT_UNLESS(DirectWriteEnabledImpl() && (Settings.PartitionId_.Defined() || DirectWriteToPartitionId.Defined()));

if (AtomicGet(Aborting)) {
return;
Expand Down Expand Up @@ -292,6 +292,12 @@ void TWriteSessionImpl::OnDescribePartition(const TStatus& status, const Ydb::To
}

if (!status.IsSuccess()) {
if (status.GetStatus() == EStatus::CLIENT_CALL_UNIMPLEMENTED) {
LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "The server does not support direct write, connect to an arbitrary node");
DisableDirectWrite();
Connect(TDuration::Zero());
return;
}
with_lock (Lock) {
handleResult = OnErrorImpl({status.GetStatus(), MakeIssueWithSubIssues("Failed to get partition location", status.GetIssues())});
}
Expand Down Expand Up @@ -711,7 +717,7 @@ void TWriteSessionImpl::InitImpl() {
init->set_path(Settings.Path_);
init->set_producer_id(Settings.ProducerId_);

if (Settings.DirectWriteToPartition_ && (Settings.PartitionId_.Defined() || DirectWriteToPartitionId.Defined())) {
if (DirectWriteEnabledImpl() && (Settings.PartitionId_.Defined() || DirectWriteToPartitionId.Defined())) {
auto partition_id = Settings.PartitionId_.Defined() ? *Settings.PartitionId_ : *DirectWriteToPartitionId;
auto* p = init->mutable_partition_with_generation();
p->set_partition_id(partition_id);
Expand Down Expand Up @@ -859,7 +865,7 @@ TStringBuilder TWriteSessionImpl::LogPrefix() const {
if (Settings.PartitionId_.Defined() || DirectWriteToPartitionId.Defined()) {
auto partition_id = Settings.PartitionId_.Defined() ? *Settings.PartitionId_ : *DirectWriteToPartitionId;
ret << " PartitionId [" << partition_id << "] ";
if (Settings.DirectWriteToPartition_) {
if (DirectWriteEnabledImpl()) {
ret << " Generation [" << PreferredPartitionLocation.Generation << "] ";
}
} else {
Expand Down Expand Up @@ -918,8 +924,8 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
SessionId = initResponse.session_id();

auto prevDirectWriteToPartitionId = DirectWriteToPartitionId;
if (Settings.DirectWriteToPartition_ && !Settings.PartitionId_.Defined()) {
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: set DirectWriteToPartitionId " << initResponse.partition_id());
if (DirectWriteEnabledImpl() && !Settings.PartitionId_.Defined()) {
DirectWriteToPartitionId = initResponse.partition_id();
}
PartitionId = initResponse.partition_id();
Expand All @@ -935,7 +941,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess

OnErrorResolved();

if (Settings.DirectWriteToPartition_ && DirectWriteToPartitionId.Defined() && !prevDirectWriteToPartitionId.Defined()) {
if (DirectWriteEnabledImpl() && DirectWriteToPartitionId.Defined() && !prevDirectWriteToPartitionId.Defined()) {
result.HandleResult.DoRestart = true;
result.HandleResult.StartDelay = TDuration::Zero();
break;
Expand Down Expand Up @@ -1335,6 +1341,16 @@ bool TWriteSessionImpl::TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRe
return GetTransactionId(*writeRequest) != GetTransactionId(OriginalMessagesToSend.front().Tx);
}

void TWriteSessionImpl::DisableDirectWrite() {
with_lock (Lock) {
DirectWriteServerSupport = false;
}
}

bool TWriteSessionImpl::DirectWriteEnabledImpl() const {
return Settings.DirectWriteToPartition_ && DirectWriteServerSupport;
}

void TWriteSessionImpl::SendImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());

Expand Down
4 changes: 4 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,

bool TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRequest* writeRequest) const;

void DisableDirectWrite();
bool DirectWriteEnabledImpl() const;

private:
TWriteSessionSettings Settings;
std::shared_ptr<TTopicClient::TImpl> Client;
Expand Down Expand Up @@ -470,6 +473,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,

// Set by the write session, if Settings.DirectWriteToPartition is true and Settings.PartitionId is unset. Otherwise ignored.
TMaybe<ui64> DirectWriteToPartitionId;
bool DirectWriteServerSupport = true;
protected:
ui64 MessagesAcquired = 0;
};
Expand Down

0 comments on commit 3afab8f

Please sign in to comment.