Skip to content

Commit

Permalink
Merge a8416a5 into bc5b8c1
Browse files Browse the repository at this point in the history
  • Loading branch information
nsofya authored Feb 6, 2024
2 parents bc5b8c1 + a8416a5 commit f7167a8
Show file tree
Hide file tree
Showing 67 changed files with 456 additions and 2,329 deletions.
2 changes: 1 addition & 1 deletion ydb/core/client/locks_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include <ydb/core/engine/mkql_engine_flat_impl.h>
#include <ydb/core/testlib/test_client.h>
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/tx/datashard/datashard_locks.h>
#include <ydb/core/tx/locks/locks.h>
#include <ydb/public/lib/deprecated/kicli/kicli.h>

#include <library/cpp/testing/unittest/tests_data.h>
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/engine/minikql/minikql_engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include <ydb/library/yql/minikql/computation/mkql_custom_list.h>
#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/parser/pg_wrapper/interface/codec.h>
#include <ydb/core/tx/datashard/sys_tables.h>
#include <ydb/core/tx/locks/sys_tables.h>

#include <library/cpp/containers/stack_vector/stack_vec.h>

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#include <ydb/library/yql/minikql/mkql_node.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
#include <ydb/core/tx/datashard/sys_tables.h>
#include <ydb/core/tx/locks/sys_tables.h>
#include <ydb/core/tx/datashard/datashard.h>

namespace NKikimr {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
#include <ydb/core/tx/datashard/datashard_impl.h>

namespace NKikimr::NKqp {

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/tx/datashard/datashard_failpoints.h>
#include <ydb/core/tx/datashard/datashard_impl.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>

#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/scan/kqp_split_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/core/tx/datashard/datashard_impl.h>

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/sys_view/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "path.h"

#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/tx/datashard/sys_tables.h>
#include <ydb/core/tx/locks/sys_tables.h>

namespace NKikimr {
namespace NSysView {
Expand Down
19 changes: 19 additions & 0 deletions ydb/core/tx/columnshard/columnshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "columnshard_schema.h"
#include "hooks/abstract/abstract.h"
#include <ydb/core/tx/columnshard/blobs_action/blob_manager_db.h>
#include <ydb/core/tx/columnshard/transactions/locks_db.h>

#include <ydb/core/tablet/tablet_exception.h>
#include <ydb/core/tx/columnshard/operations/write.h>
Expand Down Expand Up @@ -36,6 +37,7 @@ void TTxInit::SetDefaults() {
Self->LastWriteId = TWriteId{0};
Self->LastPlannedStep = 0;
Self->LastPlannedTxId = 0;
Self->LastCompletedTx = NOlap::TSnapshot::Zero();
Self->OwnerPathId = 0;
Self->OwnerPath.clear();
Self->LongTxWrites.clear();
Expand Down Expand Up @@ -71,6 +73,14 @@ bool TTxInit::Precharge(TTransactionContext& txc) {
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::OwnerPathId, Self->OwnerPathId);
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::OwnerPath, Self->OwnerPath);

{
ui64 lastCompletedStep = 0;
ui64 lastCompletedTx = 0;
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastCompletedStep, lastCompletedStep);
ready = ready && Schema::GetSpecialValue(db, Schema::EValueIds::LastCompletedTxId, lastCompletedTx);
Self->LastCompletedTx = NOlap::TSnapshot(lastCompletedStep, lastCompletedTx);
}

if (!ready) {
return false;
}
Expand Down Expand Up @@ -177,6 +187,15 @@ bool TTxInit::ReadEverything(TTransactionContext& txc, const TActorContext& ctx)
}
}
}
{
TMemoryProfileGuard g("TTxInit/LocksDB");
if (txc.DB.GetScheme().GetTableInfo(Schema::Locks::TableId)) {
TColumnShardLocksDb locksDb(*Self, txc);
if (!Self->SysLocks.Load(locksDb)) {
return false;
}
}
}

Self->UpdateInsertTableCounters();
Self->UpdateIndexCounters();
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/columnshard__progress_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
if (!!plannedItem) {
ui64 step = plannedItem->PlanStep;
ui64 txId = plannedItem->TxId;
LastCompletedTx = NOlap::TSnapshot(step, txId);
if (LastCompletedTx > Self->LastCompletedTx) {
NIceDb::TNiceDb db(txc.DB);
Schema::SaveSpecialValue(db, Schema::EValueIds::LastCompletedStep, LastCompletedTx->GetPlanStep());
Schema::SaveSpecialValue(db, Schema::EValueIds::LastCompletedTxId, LastCompletedTx->GetTxId());
}

TxOperator = Self->ProgressTxController->GetVerifiedTxOperator(txId);
AFL_VERIFY(TxOperator->Progress(*Self, NOlap::TSnapshot(step, txId), txc));
Expand All @@ -50,12 +56,16 @@ class TColumnShard::TTxProgressTx : public TTransactionBase<TColumnShard> {
if (TxOperator) {
TxOperator->Complete(*Self, ctx);
}
if (LastCompletedTx) {
Self->LastCompletedTx = std::max(*LastCompletedTx, Self->LastCompletedTx);
}
Self->SetupIndexation();
}

private:
TTxController::ITransactionOperatior::TPtr TxOperator;
const ui32 TabletTxNo;
std::optional<NOlap::TSnapshot> LastCompletedTx;
};

