Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare common library for locks #1605

Merged
merged 3 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/client/locks_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <ydb/core/engine/mkql_engine_flat_impl.h>
#include <ydb/core/testlib/test_client.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/datashard/datashard_locks.h>
#include <ydb/core/tx/locks/locks.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>

#include <library/cpp/testing/unittest/tests_data.h>
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/engine/minikql/minikql_engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <ydb/library/yql/minikql/computation/mkql_custom_list.h>
#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/parser/pg_wrapper/interface/codec.h>
#include <ydb/core/tx/datashard/sys_tables.h>
#include <ydb/core/tx/locks/sys_tables.h>

#include <library/cpp/containers/stack_vector/stack_vec.h>

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/tx/datashard/sys_tables.h>
#include <ydb/core/tx/locks/sys_tables.h>
#include <ydb/core/tx/datashard/datashard.h>

namespace NKikimr {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
#include <ydb/core/tx/datashard/datashard_impl.h>

namespace NKikimr::NKqp {

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/tx/datashard/datashard_failpoints.h>
#include <ydb/core/tx/datashard/datashard_impl.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>

#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/scan/kqp_split_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/datashard/datashard_impl.h>

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/sys_view/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "path.h"

#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/tx/datashard/sys_tables.h>
#include <ydb/core/tx/locks/sys_tables.h>

namespace NKikimr {
namespace NSysView {
Expand Down
19 changes: 19 additions & 0 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "columnshard_schema.h"
#include "hooks/abstract/abstract.h"
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tx/columnshard/transactions/locks_db.h>

#include <ydb/core/tablet/tablet_exception.h>
#include <ydb/core/tx/columnshard/operations/write.h>
Expand Down Expand Up @@ -36,6 +37,7 @@ void TTxInit::SetDefaults() {
Self->LastWriteId = TWriteId{0};
Self->LastPlannedStep = 0;
Self->LastPlannedTxId = 0;
Self->LastCompletedTx = NOlap::TSnapshot::Zero();
Self->OwnerPathId = 0;
Self->OwnerPath.clear();
Self->LongTxWrites.clear();
Expand Down Expand Up @@ -71,6 +73,14 @@ bool TTxInit::Precharge(TTransactionContext& txc) {
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::OwnerPathId, Self->OwnerPathId);
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::OwnerPath, Self->OwnerPath);

{
ui64 lastCompletedStep = 0;
ui64 lastCompletedTx = 0;
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastCompletedStep, lastCompletedStep);
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastCompletedTxId, lastCompletedTx);
Self->LastCompletedTx = NOlap::TSnapshot(lastCompletedStep, lastCompletedTx);
}

if (!ready) {
return false;
}
Expand Down Expand Up @@ -177,6 +187,15 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}
}
}
{
TMemoryProfileGuard g("TTxInit/LocksDB");
if (txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) {
TColumnShardLocksDb locksDb(*Self, txc);
if (!Self->SysLocks.Load(locksDb)) {
return false;
}
}
}

Self->UpdateInsertTableCounters();
Self->UpdateIndexCounters();
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/columnshard__progress_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
if (!!plannedItem) {
ui64 step = plannedItem->PlanStep;
ui64 txId = plannedItem->TxId;
LastCompletedTx = NOlap::TSnapshot(step, txId);
if (LastCompletedTx > Self->LastCompletedTx) {
NIceDb::TNiceDb db(txc.DB);
Schema::SaveSpecialValue(db, Schema::EValueIds::LastCompletedStep, LastCompletedTx->GetPlanStep());
Schema::SaveSpecialValue(db, Schema::EValueIds::LastCompletedTxId, LastCompletedTx->GetTxId());
}

TxOperator = Self->ProgressTxController->GetVerifiedTxOperator(txId);
AFL_VERIFY(TxOperator->Progress(*Self, NOlap::TSnapshot(step, txId), txc));
Expand All @@ -50,12 +56,16 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
if (TxOperator) {
TxOperator->Complete(*Self, ctx);
}
if (LastCompletedTx) {
Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx);
}
Self->SetupIndexation();
}

private:
TTxController::ITransactionOperatior::TPtr TxOperator;
const ui32 TabletTxNo;
std::optional<NOlap::TSnapshot> LastCompletedTx;
};

