Skip to content

Commit

Permalink
TFederatedTopicWriteSession: use SelfContext instead of shared_ptr (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq authored Apr 2, 2024
1 parent 9b8ca15 commit 77c6779
Show file tree
Hide file tree
Showing 3 changed files with 210 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ bool DatabasesAreSame(std::shared_ptr<TDbInfo> lhs, std::shared_ptr<TDbInfo> rhs

NTopic::TTopicClientSettings FromFederated(const TFederatedTopicClientSettings& settings);

TFederatedWriteSession::TFederatedWriteSession(const TFederatedWriteSessionSettings& settings,
std::shared_ptr<TGRpcConnectionsImpl> connections,
const TFederatedTopicClientSettings& clientSetttings,
std::shared_ptr<TFederatedDbObserver> observer,
std::shared_ptr<std::unordered_map<NTopic::ECodec, THolder<NTopic::ICodec>>> codecs)
TFederatedWriteSessionImpl::TFederatedWriteSessionImpl(
const TFederatedWriteSessionSettings& settings,
std::shared_ptr<TGRpcConnectionsImpl> connections,
const TFederatedTopicClientSettings& clientSetttings,
std::shared_ptr<TFederatedDbObserver> observer,
std::shared_ptr<std::unordered_map<NTopic::ECodec, THolder<NTopic::ICodec>>> codecs
)
: Settings(settings)
, Connections(std::move(connections))
, SubClientSetttings(FromFederated(clientSetttings))
Expand All @@ -42,28 +44,32 @@ TFederatedWriteSession::TFederatedWriteSession(const TFederatedWriteSessionSetti
{
}

TStringBuilder TFederatedWriteSession::GetLogPrefix() const {
TStringBuilder TFederatedWriteSessionImpl::GetLogPrefix() const {
return TStringBuilder() << GetDatabaseLogPrefix(SubClientSetttings.Database_.GetOrElse("")) << "[" << SessionId << "] ";
}

void TFederatedWriteSession::Start() {
void TFederatedWriteSessionImpl::Start() {
// TODO validate settings?
Settings.EventHandlers_.HandlersExecutor_->Start();
with_lock(Lock) {
ClientEventsQueue->PushEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
ClientHasToken = true;
}

AsyncInit.Subscribe([self = shared_from_this()](const auto& f){
AsyncInit.Subscribe([selfCtx = SelfContext](const auto& f){
Y_UNUSED(f);
with_lock(self->Lock) {
self->FederationState = self->Observer->GetState();
self->OnFederatedStateUpdateImpl();
if (auto self = selfCtx->LockShared()) {
with_lock(self->Lock) {
if (!self->Closing) {
self->FederationState = self->Observer->GetState();
self->OnFederatedStateUpdateImpl();
}
}
}
});
}

void TFederatedWriteSession::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db) {
void TFederatedWriteSessionImpl::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db) {
if (Subsession) {
PendingToken.Clear();
Subsession->Close(TDuration::Zero());
Expand All @@ -77,32 +83,40 @@ void TFederatedWriteSession::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db) {

auto handlers = NTopic::TWriteSessionSettings::TEventHandlers()
.HandlersExecutor(Settings.EventHandlers_.HandlersExecutor_)
.ReadyToAcceptHandler([self = shared_from_this()](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& ev){
TDeferredWrite deferred(self->Subsession);
with_lock(self->Lock) {
Y_ABORT_UNLESS(self->PendingToken.Empty());
self->PendingToken = std::move(ev.ContinuationToken);
self->PrepareDeferredWrite(deferred);
.ReadyToAcceptHandler([selfCtx = SelfContext](NTopic::TWriteSessionEvent::TReadyToAcceptEvent& ev) {
if (auto self = selfCtx->LockShared()) {
TDeferredWrite deferred(self->Subsession);
with_lock(self->Lock) {
Y_ABORT_UNLESS(self->PendingToken.Empty());
self->PendingToken = std::move(ev.ContinuationToken);
self->PrepareDeferredWrite(deferred);
}
deferred.DoWrite();
}
deferred.DoWrite();
})
.AcksHandler([self = shared_from_this()](NTopic::TWriteSessionEvent::TAcksEvent& ev){
with_lock(self->Lock) {
Y_ABORT_UNLESS(ev.Acks.size() <= self->OriginalMessagesToGetAck.size());
for (size_t i = 0; i < ev.Acks.size(); ++i) {
self->BufferFreeSpace += self->OriginalMessagesToGetAck.front().Data.size();
self->OriginalMessagesToGetAck.pop_front();
}
self->ClientEventsQueue->PushEvent(std::move(ev));
if (self->BufferFreeSpace > 0 && !self->ClientHasToken) {
self->ClientEventsQueue->PushEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
self->ClientHasToken = true;
.AcksHandler([selfCtx = SelfContext](NTopic::TWriteSessionEvent::TAcksEvent& ev) {
if (auto self = selfCtx->LockShared()) {
with_lock(self->Lock) {
Y_ABORT_UNLESS(ev.Acks.size() <= self->OriginalMessagesToGetAck.size());
for (size_t i = 0; i < ev.Acks.size(); ++i) {
self->BufferFreeSpace += self->OriginalMessagesToGetAck.front().Data.size();
self->OriginalMessagesToGetAck.pop_front();
}
self->ClientEventsQueue->PushEvent(std::move(ev));
if (self->BufferFreeSpace > 0 && !self->ClientHasToken) {
self->ClientEventsQueue->PushEvent(NTopic::TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
self->ClientHasToken = true;
}
}
}
})
.SessionClosedHandler([self = shared_from_this()](const NTopic::TSessionClosedEvent & ev){
with_lock(self->Lock) {
self->ClientEventsQueue->PushEvent(ev);
.SessionClosedHandler([selfCtx = SelfContext](const NTopic::TSessionClosedEvent & ev) {
if (auto self = selfCtx->LockShared()) {
with_lock(self->Lock) {
if (!self->Closing) {
self->CloseImpl(ev);
}
}
}
});

Expand All @@ -115,7 +129,7 @@ void TFederatedWriteSession::OpenSubSessionImpl(std::shared_ptr<TDbInfo> db) {
CurrentDatabase = db;
}

std::shared_ptr<TDbInfo> TFederatedWriteSession::SelectDatabaseImpl() {
std::shared_ptr<TDbInfo> TFederatedWriteSessionImpl::SelectDatabaseImpl() {
std::vector<std::shared_ptr<TDbInfo>> availableDatabases;
ui64 totalWeight = 0;

Expand Down Expand Up @@ -159,7 +173,7 @@ std::shared_ptr<TDbInfo> TFederatedWriteSession::SelectDatabaseImpl() {
Y_UNREACHABLE();
}

void TFederatedWriteSession::OnFederatedStateUpdateImpl() {
void TFederatedWriteSessionImpl::OnFederatedStateUpdateImpl() {
if (!FederationState->Status.IsSuccess()) {
CloseImpl(FederationState->Status.GetStatus(), NYql::TIssues(FederationState->Status.GetIssues()));
return;
Expand All @@ -186,16 +200,18 @@ void TFederatedWriteSession::OnFederatedStateUpdateImpl() {
ScheduleFederatedStateUpdateImpl(UPDATE_FEDERATION_STATE_DELAY);
}

void TFederatedWriteSession::ScheduleFederatedStateUpdateImpl(TDuration delay) {
void TFederatedWriteSessionImpl::ScheduleFederatedStateUpdateImpl(TDuration delay) {
Y_ABORT_UNLESS(Lock.IsLocked());
auto cb = [self = shared_from_this()](bool ok) {
auto cb = [selfCtx = SelfContext](bool ok) {
if (ok) {
with_lock(self->Lock) {
if (self->Closing) {
return;
if (auto self = selfCtx->LockShared()) {
with_lock(self->Lock) {
if (self->Closing) {
return;
}
self->FederationState = self->Observer->GetState();
self->OnFederatedStateUpdateImpl();
}
self->FederationState = self->Observer->GetState();
self->OnFederatedStateUpdateImpl();
}
}
};
Expand All @@ -211,24 +227,24 @@ void TFederatedWriteSession::ScheduleFederatedStateUpdateImpl(TDuration delay) {
UpdateStateDelayContext);
}

NThreading::TFuture<void> TFederatedWriteSession::WaitEvent() {
NThreading::TFuture<void> TFederatedWriteSessionImpl::WaitEvent() {
return ClientEventsQueue->WaitEvent();
}

TVector<NTopic::TWriteSessionEvent::TEvent> TFederatedWriteSession::GetEvents(bool block, TMaybe<size_t> maxEventsCount) {
TVector<NTopic::TWriteSessionEvent::TEvent> TFederatedWriteSessionImpl::GetEvents(bool block, TMaybe<size_t> maxEventsCount) {
return ClientEventsQueue->GetEvents(block, maxEventsCount);
}

TMaybe<NTopic::TWriteSessionEvent::TEvent> TFederatedWriteSession::GetEvent(bool block) {
TMaybe<NTopic::TWriteSessionEvent::TEvent> TFederatedWriteSessionImpl::GetEvent(bool block) {
auto events = GetEvents(block, 1);
return events.empty() ? Nothing() : TMaybe<NTopic::TWriteSessionEvent::TEvent>{std::move(events.front())};
}

NThreading::TFuture<ui64> TFederatedWriteSession::GetInitSeqNo() {
NThreading::TFuture<ui64> TFederatedWriteSessionImpl::GetInitSeqNo() {
return NThreading::MakeFuture<ui64>(0u);
}

void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo,
void TFederatedWriteSessionImpl::Write(NTopic::TContinuationToken&& token, TStringBuf data, TMaybe<ui64> seqNo,
TMaybe<TInstant> createTimestamp) {
NTopic::TWriteMessage message{std::move(data)};
if (seqNo.Defined())
Expand All @@ -238,11 +254,11 @@ void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, TStringBu
return WriteInternal(std::move(token), std::move(message));
}

void TFederatedWriteSession::Write(NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
void TFederatedWriteSessionImpl::Write(NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
return WriteInternal(std::move(token), TWrappedWriteMessage(std::move(message)));
}

void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, TStringBuf data, NTopic::ECodec codec,
void TFederatedWriteSessionImpl::WriteEncoded(NTopic::TContinuationToken&& token, TStringBuf data, NTopic::ECodec codec,
ui32 originalSize, TMaybe<ui64> seqNo, TMaybe<TInstant> createTimestamp) {
auto message = NTopic::TWriteMessage::CompressedMessage(std::move(data), codec, originalSize);
if (seqNo.Defined())
Expand All @@ -252,11 +268,11 @@ void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, TS
return WriteInternal(std::move(token), TWrappedWriteMessage(std::move(message)));
}

void TFederatedWriteSession::WriteEncoded(NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
void TFederatedWriteSessionImpl::WriteEncoded(NTopic::TContinuationToken&& token, NTopic::TWriteMessage&& message) {
return WriteInternal(std::move(token), TWrappedWriteMessage(std::move(message)));
}

void TFederatedWriteSession::WriteInternal(NTopic::TContinuationToken&&, TWrappedWriteMessage&& wrapped) {
void TFederatedWriteSessionImpl::WriteInternal(NTopic::TContinuationToken&&, TWrappedWriteMessage&& wrapped) {
ClientHasToken = false;
if (!wrapped.Message.CreateTimestamp_.Defined()) {
wrapped.Message.CreateTimestamp_ = TInstant::Now();
Expand All @@ -278,7 +294,7 @@ void TFederatedWriteSession::WriteInternal(NTopic::TContinuationToken&&, TWrappe
}
}

bool TFederatedWriteSession::PrepareDeferredWrite(TDeferredWrite& deferred) {
bool TFederatedWriteSessionImpl::PrepareDeferredWrite(TDeferredWrite& deferred) {
if (PendingToken.Empty()) {
return false;
}
Expand All @@ -293,17 +309,25 @@ bool TFederatedWriteSession::PrepareDeferredWrite(TDeferredWrite& deferred) {
return true;
}

void TFederatedWriteSession::CloseImpl(EStatus statusCode, NYql::TIssues&& issues) {
void TFederatedWriteSessionImpl::CloseImpl(EStatus statusCode, NYql::TIssues&& issues, TDuration timeout) {
CloseImpl(TPlainStatus(statusCode, std::move(issues)), timeout);
}

void TFederatedWriteSessionImpl::CloseImpl(NTopic::TSessionClosedEvent const& ev, TDuration timeout) {
if (Closing) {
return;
}
Closing = true;
if (Subsession) {
Subsession->Close(TDuration::Zero());
Subsession->Close(timeout);
}
ClientEventsQueue->Close(TSessionClosedEvent(statusCode, std::move(issues)));
ClientEventsQueue->Close(ev);
NTopic::Cancel(UpdateStateDelayContext);
}

bool TFederatedWriteSession::Close(TDuration timeout) {
if (Subsession) {
return Subsession->Close(timeout);
bool TFederatedWriteSessionImpl::Close(TDuration timeout) {
with_lock (Lock) {
CloseImpl(EStatus::SUCCESS, {}, timeout);
}
return true;
}
Expand Down
Loading

0 comments on commit 77c6779

Please sign in to comment.