Skip to content

Commit

Permalink
Merge 1c344f0 into 3dce9fe
Browse files Browse the repository at this point in the history
  • Loading branch information
fexolm authored Sep 13, 2024
2 parents 3dce9fe + 1c344f0 commit 0411b3d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 11 deletions.
14 changes: 10 additions & 4 deletions ydb/core/tx/columnshard/data_sharing/manager/sessions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,17 @@ bool TSessionsManager::Load(NTable::TDatabase& database, const TColumnEngineForL
NKikimrColumnShardDataSharingProto::TSourceSession protoSession;
AFL_VERIFY(protoSession.ParseFromString(rowset.GetValue<Schema::SourceSessions::Details>()));

NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic protoSessionCursorDynamic;
AFL_VERIFY(protoSessionCursorDynamic.ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorDynamic>()));
std::optional<NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic> protoSessionCursorDynamic;
if (rowset.HaveValue<Schema::SourceSessions::CursorDynamic>()) {
protoSessionCursorDynamic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorDynamic{};
AFL_VERIFY(protoSessionCursorDynamic->ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorDynamic>()));
}

NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic protoSessionCursorStatic;
AFL_VERIFY(protoSessionCursorStatic.ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorStatic>()));
std::optional<NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic> protoSessionCursorStatic;
if (rowset.HaveValue<Schema::SourceSessions::CursorStatic>()) {
protoSessionCursorStatic = NKikimrColumnShardDataSharingProto::TSourceSession::TCursorStatic{};
AFL_VERIFY(protoSessionCursorStatic->ParseFromString(rowset.GetValue<Schema::SourceSessions::CursorStatic>()));
}

AFL_VERIFY(index);
session->DeserializeFromProto(protoSession, protoSessionCursorDynamic, protoSessionCursorStatic).Validate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,17 @@ bool TTxDataAckToSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc
}

NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::SourceSessions>().Key(Session->GetSessionId())
.Update(NIceDb::TUpdate<Schema::SourceSessions::CursorDynamic>(Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString()));
if (!Session->GetCursorVerified()->GetStaticSaved()) {
db.Table<Schema::SourceSessions>().Key(Session->GetSessionId())
.Update(NIceDb::TUpdate<Schema::SourceSessions::CursorStatic>(Session->GetCursorVerified()->SerializeStaticToProto().SerializeAsString()));

TString cursorDynamic = Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString();

if (Session->GetCursorVerified()->GetStaticSaved()) {
db.Table<Schema::SourceSessions>().Key(Session->GetSessionId()).Update<Schema::SourceSessions::CursorDynamic>(std::move(cursorDynamic));
} else {
TString cursorStatic = Session->GetCursorVerified()->SerializeStaticToProto().SerializeAsString();
db.Table<Schema::SourceSessions>()
.Key(Session->GetSessionId())
.Update<Schema::SourceSessions::CursorDynamic, Schema::SourceSessions::CursorStatic>(
std::move(cursorDynamic), std::move(cursorStatic));
Session->GetCursorVerified()->SetStaticSaved(true);
}
std::swap(SharedBlobIds, sharedTabletBlobIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,19 @@ namespace NKikimr::NOlap::NDataSharing {
bool TTxWriteSourceCursor::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
db.Table<Schema::SourceSessions>().Key(Session->GetSessionId())
.Update(NIceDb::TUpdate<Schema::SourceSessions::CursorDynamic>(Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString()));

TString cursorDynamic = Session->GetCursorVerified()->SerializeDynamicToProto().SerializeAsString();

if (Session->GetCursorVerified()->GetStaticSaved()) {
db.Table<Schema::SourceSessions>().Key(Session->GetSessionId()).Update<Schema::SourceSessions::CursorDynamic>(std::move(cursorDynamic));
} else {
TString cursorStatic = Session->GetCursorVerified()->SerializeStaticToProto().SerializeAsString();
db.Table<Schema::SourceSessions>()
.Key(Session->GetSessionId())
.Update<Schema::SourceSessions::CursorDynamic, Schema::SourceSessions::CursorStatic>(
std::move(cursorDynamic), std::move(cursorStatic));
Session->GetCursorVerified()->SetStaticSaved(true);
}
return true;
}

Expand Down

0 comments on commit 0411b3d

Please sign in to comment.