void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, ScanCounters("Scan")
, WritesMonitor(*this)
, NormalizerController(StoragesManager, SubscribeCounters)
, SysLocks(this)
{
TabletCountersPtr.reset(new TProtobufTabletCounters<
ESimpleCounters_descriptor,
Expand Down
38 changes: 34 additions & 4 deletions ydb/core/tx/columnshard/columnshard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <ydb/core/tx/tiering/manager.h>
#include <ydb/core/tx/time_cast/time_cast.h>
#include <ydb/core/tx/tx_processing.h>
#include <ydb/core/tx/locks/locks.h>
#include <ydb/services/metadata/service.h>

namespace NKikimr::NOlap {
Expand Down Expand Up @@ -206,10 +207,6 @@ class TColumnShard
TabletCounters->Cumulative()[counter].Increment(num);
}

void IncCounter(NColumnShard::EPercentileCounters counter, const TDuration& latency) const {
TabletCounters->Percentile()[counter].IncrementFor(latency.MicroSeconds());
}

void ActivateTiering(const ui64 pathId, const TString& useTiering, const bool onTabletInit = false);
void OnTieringModified();
public:
Expand All @@ -220,6 +217,37 @@ class TColumnShard
None /* "none" */
};

void IncCounter(NColumnShard::EPercentileCounters counter, const TDuration& latency) const {
TabletCounters->Percentile()[counter].IncrementFor(latency.MicroSeconds());
}

void IncCounter(NDataShard::ESimpleCounters counter, ui64 num = 1) const {
TabletCounters->Simple()[counter].Add(num);
}

// For syslocks
void IncCounter(NDataShard::ECumulativeCounters counter, ui64 num = 1) const {
TabletCounters->Cumulative()[counter].Increment(num);
}

void IncCounter(NDataShard::EPercentileCounters counter, ui64 num) const {
TabletCounters->Percentile()[counter].IncrementFor(num);
}

void IncCounter(NDataShard::EPercentileCounters counter, const TDuration& latency) const {
TabletCounters->Percentile()[counter].IncrementFor(latency.MilliSeconds());
}

inline TRowVersion LastCompleteTxVersion() const {
return TRowVersion(LastCompletedTx.GetPlanStep(), LastCompletedTx.GetTxId());
}

ui32 Generation() const { return Executor()->Generation(); }

bool IsUserTable(const TTableId&) const {
return true;
}

private:
void OverloadWriteFail(const EOverloadStatus overloadReason, const NEvWrite::TWriteData& writeData, std::unique_ptr<NActors::IEventBase>&& event, const TActorContext& ctx);
EOverloadStatus CheckOverloaded(const ui64 tableId) const;
Expand Down Expand Up @@ -360,6 +388,7 @@ class TColumnShard
TWriteId LastWriteId = TWriteId{0};
ui64 LastPlannedStep = 0;
ui64 LastPlannedTxId = 0;
NOlap::TSnapshot LastCompletedTx = NOlap::TSnapshot::Zero();
ui64 LastExportNo = 0;

ui64 OwnerPathId = 0;
Expand Down Expand Up @@ -412,6 +441,7 @@ class TColumnShard
TLimits Limits;
TCompactionLimits CompactionLimits;
NOlap::TNormalizationController NormalizerController;
NDataShard::TSysLocks SysLocks;

void TryRegisterMediatorTimeCast();
void UnregisterMediatorTimeCast();
Expand Down
48 changes: 47 additions & 1 deletion ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ struct Schema : NIceDb::Schema {
ColumnsTableId,
CountersTableId,
OperationsTableId,
IndexesTableId
IndexesTableId,
LocksTableId,
LockRangesTableId,
LockConflictsTableId,
LockVolatileDependenciesTableId
};

enum class ETierTables: ui32 {
Expand All @@ -60,6 +64,8 @@ struct Schema : NIceDb::Schema {
LastExportNumber = 10,
OwnerPathId = 11,
OwnerPath = 12,
LastCompletedStep = 13,
LastCompletedTxId = 14,
};

enum class EInsertTableIds : ui8 {
Expand Down Expand Up @@ -305,6 +311,46 @@ struct Schema : NIceDb::Schema {
using TColumns = TableColumns<PathId, PortionId, IndexId, ChunkIdx, Blob, Offset, Size, RecordsCount, RawBytes>;
};

struct Locks : Table<LocksTableId> {
struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
struct LockNodeId : Column<2, NScheme::NTypeIds::Uint32> {};
struct Generation : Column<3, NScheme::NTypeIds::Uint32> {};
struct Counter : Column<4, NScheme::NTypeIds::Uint64> {};
struct CreateTimestamp : Column<5, NScheme::NTypeIds::Uint64> {};
struct Flags : Column<6, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<LockId>;
using TColumns = TableColumns<LockId, LockNodeId, Generation, Counter, CreateTimestamp, Flags>;
};

struct LockRanges : Table<LockRangesTableId> {
struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
struct RangeId : Column<2, NScheme::NTypeIds::Uint64> {};
struct PathOwnerId : Column<3, NScheme::NTypeIds::Uint64> {};
struct LocalPathId : Column<4, NScheme::NTypeIds::Uint64> {};
struct Flags : Column<5, NScheme::NTypeIds::Uint64> {};
struct Data : Column<6, NScheme::NTypeIds::String> {};

using TKey = TableKey<LockId, RangeId>;
using TColumns = TableColumns<LockId, RangeId, PathOwnerId, LocalPathId, Flags, Data>;
};

struct LockConflicts : Table<LockConflictsTableId> {
struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
struct ConflictId : Column<2, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<LockId, ConflictId>;
using TColumns = TableColumns<LockId, ConflictId>;
};

struct LockVolatileDependencies : Table<LockVolatileDependenciesTableId> {
struct LockId : Column<1, NScheme::NTypeIds::Uint64> {};
struct TxId : Column<2, NScheme::NTypeIds::Uint64> {};

using TKey = TableKey<LockId, TxId>;
using TColumns = TableColumns<LockId, TxId>;
};

using TTables = SchemaTables<
Value,
TxInfo,
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/transactions/locks_db.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "locks_db.h"

namespace NKikimr::NColumnShard {

}
27 changes: 27 additions & 0 deletions ydb/core/tx/columnshard/transactions/locks_db.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once
#include <ydb/core/tx/locks/locks_db.h>
#include <ydb/core/tx/columnshard/columnshard_impl.h>


namespace NKikimr::NColumnShard {

class TColumnShardLocksDb : public NLocks::TShardLocksDb<TColumnShard, NColumnShard::Schema> {
private:
using TBase = NLocks::TShardLocksDb<TColumnShard, NColumnShard::Schema>;

public:
using TBase::TBase;

void PersistRemoveLock(ui64 lockId) override {
NIceDb::TNiceDb db(DB);
db.Table<NColumnShard::Schema::Locks>().Key(lockId).Delete();
HasChanges_ = true;
}

bool MayAddLock(ui64) override {
return true;
}

};

}
7 changes: 1 addition & 6 deletions ydb/core/tx/columnshard/transactions/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,14 @@ LIBRARY()

SRCS(
tx_controller.cpp
locks_db.cpp
)

PEERDIR(
ydb/core/tablet_flat
ydb/core/tx/data_events
)

IF (OS_WINDOWS)
CFLAGS(
-DKIKIMR_DISABLE_S3_OPS
)
ENDIF()

YQL_LAST_ABI_VERSION()

END()
3 changes: 2 additions & 1 deletion ydb/core/tx/datashard/build_and_wait_dependencies_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#include "datashard_impl.h"
#include "datashard_counters.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

#include <ydb/core/tx/locks/time_counters.h>

namespace NKikimr {
namespace NDataShard {

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard__engine_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#include "datashard_impl.h"
#include "datashard_user_db.h"
#include "datashard__engine_host.h"
#include "sys_tables.h"
#include <ydb/core/tx/locks/sys_tables.h>

#include <ydb/core/engine/minikql/minikql_engine_host.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_active_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

#include "datashard_active_transaction.h"
#include "datashard_kqp.h"
#include "datashard_locks.h"
#include "datashard_impl.h"
#include "datashard_failpoints.h"
#include "key_conflicts.h"

#include <ydb/core/tx/locks/locks.h>
#include <ydb/library/actors/util/memory_track.h>

namespace NKikimr {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_active_transaction.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#pragma once

#include "datashard.h"
#include "datashard_locks.h"
#include <ydb/core/tx/locks/locks.h>
#include "datashard__engine_host.h"
#include "operation.h"

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/datashard_dep_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#include "datashard.h"
#include "datashard_user_table.h"
#include "datashard_active_transaction.h"
#include "range_treap.h"
#include <ydb/core/tx/locks/range_treap.h>

#include <library/cpp/containers/absl_flat_hash/flat_hash_map.h>

Expand Down
Loading

0 comments on commit f7167a8

Please sign in to comment.