From 3ab7dd796b22380f98b56f8c7fdb1cdce512cec3 Mon Sep 17 00:00:00 2001 From: Daniil Cherednik Date: Tue, 15 Oct 2024 17:05:06 +0200 Subject: [PATCH] Remove session from pool in case of BAD_SESSION... (#10437) --- .../ydb_internal/session_pool/session_pool.h | 5 +- ydb/public/sdk/cpp/client/ydb_query/client.h | 1 + .../cpp/client/ydb_query/impl/exec_query.cpp | 16 +- .../ydb/sdk_sessions_ut/sdk_sessions_ut.cpp | 150 ++++++++++++++++++ ydb/services/ydb/sdk_sessions_ut/ya.make | 1 + 5 files changed, 171 insertions(+), 2 deletions(-) diff --git a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h index be5129918104..6e98e1ded7b7 100644 --- a/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h +++ b/ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h @@ -52,7 +52,9 @@ NThreading::TFuture InjectSessionStatusInterception( // Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect // since we have guarantee this request wasn't been started to execute. - if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) { + if (status.IsTransportError() + && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED && status.GetStatus() != EStatus::CLIENT_OUT_OF_RANGE) + { impl->MarkBroken(); } else if (status.GetStatus() == EStatus::SESSION_BUSY) { impl->MarkBroken(); @@ -71,6 +73,7 @@ NThreading::TFuture InjectSessionStatusInterception( impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TKqpSessionCommon::EState::S_ACTIVE); } } + if (cb) { cb(value, *impl); } diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.h b/ydb/public/sdk/cpp/client/ydb_query/client.h index 32357382fb52..a459c4b3981a 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.h +++ b/ydb/public/sdk/cpp/client/ydb_query/client.h @@ -133,6 +133,7 @@ class TTransaction; class TSession { friend class TQueryClient; friend class TTransaction; + friend class TExecuteQueryIterator; public: const TString& GetId() const; diff --git a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp index 3cd62c29c097..c575c508c6ad 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp @@ -1,9 +1,11 @@ #define INCLUDE_YDB_INTERNAL_H #include "exec_query.h" +#include "client_session.h" #include #include #include +#include #include #undef INCLUDE_YDB_INTERNAL_H @@ -59,7 +61,7 @@ class TExecuteQueryIterator::TReaderImpl { return Finished_; } - TAsyncExecuteQueryPart ReadNext(std::shared_ptr self) { + TAsyncExecuteQueryPart DoReadNext(std::shared_ptr self) { auto promise = NThreading::NewPromise(); // Capture self - guarantee no dtor call during the read auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable { @@ -100,6 +102,18 @@ class TExecuteQueryIterator::TReaderImpl { StreamProcessor_->Read(&Response_, readCb); return promise.GetFuture(); } + + TAsyncExecuteQueryPart ReadNext(std::shared_ptr self) { + if (!Session_) + return DoReadNext(std::move(self)); + + return NSessionPool::InjectSessionStatusInterception( + Session_->SessionImpl_, + DoReadNext(std::move(self)), + false, // no need to ping stream session + TDuration::Zero()); + } + private: TStreamProcessorPtr StreamProcessor_; TResponse Response_; diff --git a/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp b/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp index aa83d0cf8fbd..3cb04441ae9f 100644 --- a/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp +++ b/ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp @@ -4,9 +4,57 @@ #include +#include + using namespace NYdb; using namespace NYdb::NTable; +namespace { + +void CreateTestTable(NYdb::TDriver& driver) { + NYdb::NTable::TTableClient client(driver); + auto sessionResponse = client.GetSession().ExtractValueSync(); + UNIT_ASSERT(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + auto result = session.ExecuteSchemeQuery(R"___( + CREATE TABLE `Root/Test` ( + Key Uint32, + Value String, + PRIMARY KEY (Key) + ); + )___").ExtractValueSync(); + UNIT_ASSERT(result.IsSuccess()); + UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1); +} + +TString WarmPoolCreateSession(NYdb::NQuery::TQueryClient& client) { + TString sessionId; + auto sessionResponse = client.GetSession().ExtractValueSync(); + UNIT_ASSERT(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + sessionId = session.GetId(); + auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); + + TResultSetParser resultSet(res.GetResultSetParser(0)); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2); + + return sessionId; +} + +void WaitForSessionsInPool(NYdb::NQuery::TQueryClient& client, i64 expected) { + int attempt = 10; + while (attempt--) { + if (client.GetCurrentPoolSize() == expected) + break; + Sleep(TDuration::MilliSeconds(100)); + } + UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), expected); +} + +} + Y_UNIT_TEST_SUITE(YdbSdkSessions) { Y_UNIT_TEST(TestSessionPool) { TKikimrWithGrpcAndRootSchema server; @@ -128,6 +176,108 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) { driver.Stop(true); } + Y_UNIT_TEST(TestSdkFreeSessionAfterBadSessionQueryService) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + + TString location = TStringBuilder() << "localhost:" << grpc; + auto clientConfig = NGRpcProxy::TGRpcClientConfig(location); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + CreateTestTable(driver); + + NYdb::NQuery::TQueryClient client(driver); + TString sessionId = WarmPoolCreateSession(client); + WaitForSessionsInPool(client, 1); + + bool allDoneOk = true; + NTestHelpers::CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk); + UNIT_ASSERT(allDoneOk); + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + UNIT_ASSERT(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + UNIT_ASSERT_VALUES_EQUAL(session.GetId(), sessionId); + + auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::BAD_SESSION, res.GetIssues().ToString()); + } + + WaitForSessionsInPool(client, 0); + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + UNIT_ASSERT(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + UNIT_ASSERT_VALUES_UNEQUAL(session.GetId(), sessionId); + auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); + } + + WaitForSessionsInPool(client, 1); + + driver.Stop(true); + } + + Y_UNIT_TEST(TestSdkFreeSessionAfterBadSessionQueryServiceStreamCall) { + TKikimrWithGrpcAndRootSchema server; + ui16 grpc = server.GetPort(); + + TString location = TStringBuilder() << "localhost:" << grpc; + auto clientConfig = NGRpcProxy::TGRpcClientConfig(location); + + auto driver = NYdb::TDriver( + TDriverConfig() + .SetEndpoint(location)); + + CreateTestTable(driver); + + NYdb::NQuery::TQueryClient client(driver); + TString sessionId = WarmPoolCreateSession(client); + WaitForSessionsInPool(client, 1); + + bool allDoneOk = true; + NTestHelpers::CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk); + UNIT_ASSERT(allDoneOk); + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + UNIT_ASSERT(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + UNIT_ASSERT_VALUES_EQUAL(session.GetId(), sessionId); + + auto it = session.StreamExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + + auto res = it.ReadNext().GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::BAD_SESSION, res.GetIssues().ToString()); + } + + WaitForSessionsInPool(client, 0); + + { + auto sessionResponse = client.GetSession().ExtractValueSync(); + UNIT_ASSERT(sessionResponse.IsSuccess()); + auto session = sessionResponse.GetSession(); + UNIT_ASSERT_VALUES_UNEQUAL(session.GetId(), sessionId); + + auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString()); + } + + WaitForSessionsInPool(client, 1); + + driver.Stop(true); + } + Y_UNIT_TEST(TestActiveSessionCountAfterTransportError) { TKikimrWithGrpcAndRootSchema server; ui16 grpc = server.GetPort(); diff --git a/ydb/services/ydb/sdk_sessions_ut/ya.make b/ydb/services/ydb/sdk_sessions_ut/ya.make index 81a58f849780..cf01d8be58a6 100644 --- a/ydb/services/ydb/sdk_sessions_ut/ya.make +++ b/ydb/services/ydb/sdk_sessions_ut/ya.make @@ -21,6 +21,7 @@ PEERDIR( ydb/core/testlib/default ydb/core/testlib ydb/public/sdk/cpp/client/ydb_table + ydb/public/lib/ut_helpers ) YQL_LAST_ABI_VERSION()