Skip to content

Commit

Permalink
Fix locking
Browse files Browse the repository at this point in the history
  • Loading branch information
dcherednik committed Jan 10, 2025
1 parent cc8d78d commit f96c8d4
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 18 deletions.
3 changes: 2 additions & 1 deletion ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
DoClosedSessionRemovedWhileActiveTest(false);
}
*/
// Copy paster from table service but with some modifications for query service
// Copy paste from table service but with some modifications for query service
// Checks read iterators/session/sdk counters have expected values
Y_UNIT_TEST(CloseSessionsWithLoad) {
auto kikimr = std::make_shared<TKikimrRunner>();
Expand Down Expand Up @@ -194,6 +194,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {

auto result = session.ExecuteQuery(query, TTxControl::BeginTx()).GetValueSync();
if (!result.IsSuccess()) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::BAD_SESSION);
Cerr << "received non-success status for session " << id << Endl;
return;
}
Expand Down
35 changes: 18 additions & 17 deletions ydb/public/sdk/cpp/client/ydb_query/impl/client_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ namespace NYdb::NQuery {
// Why just not std::mutex? - Requirement do not destroy a mutex while it is locked
// makes it difficult to use here.

// TODO: Proably we can add sync version of Cancell method in to grpc reader to make sure
// Why thread id? - We destroy session from CloseFromServer call, so the session dtor called from thread
// which already got the lock.

// TODO: Proably we can add sync version of Cancel method in to grpc reader to make sure
// no more callback will be called.

class TSafeTSessionImplHolder {
TSession::TImpl* Ptr;
std::atomic_uint32_t Semaphore;
std::atomic<std::thread::id> OwnerThread;
public:
TSafeTSessionImplHolder(TSession::TImpl* p)
: Ptr(p)
Expand All @@ -36,33 +40,30 @@ class TSafeTSessionImplHolder {
TSession::TImpl* TrySharedOwning() noexcept {
auto old = Semaphore.fetch_add(1);
if (old == 0) {
OwnerThread.store(std::this_thread::get_id());
return Ptr;
} else {
Y_ABORT_UNLESS(old == 1);
return nullptr;
}
}

void Release() noexcept {
OwnerThread.store(std::thread::id());
Semaphore.store(0);
}

void WhaitAndLock() noexcept {
uint32_t cur;
void WaitAndLock() noexcept {
if (OwnerThread.load() == std::this_thread::get_id()) {
return;
}

uint32_t cur = 0;
uint32_t newVal = 1;
do {
cur = Semaphore.load(std::memory_order_relaxed);
#ifndef NDEBUG
if (cur > 1) {
Y_ABORT_UNLESS(false, "unexpected semaphore value");
}
#endif
if (cur != 0) {
while (!Semaphore.compare_exchange_weak(cur, newVal,
std::memory_order_release, std::memory_order_relaxed)) {
std::this_thread::yield();
continue;
}
} while (!Semaphore.compare_exchange_weak(cur, newVal,
std::memory_order_release, std::memory_order_relaxed));
cur = 0;
}
}
};

Expand Down Expand Up @@ -100,7 +101,7 @@ TSession::TImpl::TImpl(TStreamProcessorPtr ptr, const TString& sessionId, const
TSession::TImpl::~TImpl()
{
StreamProcessor_->Cancel();
SessionHolder->WhaitAndLock();
SessionHolder->WaitAndLock();
}

void TSession::TImpl::MakeImplAsync(TStreamProcessorPtr ptr,
Expand Down

0 comments on commit f96c8d4

Please sign in to comment.