Skip to content

Commit

Permalink
Reuse the same session if the server doesn't support direct write
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq committed Jul 16, 2024
1 parent 3afab8f commit 1d2e75b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 12 deletions.
43 changes: 31 additions & 12 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,11 @@ void TWriteSessionImpl::OnDescribePartition(const TStatus& status, const Ydb::To

if (!status.IsSuccess()) {
if (status.GetStatus() == EStatus::CLIENT_CALL_UNIMPLEMENTED) {
LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "The server does not support direct write, connect to an arbitrary node");
LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "The server does not support direct write, keep using the same session");
DisableDirectWrite();
Connect(TDuration::Zero());
if (auto readyToAccept = CompleteInit()) {
EventsQueue->PushEvent(std::move(*readyToAccept));
}
return;
}
with_lock (Lock) {
Expand Down Expand Up @@ -901,6 +903,29 @@ void TPrintable<TWriteSessionEvent::TReadyToAcceptEvent>::DebugString(TStringBui
res << "ReadyToAcceptEvent";
}

TMaybe<TWriteSessionEvent::TReadyToAcceptEvent> TWriteSessionImpl::CompleteInit() {
with_lock (Lock) {
return CompleteInitImpl();
}
}

TMaybe<TWriteSessionEvent::TReadyToAcceptEvent> TWriteSessionImpl::CompleteInitImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());

SessionEstablished = true;
LastCountersUpdateTs = TInstant::Now();
SessionStartedTs = TInstant::Now();

// Kickstart send after session reestablishment
SendImpl();

if (FirstTokenSent) {
return Nothing();
}
FirstTokenSent = true;
return TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()};
}

TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMessageImpl() {
Y_ABORT_UNLESS(Lock.IsLocked());

Expand All @@ -924,7 +949,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
SessionId = initResponse.session_id();

auto prevDirectWriteToPartitionId = DirectWriteToPartitionId;
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: set DirectWriteToPartitionId " << initResponse.partition_id());
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: set DirectWriteToPartitionId " << initResponse.partition_id());
if (DirectWriteEnabledImpl() && !Settings.PartitionId_.Defined()) {
DirectWriteToPartitionId = initResponse.partition_id();
}
Expand All @@ -947,16 +972,10 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
break;
}

SessionEstablished = true;
LastCountersUpdateTs = TInstant::Now();
SessionStartedTs = TInstant::Now();

if (!FirstTokenSent) {
result.Events.emplace_back(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
FirstTokenSent = true;
if (auto readyToAccept = CompleteInitImpl()) {
result.Events.push_back(*readyToAccept);
}
// Kickstart send after session reestablishment
SendImpl();

break;
}
case TServerMessage::kWriteResponse: {
Expand Down
2 changes: 2 additions & 0 deletions ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,8 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
THandleResult RestartImpl(const TPlainStatus& status);
void Connect(const TDuration& delay);
void InitImpl();
TMaybe<TWriteSessionEvent::TReadyToAcceptEvent> CompleteInitImpl();
TMaybe<TWriteSessionEvent::TReadyToAcceptEvent> CompleteInit();
void ReadFromProcessor(); // Assumes that we're under lock.
void WriteToProcessorImpl(TClientMessage&& req); // Assumes that we're under lock.
void OnReadDone(NYdbGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration);
Expand Down

0 comments on commit 1d2e75b

Please sign in to comment.