void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, ScanCounters("Scan")
, WritesMonitor(*this)
, NormalizerController(StoragesManager, SubscribeCounters)
, SysLocks(this)
{
TabletCountersPtr.reset(new TProtobufTabletCounters<
ESimpleCounters_descriptor,
Expand Down
38 changes: 34 additions & 4 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <ydb/core/tx/tiering/manager.h>
#include <ydb/core/tx/time_cast/time_cast.h>
#include <ydb/core/tx/tx_processing.h>
#include <ydb/core/tx/locks/locks.h>
#include <ydb/services/metadata/service.h>

namespace NKikimr::NOlap {
Expand Down Expand Up @@ -206,10 +207,6 @@ class TColumnShard
TabletCounters->Cumulative()[counter].Increment(num);
}

void IncCounter(NColumnShard::EPercentileCounters counter, const TDuration& latency) const {
TabletCounters->Percentile()[counter].IncrementFor(latency.MicroSeconds());
}

void ActivateTiering(const ui64 pathId, const TString& useTiering, const bool onTabletInit = false);
void OnTieringModified();
public:
Expand All @@ -220,6 +217,37 @@ class TColumnShard
None /* "none" */
};

void IncCounter(NColumnShard::EPercentileCounters counter, const TDuration& latency) const {
TabletCounters->Percentile()[counter].IncrementFor(latency.MicroSeconds());
}

void IncCounter(NDataShard::ESimpleCounters counter, ui64 num = 1) const {
TabletCounters->Simple()[counter].Add(num);
}

// For syslocks
void IncCounter(NDataShard::ECumulativeCounters counter, ui64 num = 1) const {
TabletCounters->Cumulative()[counter].Increment(num);
}

void IncCounter(NDataShard::EPercentileCounters counter, ui64 num) const {
TabletCounters->Percentile()[counter].IncrementFor(num);
}

void IncCounter(NDataShard::EPercentileCounters counter, const TDuration& latency) const {
TabletCounters->Percentile()[counter].IncrementFor(latency.MilliSeconds());
}

inline TRowVersion LastCompleteTxVersion() const {
return TRowVersion(LastCompletedTx.GetPlanStep(), LastCompletedTx.GetTxId());
}

ui32 Generation() const { return Executor()->Generation(); }

bool IsUserTable(const TTableId&) const {
return true;
}

private:
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
EOverloadStatus CheckOverloaded(const ui64 tableId) const;
Expand Down Expand Up @@ -360,6 +388,7 @@ class TColumnShard
TWriteId LastWriteId = TWriteId{0};
ui64 LastPlannedStep = 0;
ui64 LastPlannedTxId = 0;
NOlap::TSnapshot LastCompletedTx = NOlap::TSnapshot::Zero();
ui64 LastExportNo = 0;

ui64 OwnerPathId = 0;
Expand Down Expand Up @@ -412,6 +441,7 @@ class TColumnShard
TLimits Limits;
TCompactionLimits CompactionLimits;
NOlap::TNormalizationController NormalizerController;
NDataShard::TSysLocks SysLocks;

void TryRegisterMediatorTimeCast();
void UnregisterMediatorTimeCast();
Expand Down
48 changes: 47 additions & 1 deletion ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ struct Schema : NIceDb::Schema {
ColumnsTableId,
CountersTableId,
OperationsTableId,
IndexesTableId
IndexesTableId,
LocksTableId,
LockRangesTableId,
LockConflictsTableId,
LockVolatileDependenciesTableId
};

enum class ETierTables: ui32 {
Expand All @@ -60,6 +64,8 @@ struct Schema : NIceDb::Schema {
LastExportNumber = 10,
OwnerPathId = 11,
OwnerPath = 12,
LastCompletedStep = 13,
LastCompletedTxId = 14,
};

enum class EInsertTableIds : ui8 {
Expand Down Expand Up @@ -305,6 +311,46 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<PathId, PortionId, IndexId, ChunkIdx, Blob, Offset, Size, RecordsCount, RawBytes>;
};

struct Locks : Table<LocksTableId> {
struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
struct LockNodeId : Column<2, NScheme::NTypeIds::Uint32> {};
struct Generation : Column<3, NScheme::NTypeIds::Uint32> {};
struct Counter : Column<4, NScheme::NTypeIds::Uint64> {};
struct CreateTimestamp : Column<5, NScheme::NTypeIds::Uint64> {};
struct Flags : Column<6, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<LockId>;
using TColumns = TableColumns<LockId, LockNodeId, Generation, Counter, CreateTimestamp, Flags>;
};

struct LockRanges : Table<LockRangesTableId> {
struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
struct RangeId : Column<2, NScheme::NTypeIds::Uint64> {};
struct PathOwnerId : Column<3, NScheme::NTypeIds::Uint64> {};
struct LocalPathId : Column<4, NScheme::NTypeIds::Uint64> {};
struct Flags : Column<5, NScheme::NTypeIds::Uint64> {};
struct Data : Column<6, NScheme::NTypeIds::String> {};

using TKey = TableKey<LockId, RangeId>;
using TColumns = TableColumns<LockId, RangeId, PathOwnerId, LocalPathId, Flags, Data>;
};

struct LockConflicts : Table<LockConflictsTableId> {
struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
struct ConflictId : Column<2, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<LockId, ConflictId>;
using TColumns = TableColumns<LockId, ConflictId>;
};

struct LockVolatileDependencies : Table<LockVolatileDependenciesTableId> {
struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
struct TxId : Column<2, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<LockId, TxId>;
using TColumns = TableColumns<LockId, TxId>;
};

using TTables = SchemaTables<
Value,
TxInfo,
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/transactions/locks_db.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "locks_db.h"

namespace NKikimr::NColumnShard {

}
27 changes: 27 additions & 0 deletions ydb/core/tx/columnshard/transactions/locks_db.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once
#include <ydb/core/tx/locks/locks_db.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>


namespace NKikimr::NColumnShard {

class TColumnShardLocksDb : public NLocks::TShardLocksDb<TColumnShard, NColumnShard::Schema> {
private:
using TBase = NLocks::TShardLocksDb<TColumnShard, NColumnShard::Schema>;

public:
using TBase::TBase;

void PersistRemoveLock(ui64 lockId) override {
NIceDb::TNiceDb db(DB);
db.Table<NColumnShard::Schema::Locks>().Key(lockId).Delete();
HasChanges_ = true;
}

bool MayAddLock(ui64) override {
return true;
}

};

}
7 changes: 1 addition & 6 deletions ydb/core/tx/columnshard/transactions/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,14 @@ LIBRARY()

SRCS(
tx_controller.cpp
locks_db.cpp
)

PEERDIR(
ydb/core/tablet_flat
ydb/core/tx/data_events
)

IF (OS_WINDOWS)
CFLAGS(
-DKIKIMR_DISABLE_S3_OPS
)
ENDIF()

YQL_LAST_ABI_VERSION()

END()
3 changes: 2 additions & 1 deletion ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#include "datashard_impl.h"
#include "datashard_counters.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

#include <ydb/core/tx/locks/time_counters.h>

namespace NKikimr {
namespace NDataShard {

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard__engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include "datashard_impl.h"
#include "datashard_user_db.h"
#include "datashard__engine_host.h"
#include "sys_tables.h"
#include <ydb/core/tx/locks/sys_tables.h>

#include <ydb/core/engine/minikql/minikql_engine_host.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_active_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

#include "datashard_active_transaction.h"
#include "datashard_kqp.h"
#include "datashard_locks.h"
#include "datashard_impl.h"
#include "datashard_failpoints.h"
#include "key_conflicts.h"

#include <ydb/core/tx/locks/locks.h>
#include <ydb/library/actors/util/memory_track.h>

namespace NKikimr {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_active_transaction.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "datashard.h"
#include "datashard_locks.h"
#include <ydb/core/tx/locks/locks.h>
#include "datashard__engine_host.h"
#include "operation.h"

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_dep_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "datashard.h"
#include "datashard_user_table.h"
#include "datashard_active_transaction.h"
#include "range_treap.h"
#include <ydb/core/tx/locks/range_treap.h>

#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h>

Expand Down
Loading
Loading