From 1c344f08b075c44059a3060346fab6832d6e37e8 Mon Sep 17 00:00:00 2001 From: Artem Alekseev Date: Fri, 13 Sep 2024 22:21:08 +0300 Subject: [PATCH] Fix loading session without cursor --- .../data_sharing/manager/sessions.cpp | 14 ++++++++++---- .../transactions/tx_data_ack_to_source.cpp | 16 +++++++++++----- .../transactions/tx_write_source_cursor.cpp | 15 +++++++++++++-- 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp b/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp index 18a30ac76061..daea14bd8edb 100644 --- a/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp +++ b/ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp @@ -67,11 +67,17 @@ bool TSessionsManager::Load(NTable::TDatabase& database, const TColumnEngineForL NKikimrColumnShardDataSharingProto::TSourceSession protoSession; AFL_VERIFY(protoSession.ParseFromString(rowset.GetValue())); - NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic protoSessionCursorDynamic; - AFL_VERIFY(protoSessionCursorDynamic.ParseFromString(rowset.GetValue())); + std::optional protoSessionCursorDynamic; + if (rowset.HaveValue()) { + protoSessionCursorDynamic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic{}; + AFL_VERIFY(protoSessionCursorDynamic->ParseFromString(rowset.GetValue())); + } - NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic protoSessionCursorStatic; - AFL_VERIFY(protoSessionCursorStatic.ParseFromString(rowset.GetValue())); + std::optional protoSessionCursorStatic; + if (rowset.HaveValue()) { + protoSessionCursorStatic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic{}; + AFL_VERIFY(protoSessionCursorStatic->ParseFromString(rowset.GetValue())); + } AFL_VERIFY(index); session->DeserializeFromProto(protoSession, protoSessionCursorDynamic, protoSessionCursorStatic).Validate(); diff --git a/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp index 5a9bb1cf1274..33b7a8efde40 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp +++ b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_data_ack_to_source.cpp @@ -22,11 +22,17 @@ bool TTxDataAckToSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc } NIceDb::TNiceDb db(txc.DB); - db.Table().Key(Session->GetSessionId()) - .Update(NIceDb::TUpdate(Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString())); - if (!Session->GetCursorVerified()->GetStaticSaved()) { - db.Table().Key(Session->GetSessionId()) - .Update(NIceDb::TUpdate(Session->GetCursorVerified()->SerializeStaticToProto().SerializeAsString())); + + TString cursorDynamic = Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString(); + + if (Session->GetCursorVerified()->GetStaticSaved()) { + db.Table().Key(Session->GetSessionId()).Update(std::move(cursorDynamic)); + } else { + TString cursorStatic = Session->GetCursorVerified()->SerializeStaticToProto().SerializeAsString(); + db.Table() + .Key(Session->GetSessionId()) + .Update( + std::move(cursorDynamic), std::move(cursorStatic)); Session->GetCursorVerified()->SetStaticSaved(true); } std::swap(SharedBlobIds, sharedTabletBlobIds); diff --git a/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_write_source_cursor.cpp b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_write_source_cursor.cpp index 4af96622de2b..64d5e113e0a2 100644 --- a/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_write_source_cursor.cpp +++ b/ydb/core/tx/columnshard/data_sharing/source/transactions/tx_write_source_cursor.cpp @@ -6,8 +6,19 @@ namespace NKikimr::NOlap::NDataSharing { bool TTxWriteSourceCursor::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { using namespace NColumnShard; NIceDb::TNiceDb db(txc.DB); - db.Table().Key(Session->GetSessionId()) - .Update(NIceDb::TUpdate(Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString())); + + TString cursorDynamic = Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString(); + + if (Session->GetCursorVerified()->GetStaticSaved()) { + db.Table().Key(Session->GetSessionId()).Update(std::move(cursorDynamic)); + } else { + TString cursorStatic = Session->GetCursorVerified()->SerializeStaticToProto().SerializeAsString(); + db.Table() + .Key(Session->GetSessionId()) + .Update( + std::move(cursorDynamic), std::move(cursorStatic)); + Session->GetCursorVerified()->SetStaticSaved(true); + } return true; }