Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove session from pool in case of BAD_SESSION... #10437

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ NThreading::TFuture<TResponse> InjectSessionStatusInterception(
// Exclude CLIENT_RESOURCE_EXHAUSTED from transport errors which can cause to session disconnect
// since we have guarantee this request wasn't been started to execute.

if (status.IsTransportError() && status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED) {
if (status.IsTransportError()
&& status.GetStatus() != EStatus::CLIENT_RESOURCE_EXHAUSTED && status.GetStatus() != EStatus::CLIENT_OUT_OF_RANGE)
{
impl->MarkBroken();
} else if (status.GetStatus() == EStatus::SESSION_BUSY) {
impl->MarkBroken();
Expand All @@ -71,6 +73,7 @@ NThreading::TFuture<TResponse> InjectSessionStatusInterception(
impl->ScheduleTimeToTouch(RandomizeThreshold(timeout), impl->GetState() == TKqpSessionCommon::EState::S_ACTIVE);
}
}

if (cb) {
cb(value, *impl);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/public/sdk/cpp/client/ydb_query/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class TTransaction;
class TSession {
friend class TQueryClient;
friend class TTransaction;
friend class TExecuteQueryIterator;
public:
const TString& GetId() const;

Expand Down
16 changes: 15 additions & 1 deletion ydb/public/sdk/cpp/client/ydb_query/impl/exec_query.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
#define INCLUDE_YDB_INTERNAL_H
#include "exec_query.h"
#include "client_session.h"

#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/kqp_session_common/kqp_session_common.h>
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/session_pool/session_pool.h>
#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
#undef INCLUDE_YDB_INTERNAL_H

Expand Down Expand Up @@ -59,7 +61,7 @@ class TExecuteQueryIterator::TReaderImpl {
return Finished_;
}

TAsyncExecuteQueryPart ReadNext(std::shared_ptr<TSelf> self) {
TAsyncExecuteQueryPart DoReadNext(std::shared_ptr<TSelf> self) {
auto promise = NThreading::NewPromise<TExecuteQueryPart>();
// Capture self - guarantee no dtor call during the read
auto readCb = [self, promise](TGRpcStatus&& grpcStatus) mutable {
Expand Down Expand Up @@ -100,6 +102,18 @@ class TExecuteQueryIterator::TReaderImpl {
StreamProcessor_->Read(&Response_, readCb);
return promise.GetFuture();
}

TAsyncExecuteQueryPart ReadNext(std::shared_ptr<TSelf> self) {
if (!Session_)
return DoReadNext(std::move(self));

return NSessionPool::InjectSessionStatusInterception(
Session_->SessionImpl_,
DoReadNext(std::move(self)),
false, // no need to ping stream session
TDuration::Zero());
}

private:
TStreamProcessorPtr StreamProcessor_;
TResponse Response_;
Expand Down
150 changes: 150 additions & 0 deletions ydb/services/ydb/sdk_sessions_ut/sdk_sessions_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,57 @@

#include <ydb/public/api/grpc/ydb_table_v1.grpc.pb.h>

#include <ydb/public/lib/ut_helpers/ut_helpers_query.h>

using namespace NYdb;
using namespace NYdb::NTable;

namespace {

void CreateTestTable(NYdb::TDriver& driver) {
NYdb::NTable::TTableClient client(driver);
auto sessionResponse = client.GetSession().ExtractValueSync();
UNIT_ASSERT(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
auto result = session.ExecuteSchemeQuery(R"___(
CREATE TABLE `Root/Test` (
Key Uint32,
Value String,
PRIMARY KEY (Key)
);
)___").ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());
UNIT_ASSERT_VALUES_EQUAL(client.GetActiveSessionCount(), 1);
}

TString WarmPoolCreateSession(NYdb::NQuery::TQueryClient& client) {
TString sessionId;
auto sessionResponse = client.GetSession().ExtractValueSync();
UNIT_ASSERT(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
sessionId = session.GetId();
auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());

TResultSetParser resultSet(res.GetResultSetParser(0));
UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 2);

return sessionId;
}

void WaitForSessionsInPool(NYdb::NQuery::TQueryClient& client, i64 expected) {
int attempt = 10;
while (attempt--) {
if (client.GetCurrentPoolSize() == expected)
break;
Sleep(TDuration::MilliSeconds(100));
}
UNIT_ASSERT_VALUES_EQUAL(client.GetCurrentPoolSize(), expected);
}

}

Y_UNIT_TEST_SUITE(YdbSdkSessions) {
Y_UNIT_TEST(TestSessionPool) {
TKikimrWithGrpcAndRootSchema server;
Expand Down Expand Up @@ -128,6 +176,108 @@ Y_UNIT_TEST_SUITE(YdbSdkSessions) {
driver.Stop(true);
}

Y_UNIT_TEST(TestSdkFreeSessionAfterBadSessionQueryService) {
TKikimrWithGrpcAndRootSchema server;
ui16 grpc = server.GetPort();

TString location = TStringBuilder() << "localhost:" << grpc;
auto clientConfig = NGRpcProxy::TGRpcClientConfig(location);

auto driver = NYdb::TDriver(
TDriverConfig()
.SetEndpoint(location));

CreateTestTable(driver);

NYdb::NQuery::TQueryClient client(driver);
TString sessionId = WarmPoolCreateSession(client);
WaitForSessionsInPool(client, 1);

bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk);
UNIT_ASSERT(allDoneOk);

{
auto sessionResponse = client.GetSession().ExtractValueSync();
UNIT_ASSERT(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
UNIT_ASSERT_VALUES_EQUAL(session.GetId(), sessionId);

auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::BAD_SESSION, res.GetIssues().ToString());
}

WaitForSessionsInPool(client, 0);

{
auto sessionResponse = client.GetSession().ExtractValueSync();
UNIT_ASSERT(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
UNIT_ASSERT_VALUES_UNEQUAL(session.GetId(), sessionId);
auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
}

WaitForSessionsInPool(client, 1);

driver.Stop(true);
}

Y_UNIT_TEST(TestSdkFreeSessionAfterBadSessionQueryServiceStreamCall) {
TKikimrWithGrpcAndRootSchema server;
ui16 grpc = server.GetPort();

TString location = TStringBuilder() << "localhost:" << grpc;
auto clientConfig = NGRpcProxy::TGRpcClientConfig(location);

auto driver = NYdb::TDriver(
TDriverConfig()
.SetEndpoint(location));

CreateTestTable(driver);

NYdb::NQuery::TQueryClient client(driver);
TString sessionId = WarmPoolCreateSession(client);
WaitForSessionsInPool(client, 1);

bool allDoneOk = true;
NTestHelpers::CheckDelete(clientConfig, sessionId, Ydb::StatusIds::SUCCESS, allDoneOk);
UNIT_ASSERT(allDoneOk);

{
auto sessionResponse = client.GetSession().ExtractValueSync();
UNIT_ASSERT(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
UNIT_ASSERT_VALUES_EQUAL(session.GetId(), sessionId);

auto it = session.StreamExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());

auto res = it.ReadNext().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::BAD_SESSION, res.GetIssues().ToString());
}

WaitForSessionsInPool(client, 0);

{
auto sessionResponse = client.GetSession().ExtractValueSync();
UNIT_ASSERT(sessionResponse.IsSuccess());
auto session = sessionResponse.GetSession();
UNIT_ASSERT_VALUES_UNEQUAL(session.GetId(), sessionId);

auto res = session.ExecuteQuery("SELECT * FROM `Root/Test`", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(res.GetStatus(), EStatus::SUCCESS, res.GetIssues().ToString());
}

WaitForSessionsInPool(client, 1);

driver.Stop(true);
}

Y_UNIT_TEST(TestActiveSessionCountAfterTransportError) {
TKikimrWithGrpcAndRootSchema server;
ui16 grpc = server.GetPort();
Expand Down
1 change: 1 addition & 0 deletions ydb/services/ydb/sdk_sessions_ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ PEERDIR(
ydb/core/testlib/default
ydb/core/testlib
ydb/public/sdk/cpp/client/ydb_table
ydb/public/lib/ut_helpers
)

YQL_LAST_ABI_VERSION()
Expand Down
Loading