Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sys view full for columnshards #786

Merged
merged 11 commits into from
Dec 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 98 additions & 80 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp

Large diffs are not rendered by default.

35 changes: 25 additions & 10 deletions ydb/core/sys_view/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -387,27 +387,42 @@ struct Schema : NIceDb::Schema {

struct PrimaryIndexStats : Table<10> {
struct PathId : Column<1, NScheme::NTypeIds::Uint64> {};
struct Kind : Column<2, NScheme::NTypeIds::Uint32> {};
struct Kind : Column<2, NScheme::NTypeIds::Utf8> {};
struct TabletId : Column<3, NScheme::NTypeIds::Uint64> {};
struct Rows : Column<4, NScheme::NTypeIds::Uint64> {};
struct Bytes : Column<5, NScheme::NTypeIds::Uint64> {};
struct RawBytes : Column<6, NScheme::NTypeIds::Uint64> {};
struct Portions : Column<7, NScheme::NTypeIds::Uint64> {};
struct Blobs : Column<8, NScheme::NTypeIds::Uint64> {};
struct RawBytes : Column<5, NScheme::NTypeIds::Uint64> {};
struct PortionId: Column<6, NScheme::NTypeIds::Uint64> {};
struct ChunkIdx : Column<7, NScheme::NTypeIds::Uint64> {};
struct ColumnName: Column<8, NScheme::NTypeIds::Utf8> {};
struct InternalColumnId : Column<9, NScheme::NTypeIds::Uint32> {};
struct BlobId : Column<10, NScheme::NTypeIds::Utf8> {};
struct BlobRangeOffset : Column<11, NScheme::NTypeIds::Uint64> {};
struct BlobRangeSize : Column<12, NScheme::NTypeIds::Uint64> {};
struct Activity : Column<13, NScheme::NTypeIds::Bool> {};
struct TierName : Column<14, NScheme::NTypeIds::Utf8> {};

using TKey = TableKey<
PathId,
Kind,
TabletId>;
TabletId,
PortionId,
ChunkIdx
>;
using TColumns = TableColumns<
PathId,
Kind,
TabletId,
Rows,
Bytes,
RawBytes,
Portions,
Blobs>;
PortionId,
ChunkIdx,
ColumnName,
InternalColumnId,
BlobId,
BlobRangeOffset,
BlobRangeSize,
Activity,
TierName
>;
};

struct StorageStats : Table<11> {
Expand Down
32 changes: 22 additions & 10 deletions ydb/core/tx/columnshard/columnshard__scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <ydb/library/yql/core/issue/yql_issue.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>
#include <ydb/services/metadata/request/common.h>
#include <ydb/core/tx/columnshard/engines/column_engine_logs.h>
#include <util/generic/noncopyable.h>

namespace NKikimr::NColumnShard {
Expand Down Expand Up @@ -644,29 +645,40 @@ PrepareStatsReadMetadata(ui64 tabletId, const NOlap::TReadDescription& read, con

auto out = std::make_shared<NOlap::TReadStatsMetadata>(tabletId,
isReverse ? NOlap::TReadStatsMetadata::ESorting::DESC : NOlap::TReadStatsMetadata::ESorting::ASC,
read.GetProgram());
read.GetProgram(), index ? index->GetVersionedIndex().GetSchema(read.GetSnapshot()) : nullptr, read.GetSnapshot());

out->SetPKRangesFilter(read.PKRangesFilter);
out->ReadColumnIds.assign(readColumnIds.begin(), readColumnIds.end());
out->ResultColumnIds = read.ColumnIds;

if (!index) {
const NOlap::TColumnEngineForLogs* logsIndex = dynamic_cast<const NOlap::TColumnEngineForLogs*>(index.get());
if (!index || !logsIndex) {
return out;
}

THashMap<ui64, THashSet<ui64>> portionsInUse;
for (auto&& filter : read.PKRangesFilter) {
const ui64 fromPathId = *filter.GetPredicateFrom().Get<arrow::UInt64Array>(0, 0, 1);
const ui64 toPathId = *filter.GetPredicateTo().Get<arrow::UInt64Array>(0, 0, Max<ui64>());
const auto& stats = index->GetStats();
if (read.TableName.EndsWith(NOlap::TIndexInfo::TABLE_INDEX_STATS_TABLE)) {
if (fromPathId <= read.PathId && toPathId >= read.PathId && stats.contains(read.PathId)) {
out->IndexStats[read.PathId] = std::make_shared<NOlap::TColumnEngineStats>(*stats.at(read.PathId));
if (fromPathId <= read.PathId && toPathId >= read.PathId) {
auto pathInfo = logsIndex->GetGranuleOptional(read.PathId);
if (!pathInfo) {
continue;
}
for (auto&& p : pathInfo->GetPortions()) {
if (portionsInUse[read.PathId].emplace(p.first).second) {
out->IndexPortions.emplace_back(p.second);
}
}
}
} else if (read.TableName.EndsWith(NOlap::TIndexInfo::STORE_INDEX_STATS_TABLE)) {
auto it = stats.lower_bound(fromPathId);
auto itEnd = stats.upper_bound(toPathId);
for (; it != itEnd; ++it) {
out->IndexStats[it->first] = std::make_shared<NOlap::TColumnEngineStats>(*it->second);
auto pathInfos = logsIndex->GetTables(fromPathId, toPathId);
for (auto&& pathInfo: pathInfos) {
for (auto&& p: pathInfo->GetPortions()) {
if (portionsInUse[p.second->GetPathId()].emplace(p.first).second) {
out->IndexPortions.emplace_back(p.second);
}
}
}
}
}
Expand Down
79 changes: 40 additions & 39 deletions ydb/core/tx/columnshard/columnshard__stats_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,64 +26,65 @@ std::optional<NOlap::TPartialReadResult> TStatsIterator::GetBatch() {
}

std::shared_ptr<arrow::RecordBatch> TStatsIterator::FillStatsBatch() {
ui64 numRows = 0;
numRows += NOlap::TColumnEngineStats::GetRecordsCount() * IndexStats.size();

std::vector<std::shared_ptr<NOlap::TPortionInfo>> portions;
ui32 recordsCount = 0;
while (IndexPortions.size()) {
auto& i = IndexPortions.front();
recordsCount += i->Records.size();
portions.emplace_back(i);
IndexPortions.pop_front();
if (recordsCount > 10000) {
break;
}
}
std::vector<ui32> allColumnIds;
for (const auto& c : PrimaryIndexStatsSchema.Columns) {
allColumnIds.push_back(c.second.Id);
}
std::sort(allColumnIds.begin(), allColumnIds.end());
auto schema = NOlap::MakeArrowSchema(PrimaryIndexStatsSchema.Columns, allColumnIds);
auto builders = NArrow::MakeBuilders(schema, numRows);
auto builders = NArrow::MakeBuilders(schema, recordsCount);

while (!IndexStats.empty()) {
auto it = Reverse ? std::prev(IndexStats.end()) : IndexStats.begin();
const auto& stats = it->second;
Y_ABORT_UNLESS(stats);
AppendStats(builders, it->first, *stats);
IndexStats.erase(it);
for (auto&& p: portions) {
AppendStats(builders, *p);
}

auto columns = NArrow::Finish(std::move(builders));
return arrow::RecordBatch::Make(schema, numRows, columns);
return arrow::RecordBatch::Make(schema, recordsCount, columns);
}

// Restricts `batch` by the primary-key range predicates of the current read:
// a column filter is built for the batch from the read metadata's PK ranges
// filter and then applied to that same batch (passed by non-const reference).
void TStatsIterator::ApplyRangePredicates(std::shared_ptr<arrow::RecordBatch>& batch) {
    const auto& pkRanges = ReadMetadata->GetPKRangesFilter();
    NArrow::TColumnFilter rangeFilter = pkRanges.BuildFilter(batch);
    rangeFilter.Apply(batch);
}

void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, ui64 pathId, const NOlap::TColumnEngineStats& stats) {
auto kinds = stats.GetKinds();
auto pathIds = stats.GetConstValues<arrow::UInt64Type::c_type>(pathId);
auto tabletIds = stats.GetConstValues<arrow::UInt64Type::c_type>(ReadMetadata->TabletId);
auto rows = stats.GetRowsValues();
auto bytes = stats.GetBytesValues();
auto rawBytes = stats.GetRawBytesValues();
auto portions = stats.GetPortionsValues();
auto blobs = stats.GetBlobsValues();

void TStatsIterator::AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const NOlap::TPortionInfo& portion) {
std::vector<const NOlap::TColumnRecord*> records;
for (auto&& r: portion.Records) {
records.emplace_back(&r);
}
if (Reverse) {
std::reverse(std::begin(pathIds), std::end(pathIds));
std::reverse(std::begin(kinds), std::end(kinds));
std::reverse(std::begin(tabletIds), std::end(tabletIds));
std::reverse(std::begin(rows), std::end(rows));
std::reverse(std::begin(bytes), std::end(bytes));
std::reverse(std::begin(rawBytes), std::end(rawBytes));
std::reverse(std::begin(portions), std::end(portions));
std::reverse(std::begin(blobs), std::end(blobs));
std::reverse(records.begin(), records.end());
}
for (auto&& r: records) {
NArrow::Append<arrow::UInt64Type>(*builders[0], portion.GetPathId());
const std::string prod = ::ToString(portion.GetMeta().Produced);
NArrow::Append<arrow::StringType>(*builders[1], prod);
NArrow::Append<arrow::UInt64Type>(*builders[2], ReadMetadata->TabletId);
NArrow::Append<arrow::UInt64Type>(*builders[3], r->GetMeta().GetNumRowsVerified());
NArrow::Append<arrow::UInt64Type>(*builders[4], r->GetMeta().GetRawBytesVerified());
NArrow::Append<arrow::UInt64Type>(*builders[5], portion.GetPortionId());
NArrow::Append<arrow::UInt64Type>(*builders[6], r->GetChunkIdx());
NArrow::Append<arrow::StringType>(*builders[7], ReadMetadata->GetColumnNameDef(r->GetColumnId()).value_or("undefined"));
NArrow::Append<arrow::UInt32Type>(*builders[8], r->GetColumnId());
std::string blobIdString = r->BlobRange.ToString();
NArrow::Append<arrow::StringType>(*builders[9], blobIdString);
NArrow::Append<arrow::UInt64Type>(*builders[10], r->BlobRange.Offset);
NArrow::Append<arrow::UInt64Type>(*builders[11], r->BlobRange.Size);
NArrow::Append<arrow::BooleanType>(*builders[12], !portion.HasRemoveSnapshot() || ReadMetadata->GetRequestSnapshot() < portion.GetRemoveSnapshot());
std::string strTierName(portion.GetMeta().GetTierName().data(), portion.GetMeta().GetTierName().size());
NArrow::Append<arrow::StringType>(*builders[13], strTierName);
}

NArrow::Append<arrow::UInt64Type>(*builders[0], pathIds);
NArrow::Append<arrow::UInt32Type>(*builders[1], kinds);
NArrow::Append<arrow::UInt64Type>(*builders[2], tabletIds);
NArrow::Append<arrow::UInt64Type>(*builders[3], rows);
NArrow::Append<arrow::UInt64Type>(*builders[4], bytes);
NArrow::Append<arrow::UInt64Type>(*builders[5], rawBytes);
NArrow::Append<arrow::UInt64Type>(*builders[6], portions);
NArrow::Append<arrow::UInt64Type>(*builders[7], blobs);
}

}

12 changes: 7 additions & 5 deletions ydb/core/tx/columnshard/columnshard__stats_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,18 @@ class TStatsIterator : public TScanIteratorBase {
, Reverse(ReadMetadata->IsDescSorted())
, KeySchema(NOlap::MakeArrowSchema(PrimaryIndexStatsSchema.Columns, PrimaryIndexStatsSchema.KeyColumns))
, ResultSchema(NOlap::MakeArrowSchema(PrimaryIndexStatsSchema.Columns, ReadMetadata->ResultColumnIds))
, IndexStats(ReadMetadata->IndexStats.begin(), ReadMetadata->IndexStats.end())
, IndexPortions(ReadMetadata->IndexPortions)
{
if (ResultSchema->num_fields() == 0) {
ResultSchema = KeySchema;
}
if (Reverse) {
std::reverse(IndexPortions.begin(), IndexPortions.end());
}
}

bool Finished() const override {
return IndexStats.empty();
return IndexPortions.empty();
}

std::optional<NOlap::TPartialReadResult> GetBatch() override;
Expand All @@ -73,14 +76,13 @@ class TStatsIterator : public TScanIteratorBase {
std::shared_ptr<arrow::Schema> KeySchema;
std::shared_ptr<arrow::Schema> ResultSchema;

TMap<ui64, std::shared_ptr<NOlap::TColumnEngineStats>> IndexStats;
std::deque<std::shared_ptr<NOlap::TPortionInfo>> IndexPortions;

std::shared_ptr<arrow::RecordBatch> FillStatsBatch();

void ApplyRangePredicates(std::shared_ptr<arrow::RecordBatch>& batch);

void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders,
ui64 pathId, const NOlap::TColumnEngineStats& stats);
void AppendStats(const std::vector<std::unique_ptr<arrow::ArrayBuilder>>& builders, const NOlap::TPortionInfo& portion);
};

}
11 changes: 11 additions & 0 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,17 @@ class TColumnEngineForLogs : public IColumnEngine {
return it->second;
}

std::vector<std::shared_ptr<TGranuleMeta>> GetTables(const ui64 pathIdFrom, const ui64 pathIdTo) const {
std::vector<std::shared_ptr<TGranuleMeta>> result;
for (auto&& i : Tables) {
if (i.first < pathIdFrom || i.first > pathIdTo) {
continue;
}
result.emplace_back(i.second);
}
return result;
}

ui64 GetTabletId() const {
return TabletId;
}
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/column_record.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,13 @@ struct TColumnRecord {
}
};

// Accessor for the ColumnId member of this record.
ui32 GetColumnId() const {
    return this->ColumnId;
}
// Accessor for the Chunk member of this record, returned as ui16.
ui16 GetChunkIdx() const {
    return this->Chunk;
}

TColumnSerializationStat GetSerializationStat(const std::string& columnName) const {
TColumnSerializationStat result(ColumnId, columnName);
result.Merge(GetSerializationStat());
Expand Down
24 changes: 21 additions & 3 deletions ydb/core/tx/columnshard/engines/reader/read_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,18 +255,36 @@ struct TReadMetadata : public TReadMetadataBase, public std::enable_shared_from_
struct TReadStatsMetadata : public TReadMetadataBase, public std::enable_shared_from_this<TReadStatsMetadata> {
private:
using TBase = TReadMetadataBase;
TSnapshot RequestSnapshot;
std::shared_ptr<ISnapshotSchema> ResultIndexSchema;
public:
using TConstPtr = std::shared_ptr<const TReadStatsMetadata>;

const ui64 TabletId;
std::vector<ui32> ReadColumnIds;
std::vector<ui32> ResultColumnIds;
THashMap<ui64, std::shared_ptr<NOlap::TColumnEngineStats>> IndexStats;
std::deque<std::shared_ptr<NOlap::TPortionInfo>> IndexPortions;

// Read-only access to the snapshot that was passed to the constructor as
// the request snapshot.
const TSnapshot& GetRequestSnapshot() const {
    return this->RequestSnapshot;
}

explicit TReadStatsMetadata(ui64 tabletId, const ESorting sorting, const TProgramContainer& ssaProgram)
// Looks up the name of the column with the given id in ResultIndexSchema.
// Returns an empty optional when no schema was supplied or when the schema
// yields no field for `columnId`; otherwise returns the field's name.
std::optional<std::string> GetColumnNameDef(const ui32 columnId) const {
    if (ResultIndexSchema) {
        if (auto field = ResultIndexSchema->GetFieldByColumnId(columnId)) {
            return field->name();
        }
    }
    return std::nullopt;
}

explicit TReadStatsMetadata(ui64 tabletId, const ESorting sorting, const TProgramContainer& ssaProgram, const std::shared_ptr<ISnapshotSchema>& schema, const TSnapshot& requestSnapshot)
: TBase(sorting, ssaProgram)
, RequestSnapshot(requestSnapshot)
, ResultIndexSchema(schema)
, TabletId(tabletId)
{}
{
}

std::vector<std::pair<TString, NScheme::TTypeInfo>> GetKeyYqlSchema() const override;

Expand Down
Loading