Skip to content

Commit

Permalink
Remove session from pool in case of BAD_SESSION... (#10437)
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik authored Oct 15, 2024
1 parent 168df93 commit 3ab7dd7
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ NThreading::TFuture<TResponse> InjectSessionStatusInterception(
// Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect
// since we have guarantee this request wasn't been started to execute.

if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) {
if (status.IsTransportError()
&& status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED && status.GetStatus() != EStatus::CLIENT_OUT_OF_RANGE)
{
impl->MarkBroken();
} else if (status.GetStatus() == EStatus::SESSION_BUSY) {
impl->MarkBroken();
Expand All @@ -71,6 +73,7 @@ NThreading::TFuture<TResponse> InjectSessionStatusInterception(
impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TKqpSessionCommon::EState::S_ACTIVE);
}
}

if (cb) {
cb(value, *impl);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/public/sdk/cpp/client/ydb_query/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class TTransaction;
class TSession {
friend class TQueryClient;
friend class TTransaction;
friend class TExecuteQueryIterator;
public:
const TString& GetId() const;

Expand Down
16 changes: 15 additions & 1 deletion ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#define INCLUDE_YDB_INTERNAL_H
#include "exec_query.h"
#include "client_session.h"

#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h>
#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
#undef INCLUDE_YDB_INTERNAL_H

Expand Down Expand Up @@ -59,7 +61,7 @@ class TExecuteQueryIterator::TReaderImpl {
return Finished_;
}

TAsyncExecuteQueryPart ReadNext(std::shared_ptr<TSelf> self) {
TAsyncExecuteQueryPart DoReadNext(std::shared_ptr<TSelf> self) {
auto promise = NThreading::NewPromise<TExecuteQueryPart>();
// Capture self - guarantee no dtor call during the read
auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable {
Expand Down Expand Up @@ -100,6 +102,18 @@ class TExecuteQueryIterator::TReaderImpl {
StreamProcessor_->Read(&Response_, readCb);
return promise.GetFuture();
}

TAsyncExecuteQueryPart ReadNext(std::shared_ptr<TSelf> self) {
if (!Session_)
return DoReadNext(std::move(self));

return NSessionPool::InjectSessionStatusInterception(
Session_->SessionImpl_,
DoReadNext(std::move(self)),
false, // no need to ping stream session
TDuration::Zero());
}

private:
TStreamProcessorPtr StreamProcessor_;
TResponse Response_;
Expand Down
150 changes: 150 additions & 0 deletions ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,57 @@

#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>

#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>

using namespace NYdb;
using namespace NYdb::NTable;

namespace {

void CreateTestTable(NYdb::TDriver& driver) {
NYdb::NTable::TTableClient client(driver);
auto sessionResponse = client.GetSession().ExtractValueSync();
UNIT_ASSERT(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
auto result = session.ExecuteSchemeQuery(R"___(
CREATE TABLE `Root/Test` (
Key Uint32,
Value String,
PRIMARY KEY (Key)
);
)___").ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
}

TString WarmPoolCreateSession(NYdb::NQuery::TQueryClient& client) {
TString sessionId;
auto sessionResponse = client.GetSession().ExtractValueSync();
UNIT_ASSERT(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
sessionId = session.GetId();
auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());

TResultSetParser resultSet(res.GetResultSetParser(0));
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);

return sessionId;
}

void WaitForSessionsInPool(NYdb::NQuery::TQueryClient& client, i64 expected) {
int attempt = 10;
while (attempt--) {
if (client.GetCurrentPoolSize() == expected)
break;
Sleep(TDuration::MilliSeconds(100));
}
UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), expected);
}

}

Y_UNIT_TEST_SUITE(YdbSdkSessions) {
Y_UNIT_TEST(TestSessionPool) {
TKikimrWithGrpcAndRootSchema server;
Expand Down Expand Up @@ -128,6 +176,108 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) {
driver.Stop(true);
}

Y_UNIT_TEST(TestSdkFreeSessionAfterBadSessionQueryService) {
TKikimrWithGrpcAndRootSchema server;
ui16 grpc = server.GetPort();

TString location = TStringBuilder() << "localhost:" << grpc;
auto clientConfig = NGRpcProxy::TGRpcClientConfig(location);

auto driver = NYdb::TDriver(
TDriverConfig()
.SetEndpoint(location));

CreateTestTable(driver);

NYdb::NQuery::TQueryClient client(driver);
TString sessionId = WarmPoolCreateSession(client);
WaitForSessionsInPool(client, 1);

bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk);
UNIT_ASSERT(allDoneOk);

{
auto sessionResponse = client.GetSession().ExtractValueSync();
UNIT_ASSERT(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
UNIT_ASSERT_VALUES_EQUAL(session.GetId(), sessionId);

auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::BAD_SESSION, res.GetIssues().ToString());
}

WaitForSessionsInPool(client, 0);

{
auto sessionResponse = client.GetSession().ExtractValueSync();
UNIT_ASSERT(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
UNIT_ASSERT_VALUES_UNEQUAL(session.GetId(), sessionId);
auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
}

WaitForSessionsInPool(client, 1);

driver.Stop(true);
}

Y_UNIT_TEST(TestSdkFreeSessionAfterBadSessionQueryServiceStreamCall) {
TKikimrWithGrpcAndRootSchema server;
ui16 grpc = server.GetPort();

TString location = TStringBuilder() << "localhost:" << grpc;
auto clientConfig = NGRpcProxy::TGRpcClientConfig(location);

auto driver = NYdb::TDriver(
TDriverConfig()
.SetEndpoint(location));

CreateTestTable(driver);

NYdb::NQuery::TQueryClient client(driver);
TString sessionId = WarmPoolCreateSession(client);
WaitForSessionsInPool(client, 1);

bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk);
UNIT_ASSERT(allDoneOk);

{
auto sessionResponse = client.GetSession().ExtractValueSync();
UNIT_ASSERT(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
UNIT_ASSERT_VALUES_EQUAL(session.GetId(), sessionId);

auto it = session.StreamExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());

auto res = it.ReadNext().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::BAD_SESSION, res.GetIssues().ToString());
}

WaitForSessionsInPool(client, 0);

{
auto sessionResponse = client.GetSession().ExtractValueSync();
UNIT_ASSERT(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
UNIT_ASSERT_VALUES_UNEQUAL(session.GetId(), sessionId);

auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
}

WaitForSessionsInPool(client, 1);

driver.Stop(true);
}

Y_UNIT_TEST(TestActiveSessionCountAfterTransportError) {
TKikimrWithGrpcAndRootSchema server;
ui16 grpc = server.GetPort();
Expand Down
1 change: 1 addition & 0 deletions ydb/services/ydb/sdk_sessions_ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ PEERDIR(
ydb/core/testlib/default
ydb/core/testlib
ydb/public/sdk/cpp/client/ydb_table
ydb/public/lib/ut_helpers
)

YQL_LAST_ABI_VERSION()
Expand Down

0 comments on commit 3ab7dd7

Please sign in to comment.