Skip to content

Commit

Permalink
Merge 2aae8d1 into 0c2b198
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Oct 28, 2024
2 parents 0c2b198 + 2aae8d1 commit 8fd095b
Show file tree
Hide file tree
Showing 25 changed files with 57 additions and 171 deletions.
3 changes: 1 addition & 2 deletions ydb/core/tx/columnshard/columnshard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ void TColumnShard::UpdateIndexCounters() {
auto& stats = TablesManager.MutablePrimaryIndex().GetTotalStats();
const std::shared_ptr<const TTabletCountersHandle>& counters = Counters.GetTabletCounters();
counters->SetCounter(COUNTER_INDEX_TABLES, stats.Tables);
counters->SetCounter(COUNTER_INDEX_COLUMN_RECORDS, stats.ColumnRecords);
counters->SetCounter(COUNTER_INSERTED_PORTIONS, stats.GetInsertedStats().Portions);
counters->SetCounter(COUNTER_INSERTED_BLOBS, stats.GetInsertedStats().Blobs);
counters->SetCounter(COUNTER_INSERTED_ROWS, stats.GetInsertedStats().Rows);
Expand Down Expand Up @@ -300,7 +299,7 @@ void TColumnShard::UpdateIndexCounters() {
LOG_S_DEBUG("Index: tables " << stats.Tables << " inserted " << stats.GetInsertedStats().DebugString() << " compacted "
<< stats.GetCompactedStats().DebugString() << " s-compacted " << stats.GetSplitCompactedStats().DebugString()
<< " inactive " << stats.GetInactiveStats().DebugString() << " evicted "
<< stats.GetEvictedStats().DebugString() << " column records " << stats.ColumnRecords << " at tablet "
<< stats.GetEvictedStats().DebugString() << " at tablet "
<< TabletID());
}

Expand Down
31 changes: 4 additions & 27 deletions ydb/core/tx/columnshard/counters/engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,20 +86,8 @@ void TEngineLogsCounters::OnActualizationTask(const ui32 evictCount, const ui32
void TEngineLogsCounters::TPortionsInfoGuard::OnNewPortion(const std::shared_ptr<NOlap::TPortionInfo>& portion) const {
const ui32 producedId = (ui32)(portion->HasRemoveSnapshot() ? NOlap::NPortion::EProduced::INACTIVE : portion->GetMeta().Produced);
Y_ABORT_UNLESS(producedId < BlobGuards.size());
THashSet<NOlap::TUnifiedBlobId> blobIds;
for (auto&& i : portion->GetRecords()) {
const auto blobId = portion->GetBlobId(i.GetBlobRange().GetBlobIdxVerified());
if (blobIds.emplace(blobId).second) {
BlobGuards[producedId]->Add(blobId.BlobSize(), blobId.BlobSize());
}
}
for (auto&& i : portion->GetIndexes()) {
if (i.HasBlobRange()) {
const auto blobId = portion->GetBlobId(i.GetBlobRangeVerified().GetBlobIdxVerified());
if (blobIds.emplace(blobId).second) {
BlobGuards[producedId]->Add(blobId.BlobSize(), blobId.BlobSize());
}
}
for (auto&& blobId : portion->GetBlobIds()) {
BlobGuards[producedId]->Add(blobId.BlobSize(), blobId.BlobSize());
}
PortionRecordCountGuards[producedId]->Add(portion->GetRecordsCount(), 1);
PortionSizeGuards[producedId]->Add(portion->GetTotalBlobBytes(), 1);
Expand All @@ -109,19 +97,8 @@ void TEngineLogsCounters::TPortionsInfoGuard::OnDropPortion(const std::shared_pt
const ui32 producedId = (ui32)(portion->HasRemoveSnapshot() ? NOlap::NPortion::EProduced::INACTIVE : portion->GetMeta().Produced);
Y_ABORT_UNLESS(producedId < BlobGuards.size());
THashSet<NOlap::TUnifiedBlobId> blobIds;
for (auto&& i : portion->GetRecords()) {
const auto blobId = portion->GetBlobId(i.GetBlobRange().GetBlobIdxVerified());
if (blobIds.emplace(blobId).second) {
BlobGuards[producedId]->Sub(blobId.BlobSize(), blobId.BlobSize());
}
}
for (auto&& i : portion->GetIndexes()) {
if (i.HasBlobRange()) {
const auto blobId = portion->GetBlobId(i.GetBlobRangeVerified().GetBlobIdxVerified());
if (blobIds.emplace(blobId).second) {
BlobGuards[producedId]->Sub(blobId.BlobSize(), blobId.BlobSize());
}
}
for (auto&& blobId : portion->GetBlobIds()) {
BlobGuards[producedId]->Sub(blobId.BlobSize(), blobId.BlobSize());
}
PortionRecordCountGuards[producedId]->Sub(portion->GetRecordsCount(), 1);
PortionSizeGuards[producedId]->Sub(portion->GetTotalBlobBytes(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class TPathIdData {
void InitPortionIds(ui64* lastPortionId, const std::optional<ui64> pathId = {}) {
AFL_VERIFY(lastPortionId);
for (auto&& i : Portions) {
i.SetPortion(++*lastPortionId);
i.SetPortionId(++*lastPortionId);
if (pathId) {
i.SetPathId(*pathId);
}
Expand Down
12 changes: 4 additions & 8 deletions ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,14 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("portions", sb)("task_id", GetTaskIdentifier());
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size());

THashSet<TUnifiedBlobId> blobsDeactivated;
for (auto& [_, portionInfo] : PortionsToRemove) {
for (auto& rec : portionInfo.Records) {
blobsDeactivated.emplace(portionInfo.GetBlobId(rec.BlobRange.GetBlobIdxVerified()));
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_BLOBS_DEACTIVATED, portionInfo.GetBlobIdsCount());
for (auto& blobId : portionInfo.GetBlobIds()) {
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_BYTES_DEACTIVATED, blobId.BlobSize());
}
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_RAW_BYTES_DEACTIVATED, portionInfo.GetTotalRawBytes());
}

self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_BLOBS_DEACTIVATED, blobsDeactivated.size());
for (auto& blobId : blobsDeactivated) {
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_BYTES_DEACTIVATED, blobId.BlobSize());
}
}
if (PortionsToMove.size()) {
THashMap<ui32, TSimplePortionsGroupInfo> portionGroups;
Expand All @@ -104,7 +100,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
for (auto& [_, portionInfo] : PortionsToRemove) {
context.EngineLogs.AddCleanupPortion(portionInfo);
const TPortionInfo& oldInfo =
context.EngineLogs.GetGranuleVerified(portionInfo.GetPathId()).GetPortionVerified(portionInfo.GetPortion());
context.EngineLogs.GetGranuleVerified(portionInfo.GetPathId()).GetPortionVerified(portionInfo.GetPortionId());
context.EngineLogs.UpsertPortion(portionInfo, &oldInfo);
}
for (auto& portionBuilder : AppendedPortions) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/column_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ TSelectInfo::TStats TSelectInfo::Stats() const {
for (auto& portionInfo : PortionsOrderedPK) {
out.Records += portionInfo->NumChunks();
out.Rows += portionInfo->NumRows();
for (auto& rec : portionInfo->Records) {
out.Bytes += rec.BlobRange.Size;
for (auto& blobId : portionInfo->GetBlobIds()) {
out.Bytes += blobId.BlobSize();
}
out.Blobs += portionInfo->GetBlobIdsCount();
}
Expand Down
20 changes: 0 additions & 20 deletions ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,6 @@ class TColumnEngineStats {
i64 Rows = 0;
i64 Bytes = 0;
i64 RawBytes = 0;
THashMap<ui32, size_t> BytesByColumn;
THashMap<ui32, size_t> RawBytesByColumn;

TString DebugString() const {
return TStringBuilder() << "portions=" << Portions << ";blobs=" << Blobs << ";rows=" << Rows << ";bytes=" << Bytes << ";raw_bytes=" << RawBytes << ";";
Expand All @@ -94,14 +92,6 @@ class TColumnEngineStats {
result.Rows = kff * Rows;
result.Bytes = kff * Bytes;
result.RawBytes = kff * RawBytes;

for (auto&& i : BytesByColumn) {
result.BytesByColumn[i.first] = kff * i.second;
}

for (auto&& i : RawBytesByColumn) {
result.RawBytesByColumn[i.first] = kff * i.second;
}
return result;
}

Expand All @@ -115,21 +105,11 @@ class TColumnEngineStats {
Rows = SumVerifiedPositive(Rows, item.Rows);
Bytes = SumVerifiedPositive(Bytes, item.Bytes);
RawBytes = SumVerifiedPositive(RawBytes, item.RawBytes);
for (auto&& i : item.BytesByColumn) {
auto& v = BytesByColumn[i.first];
v = SumVerifiedPositive(v, i.second);
}

for (auto&& i : item.RawBytesByColumn) {
auto& v = RawBytesByColumn[i.first];
v = SumVerifiedPositive(v, i.second);
}
return *this;
}
};

i64 Tables{};
i64 ColumnRecords{};
THashMap<TPortionMeta::EProduced, TPortionsStats> StatsByType;

std::vector<ui32> GetKinds() const {
Expand Down
13 changes: 1 addition & 12 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ void TColumnEngineForLogs::UpdatePortionStats(const TPortionInfo& portionInfo, E
TColumnEngineStats::TPortionsStats DeltaStats(const TPortionInfo& portionInfo) {
TColumnEngineStats::TPortionsStats deltaStats;
deltaStats.Bytes = 0;
for (auto& rec : portionInfo.Records) {
deltaStats.BytesByColumn[rec.ColumnId] += rec.BlobRange.Size;
deltaStats.RawBytesByColumn[rec.ColumnId] += rec.GetMeta().GetRawBytes();
}
deltaStats.Rows = portionInfo.NumRows();
deltaStats.Bytes = portionInfo.GetTotalBlobBytes();
deltaStats.RawBytes = portionInfo.GetTotalRawBytes();
Expand All @@ -96,7 +92,6 @@ TColumnEngineStats::TPortionsStats DeltaStats(const TPortionInfo& portionInfo) {
void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, const TPortionInfo& portionInfo,
EStatsUpdateType updateType,
const TPortionInfo* exPortionInfo) const {
ui64 columnRecords = portionInfo.Records.size();
TColumnEngineStats::TPortionsStats deltaStats = DeltaStats(portionInfo);

Y_ABORT_UNLESS(!exPortionInfo || exPortionInfo->GetMeta().Produced != TPortionMeta::EProduced::UNSPECIFIED);
Expand All @@ -115,20 +110,14 @@ void TColumnEngineForLogs::UpdatePortionStats(TColumnEngineStats& engineStats, c
const bool isAdd = updateType == EStatsUpdateType::ADD;

if (isErase) { // PortionsToDrop
engineStats.ColumnRecords -= columnRecords;

stats -= deltaStats;
} else if (isAdd) { // Load || AppendedPortions
engineStats.ColumnRecords += columnRecords;

stats += deltaStats;
} else if (&srcStats != &stats || exPortionInfo) { // SwitchedPortions || PortionsToEvict
stats += deltaStats;

if (exPortionInfo) {
srcStats -= DeltaStats(*exPortionInfo);

engineStats.ColumnRecords += columnRecords - exPortionInfo->Records.size();
} else {
srcStats -= deltaStats;
}
Expand Down Expand Up @@ -513,7 +502,7 @@ void TColumnEngineForLogs::UpsertPortion(const TPortionInfo& portionInfo, const
}

bool TColumnEngineForLogs::ErasePortion(const TPortionInfo& portionInfo, bool updateStats) {
const ui64 portion = portionInfo.GetPortion();
const ui64 portion = portionInfo.GetPortionId();
auto& spg = MutableGranuleVerified(portionInfo.GetPathId());
auto p = spg.GetPortionOptional(portion);

Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRe
using IndexColumns = NColumnShard::Schema::IndexColumns;
auto removeSnapshot = portion.GetRemoveSnapshotOptional();
db.Table<IndexColumns>().Key(0, 0, row.ColumnId,
portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortion(), row.Chunk).Update(
portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortionId(), row.Chunk).Update(
NIceDb::TUpdate<IndexColumns::XPlanStep>(removeSnapshot ? removeSnapshot->GetPlanStep() : 0),
NIceDb::TUpdate<IndexColumns::XTxId>(removeSnapshot ? removeSnapshot->GetTxId() : 0),
NIceDb::TUpdate<IndexColumns::Blob>(portion.GetBlobId(row.GetBlobRange().GetBlobIdxVerified()).SerializeBinary()),
Expand All @@ -72,7 +72,7 @@ void TDbWrapper::WritePortion(const NOlap::TPortionInfo& portion) {
const auto insertWriteId = portion.GetInsertWriteIdOptional();
const auto minSnapshotDeprecated = portion.GetMinSnapshotDeprecated();
db.Table<IndexPortions>()
.Key(portion.GetPathId(), portion.GetPortion())
.Key(portion.GetPathId(), portion.GetPortionId())
.Update(NIceDb::TUpdate<IndexPortions::SchemaVersion>(portion.GetSchemaVersionVerified()),
NIceDb::TUpdate<IndexPortions::ShardingVersion>(portion.GetShardingVersionDef(0)),
NIceDb::TUpdate<IndexPortions::CommitPlanStep>(commitSnapshot ? commitSnapshot->GetPlanStep() : 0),
Expand All @@ -88,14 +88,14 @@ void TDbWrapper::WritePortion(const NOlap::TPortionInfo& portion) {
void TDbWrapper::ErasePortion(const NOlap::TPortionInfo& portion) {
NIceDb::TNiceDb db(Database);
using IndexPortions = NColumnShard::Schema::IndexPortions;
db.Table<IndexPortions>().Key(portion.GetPathId(), portion.GetPortion()).Delete();
db.Table<IndexPortions>().Key(portion.GetPathId(), portion.GetPortionId()).Delete();
}

void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
NIceDb::TNiceDb db(Database);
using IndexColumns = NColumnShard::Schema::IndexColumns;
db.Table<IndexColumns>().Key(0, 0, row.ColumnId,
portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortion(), row.Chunk).Delete();
portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortionId(), row.Chunk).Delete();
}

bool TDbWrapper::LoadColumns(const std::function<void(NOlap::TPortionInfoConstructor&&, const TColumnChunkLoadContext&)>& callback) {
Expand Down
5 changes: 0 additions & 5 deletions ydb/core/tx/columnshard/engines/portion_info.cpp

This file was deleted.

7 changes: 0 additions & 7 deletions ydb/core/tx/columnshard/engines/portion_info.h

This file was deleted.

4 changes: 0 additions & 4 deletions ydb/core/tx/columnshard/engines/portions/column_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,6 @@ class TColumnRecord {
return ColumnId == item.ColumnId && Chunk == item.Chunk;
}

bool Valid() const {
return ColumnId && BlobRange.IsValid();
}

TString DebugString() const {
return TStringBuilder() << "column_id:" << ColumnId << ";"
<< "chunk_idx:" << Chunk << ";"
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/portions/constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ TPortionInfo TPortionInfoConstructor::Build(const bool needChunksNormalization)
TPortionInfo result(MetaConstructor.Build());
AFL_VERIFY(PathId);
result.PathId = PathId;
result.Portion = GetPortionIdVerified();
result.PortionId = GetPortionIdVerified();

AFL_VERIFY(MinSnapshotDeprecated);
AFL_VERIFY(MinSnapshotDeprecated->Valid());
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/engines/portions/portion_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ ui64 TPortionInfo::GetIndexRawBytes(const bool validation) const {

TString TPortionInfo::DebugString(const bool withDetails) const {
TStringBuilder sb;
sb << "(portion_id:" << Portion << ";" <<
sb << "(portion_id:" << PortionId << ";" <<
"path_id:" << PathId << ";records_count:" << NumRows() << ";"
"min_schema_snapshot:(" << MinSnapshotDeprecated.DebugString() << ");"
"schema_version:" << SchemaVersion.value_or(0) << ";"
Expand Down Expand Up @@ -213,7 +213,7 @@ ui64 TPortionInfo::GetTxVolume() const {

void TPortionInfo::SerializeToProto(NKikimrColumnShardDataSharingProto::TPortionInfo& proto) const {
proto.SetPathId(PathId);
proto.SetPortionId(Portion);
proto.SetPortionId(PortionId);
proto.SetSchemaVersion(GetSchemaVersionVerified());
*proto.MutableMinSnapshotDeprecated() = MinSnapshotDeprecated.SerializeToProto();
if (!RemoveSnapshot.IsZero()) {
Expand All @@ -236,7 +236,7 @@ void TPortionInfo::SerializeToProto(NKikimrColumnShardDataSharingProto::TPortion

TConclusionStatus TPortionInfo::DeserializeFromProto(const NKikimrColumnShardDataSharingProto::TPortionInfo& proto) {
PathId = proto.GetPathId();
Portion = proto.GetPortionId();
PortionId = proto.GetPortionId();
SchemaVersion = proto.GetSchemaVersion();
for (auto&& i : proto.GetBlobIds()) {
auto blobId = TUnifiedBlobId::BuildFromProto(i);
Expand Down Expand Up @@ -502,7 +502,7 @@ void TPortionInfo::FullValidation() const {
CheckChunksOrder(Records);
CheckChunksOrder(Indexes);
AFL_VERIFY(PathId);
AFL_VERIFY(Portion);
AFL_VERIFY(PortionId);
AFL_VERIFY(MinSnapshotDeprecated.Valid());
std::set<ui32> blobIdxs;
for (auto&& i : Records) {
Expand Down
26 changes: 13 additions & 13 deletions ydb/core/tx/columnshard/engines/portions/portion_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class TPortionInfo {
std::optional<TInsertWriteId> InsertWriteId;

ui64 PathId = 0;
ui64 Portion = 0; // Id of independent (overlayed by PK) portion of data in pathId
ui64 PortionId = 0; // Id of independent (overlayed by PK) portion of data in pathId
TSnapshot MinSnapshotDeprecated = TSnapshot::Zero(); // {PlanStep, TxId} is min snapshot for {Granule, Portion}
TSnapshot RemoveSnapshot = TSnapshot::Zero(); // {XPlanStep, XTxId} is snapshot where the blob has been removed (i.e. compacted into another one)
std::optional<ui64> SchemaVersion;
Expand Down Expand Up @@ -132,7 +132,13 @@ class TPortionInfo {
}
}
}
std::vector<TColumnRecord> Records;

public:
const std::vector<TUnifiedBlobId>& GetBlobIds() const {
return BlobIds;
}

ui32 GetCompactionLevel() const {
return GetMeta().GetCompactionLevel();
}
Expand Down Expand Up @@ -299,8 +305,6 @@ class TPortionInfo {

std::vector<TPage> BuildPages() const;

std::vector<TColumnRecord> Records;

const std::vector<TColumnRecord>& GetRecords() const {
return Records;
}
Expand Down Expand Up @@ -338,12 +342,12 @@ class TPortionInfo {
}

ui64 GetPortionId() const {
return Portion;
return PortionId;
}

NJson::TJsonValue SerializeToJsonVisual() const {
NJson::TJsonValue result = NJson::JSON_MAP;
result.InsertValue("id", Portion);
result.InsertValue("id", PortionId);
result.InsertValue("s_max", RecordSnapshotMax().GetPlanStep() / 1000);
/*
result.InsertValue("s_min", RecordSnapshotMin().GetPlanStep());
Expand Down Expand Up @@ -416,7 +420,7 @@ class TPortionInfo {
return false;
}

bool ValidSnapshotInfo() const { return MinSnapshotDeprecated.Valid() && PathId && Portion; }
bool ValidSnapshotInfo() const { return MinSnapshotDeprecated.Valid() && PathId && PortionId; }
size_t NumChunks() const { return Records.size(); }

TString DebugString(const bool withDetails = false) const;
Expand All @@ -441,12 +445,8 @@ class TPortionInfo {
return HasRemoveSnapshot();
}

ui64 GetPortion() const {
return Portion;
}

TPortionAddress GetAddress() const {
return TPortionAddress(PathId, Portion);
return TPortionAddress(PathId, PortionId);
}

void ResetShardingVersion() {
Expand All @@ -457,8 +457,8 @@ class TPortionInfo {
PathId = pathId;
}

void SetPortion(const ui64 portion) {
Portion = portion;
void SetPortionId(const ui64 id) {
PortionId = id;
}


Expand Down
Loading

0 comments on commit 8fd095b

Please sign in to comment.