Skip to content

Commit

Permalink
HLL in YT statistics
Browse files Browse the repository at this point in the history
  • Loading branch information
alephonea committed Sep 5, 2024
1 parent a20c225 commit 153e432
Show file tree
Hide file tree
Showing 14 changed files with 464 additions and 104 deletions.
1 change: 1 addition & 0 deletions ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ TYtConfiguration::TYtConfiguration()
});
REGISTER_SETTING(*this, MinColumnGroupSize).Lower(2);
REGISTER_SETTING(*this, MaxColumnGroups);
REGISTER_SETTING(*this, ExtendedStatsMaxChunkCount);
}

EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings) {
Expand Down
1 change: 1 addition & 0 deletions ydb/library/yql/providers/yt/common/yql_yt_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ struct TYtSettings {
NCommon::TConfSetting<EColumnGroupMode, false> ColumnGroupMode;
NCommon::TConfSetting<ui16, false> MinColumnGroupSize;
NCommon::TConfSetting<ui16, false> MaxColumnGroups;
NCommon::TConfSetting<ui64, false> ExtendedStatsMaxChunkCount;
};

EReleaseTempDataMode GetReleaseTempDataMode(const TYtSettings& settings);
Expand Down
56 changes: 45 additions & 11 deletions ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ TMaybe<ui64> TTransactionCache::TEntry::GetColumnarStat(NYT::TRichYPath ytPath)

auto guard = Guard(Lock_);
if (auto p = StatisticsCache.FindPtr(NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text))) {
ui64 sum = p->LegacyChunksDataWeight;
ui64 sum = p->ColumnarStat.LegacyChunksDataWeight;
for (auto& column: columns) {
if (auto c = p->ColumnDataWeight.FindPtr(column)) {
if (auto c = p->ColumnarStat.ColumnDataWeight.FindPtr(column)) {
sum += *c;
} else {
return Nothing();
Expand All @@ -114,30 +114,64 @@ TMaybe<ui64> TTransactionCache::TEntry::GetColumnarStat(NYT::TRichYPath ytPath)
return Nothing();
}

// Returns the cached extended statistics for the columns listed in ytPath.
//
// The cache entry is looked up by the canonical text YSON of the path with its
// column list removed. The result carries the cached per-column data weight and
// estimated unique count for every requested column that has one.
//
// Returns Nothing() when there is no cache entry for the path, or when at least
// one requested column has not been recorded in ExtendedStatColumns.
//
// Precondition: ytPath.Columns_ is dereferenced unconditionally, so the caller
// has to pass a path that carries a column list.
TMaybe<NYT::TTableColumnarStatistics> TTransactionCache::TEntry::GetExtendedColumnarStat(NYT::TRichYPath ytPath) const {
    TVector<TString> requestedColumns(std::move(ytPath.Columns_->Parts_));
    ytPath.Columns_.Clear();
    const auto key = NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text);

    auto guard = Guard(Lock_);
    const auto cached = StatisticsCache.FindPtr(key);
    if (!cached) {
        return Nothing();
    }

    const auto& extendedColumns = cached->ExtendedStatColumns;
    const auto& cachedStat = cached->ColumnarStat;

    NYT::TTableColumnarStatistics result;
    for (const auto& name: requestedColumns) {
        // A single column without extended statistics invalidates the whole answer.
        if (extendedColumns.find(name) == extendedColumns.end()) {
            return Nothing();
        }
        if (const auto weight = cachedStat.ColumnDataWeight.FindPtr(name)) {
            result.ColumnDataWeight[name] = *weight;
        }
        if (const auto uniqueCount = cachedStat.ColumnEstimatedUniqueCounts.FindPtr(name)) {
            result.ColumnEstimatedUniqueCounts[name] = *uniqueCount;
        }
    }
    return result;
}

void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, ui64 size) {
YQL_ENSURE(ytPath.Columns_.Defined());
TVector<TString> columns(std::move(ytPath.Columns_->Parts_));
ytPath.Columns_.Clear();
auto cacheKey = NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text);

auto guard = Guard(Lock_);
NYT::TTableColumnarStatistics& cacheColumnStat = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)];
cacheColumnStat.LegacyChunksDataWeight = size;
for (auto& c: cacheColumnStat.ColumnDataWeight) {
auto& cacheEntry = StatisticsCache[cacheKey];
cacheEntry.ColumnarStat.LegacyChunksDataWeight = size;
for (auto& c: cacheEntry.ColumnarStat.ColumnDataWeight) {
c.second = 0;
}
for (auto& c: columns) {
cacheColumnStat.ColumnDataWeight[c] = 0;
cacheEntry.ColumnarStat.ColumnDataWeight[c] = 0;
}
}

void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat) {
void TTransactionCache::TEntry::UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended) {
TVector<TString> columns(std::move(ytPath.Columns_->Parts_));
ytPath.Columns_.Clear();
auto guard = Guard(Lock_);
NYT::TTableColumnarStatistics& cacheColumnStat = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)];
cacheColumnStat.LegacyChunksDataWeight = columnStat.LegacyChunksDataWeight;
cacheColumnStat.TimestampTotalWeight = columnStat.TimestampTotalWeight;
auto& cacheEntry = StatisticsCache[NYT::NodeToCanonicalYsonString(NYT::PathToNode(ytPath), NYT::NYson::EYsonFormat::Text)];
if (extended) {
std::copy(columns.begin(), columns.end(), std::inserter(cacheEntry.ExtendedStatColumns, cacheEntry.ExtendedStatColumns.end()));
}
cacheEntry.ColumnarStat.LegacyChunksDataWeight = columnStat.LegacyChunksDataWeight;
cacheEntry.ColumnarStat.TimestampTotalWeight = columnStat.TimestampTotalWeight;
for (auto& c: columnStat.ColumnDataWeight) {
cacheColumnStat.ColumnDataWeight[c.first] = c.second;
cacheEntry.ColumnarStat.ColumnDataWeight[c.first] = c.second;
}
for (auto& c : columnStat.ColumnEstimatedUniqueCounts) {
cacheEntry.ColumnarStat.ColumnEstimatedUniqueCounts[c.first] = c.second;
}
}

Expand Down
13 changes: 11 additions & 2 deletions ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class TTransactionCache {
bool KeepTables = false;
THashMap<std::pair<TString, ui32>, std::tuple<TString, NYT::TTransactionId, ui64>> Snapshots; // {tablepath, epoch} -> {table_id, transaction_id, revision}
NYT::TNode TransactionSpec;
THashMap<TString, NYT::TTableColumnarStatistics> StatisticsCache;
THashMap<TString, TString> BinarySnapshots; // remote path -> snapshot path
NYT::ITransactionPtr BinarySnapshotTx;
THashMap<TString, NYT::ITransactionPtr> CheckpointTxs;
Expand Down Expand Up @@ -114,8 +113,10 @@ class TTransactionCache {
void CompleteWriteTx(const NYT::TTransactionId& id, bool abort);

TMaybe<ui64> GetColumnarStat(NYT::TRichYPath ytPath) const;
TMaybe<NYT::TTableColumnarStatistics> GetExtendedColumnarStat(NYT::TRichYPath ytPath) const;

void UpdateColumnarStat(NYT::TRichYPath ytPath, ui64 size);
void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat);
void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended = false);

std::pair<TString, NYT::TTransactionId> GetBinarySnapshot(TString remoteTmpFolder, const TString& md5, const TString& localPath, TDuration expirationInterval);

Expand All @@ -124,9 +125,17 @@ class TTransactionCache {
using TPtr = TIntrusivePtr<TEntry>;

private:
// One cached statistics record for a table path.
struct TStatisticsCacheEntry {
// Names of the columns passed to UpdateColumnarStat() with extended == true.
// GetExtendedColumnarStat() returns Nothing() if any requested column is missing here.
std::unordered_set<TString> ExtendedStatColumns;
// Cached statistics: legacy chunks data weight, timestamp total weight,
// per-column data weights and per-column estimated unique counts.
NYT::TTableColumnarStatistics ColumnarStat;
};

// Keyed by the canonical text YSON of the rich path with its column list cleared.
// The accessors visible in transaction_cache.cpp take Lock_ before touching it.
THashMap<TString, TStatisticsCacheEntry> StatisticsCache;

void DeleteAtFinalizeUnlocked(const TString& table, bool isInternal);
bool CancelDeleteAtFinalizeUnlocked(const TString& table, bool isInternal);
void DoRemove(const TString& table);
void UpdateColumnarStatLocked(NYT::TTableColumnarStatistics& cacheColumnStat, const NYT::TTableColumnarStatistics& columnStat);

size_t ExternalTempTablesCount = 0;
};
Expand Down
85 changes: 64 additions & 21 deletions ydb/library/yql/providers/yt/gateway/native/yql_yt_native.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,27 @@ inline TType OptionFromNode(const NYT::TNode& value) {
}
}

void PopulatePathStatResult(IYtGateway::TPathStatResult& out, int index, NYT::TTableColumnarStatistics& extendedStat) {
for (const auto& entry : extendedStat.ColumnDataWeight) {
out.DataSize[index] += entry.second;
}
out.Extended[index] = IYtGateway::TPathStatResult::TExtendedResult{
.DataWeight = extendedStat.ColumnDataWeight,
.EstimatedUniqueCounts = extendedStat.ColumnEstimatedUniqueCounts
};
}

// Renders a rich YT path as canonical text YSON for log messages.
//
// A path with no column list, or with at most 20 columns, is rendered as is.
// For a longer column list only the first 20 columns are kept, a "..." entry
// is appended in place of the rest, and the original column count is added
// after the rendered path. The path is taken by value, so the caller's object
// is never modified.
TString DebugPath(NYT::TRichYPath path) {
    constexpr int maxDebugColumns = 20;
    // Serializes `path` in its current state; called after any truncation below.
    const auto render = [&path] {
        return NYT::NodeToCanonicalYsonString(NYT::PathToNode(path), NYT::NYson::EYsonFormat::Text);
    };

    if (!path.Columns_.Defined()) {
        return render();
    }

    auto& parts = path.Columns_->Parts_;
    if (std::ssize(parts) <= maxDebugColumns) {
        return render();
    }

    int numColumns = std::ssize(parts);
    parts.erase(parts.begin() + maxDebugColumns, parts.end());
    parts.push_back("...");
    return render() + " (" + std::to_string(numColumns) + " columns)";
}

} // unnamed

///////////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -4505,13 +4526,15 @@ class TYtNativeGateway : public IYtGateway {
try {
TPathStatResult res;
res.DataSize.resize(execCtx->Options_.Paths().size(), 0);
res.Extended.resize(execCtx->Options_.Paths().size());

auto entry = execCtx->GetOrCreateEntry();
auto tx = entry->Tx;
const TString tmpFolder = GetTablesTmpFolder(*execCtx->Options_.Config());
const NYT::EOptimizeForAttr tmpOptimizeFor = execCtx->Options_.Config()->OptimizeFor.Get(execCtx->Cluster_).GetOrElse(NYT::EOptimizeForAttr::OF_LOOKUP_ATTR);
TVector<NYT::TRichYPath> ytPaths(Reserve(execCtx->Options_.Paths().size()));
TVector<size_t> pathMap;
bool extended = execCtx->Options_.Extended();

auto extractSysColumns = [] (NYT::TRichYPath& ytPath) -> TVector<TString> {
TVector<TString> res;
Expand Down Expand Up @@ -4555,16 +4578,19 @@ class TYtNativeGateway : public IYtGateway {
YQL_CLOG(INFO, ProviderYt) << "Adding stat for " << col << ": " << size << " (virtual)";
}
}
if (auto val = entry->GetColumnarStat(ytPath)) {
res.DataSize[i] += *val;
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << ": " << res.DataSize[i] << " (from cache)";
TMaybe<ui64> cachedStat;
TMaybe<NYT::TTableColumnarStatistics> cachedExtendedStat;
if (!extended && (cachedStat = entry->GetColumnarStat(ytPath))) {
res.DataSize[i] += *cachedStat;
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << ": " << res.DataSize[i] << " (from cache, extended: false)";
} else if (extended && (cachedExtendedStat = entry->GetExtendedColumnarStat(ytPath))) {
PopulatePathStatResult(res, i, *cachedExtendedStat);
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (from cache, extended: true)";
} else if (onlyCached) {
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " is missing in cache - sync path stat failed";
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " is missing in cache - sync path stat failed (extended: " << extended << ")";
return res;
} else if (NYT::EOptimizeForAttr::OF_SCAN_ATTR == tmpOptimizeFor) {
pathMap.push_back(i);
ytPaths.push_back(ytPath);
} else {
} else if (NYT::EOptimizeForAttr::OF_SCAN_ATTR != tmpOptimizeFor && !extended) {

// Use entire table size for lookup tables (YQL-7257)
if (attrs.IsUndefined()) {
attrs = tx->Get(ytPath.Path_ + "/@", NYT::TGetOptions().AttributeFilter(
Expand All @@ -4576,7 +4602,10 @@ class TYtNativeGateway : public IYtGateway {
auto size = CalcDataSize(ytPath, attrs);
res.DataSize[i] += size;
entry->UpdateColumnarStat(ytPath, size);
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << ": " << res.DataSize[i] << " (uncompressed_data_size for lookup)";
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << ": " << res.DataSize[i] << " (uncompressed_data_size for lookup, extended: false)";
} else {
ytPaths.push_back(ytPath);
pathMap.push_back(i);
}
} else {
auto p = entry->Snapshots.FindPtr(std::make_pair(tablePath, req.Epoch()));
Expand Down Expand Up @@ -4607,11 +4636,19 @@ class TYtNativeGateway : public IYtGateway {
YQL_CLOG(INFO, ProviderYt) << "Adding stat for " << col << ": " << size << " (virtual)";
}
}
if (auto val = entry->GetColumnarStat(ytPath)) {
res.DataSize[i] += *val;
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (from cache)";
TMaybe<ui64> cachedStat;
TMaybe<NYT::TTableColumnarStatistics> cachedExtendedStat;
if (!extended && (cachedStat = entry->GetColumnarStat(ytPath))) {
res.DataSize[i] += *cachedStat;
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (from cache, extended: false)";
} else if (extended && (cachedExtendedStat = entry->GetExtendedColumnarStat(ytPath))) {
PopulatePathStatResult(res, i, *cachedExtendedStat);
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (from cache, extended: true)";
} else if (onlyCached) {
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << ") is missing in cache - sync path stat failed";
YQL_CLOG(INFO, ProviderYt)
<< "Stat for " << DebugPath(req.Path())
<< " (epoch=" << req.Epoch() << ", extended: " << extended
<< ") is missing in cache - sync path stat failed";
return res;
} else {
if (attrs.IsUndefined()) {
Expand All @@ -4623,40 +4660,46 @@ class TYtNativeGateway : public IYtGateway {
.AddAttribute(TString("schema"))
));
}
if (attrs.HasKey("optimize_for") && attrs["optimize_for"] == "scan" &&
AllPathColumnsAreInSchema(req.Path(), attrs))
if (extended ||
(attrs.HasKey("optimize_for") && attrs["optimize_for"] == "scan" &&
AllPathColumnsAreInSchema(req.Path(), attrs)))
{
pathMap.push_back(i);
ytPaths.push_back(ytPath);
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << ") add for request with path " << ytPath.Path_;
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << ") add for request with path " << ytPath.Path_ << " (extended: " << extended << ")";
} else {
// Use entire table size for lookup tables (YQL-7257)
auto size = CalcDataSize(ytPath, attrs);
res.DataSize[i] += size;
entry->UpdateColumnarStat(ytPath, size);
YQL_CLOG(INFO, ProviderYt) << "Stat for " << req.Path().Path_ << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (uncompressed_data_size for lookup)";
YQL_CLOG(INFO, ProviderYt) << "Stat for " << DebugPath(req.Path()) << " (epoch=" << req.Epoch() << "): " << res.DataSize[i] << " (uncompressed_data_size for lookup)";
}
}
}
}

if (ytPaths) {
YQL_ENSURE(!onlyCached);
auto fetchMode = execCtx->Options_.Config()->JoinColumnarStatisticsFetcherMode.Get().GetOrElse(NYT::EColumnarStatisticsFetcherMode::Fallback);
auto fetchMode = extended ?
NYT::EColumnarStatisticsFetcherMode::FromNodes :
execCtx->Options_.Config()->JoinColumnarStatisticsFetcherMode.Get().GetOrElse(NYT::EColumnarStatisticsFetcherMode::Fallback);
auto columnStats = tx->GetTableColumnarStatistics(ytPaths, NYT::TGetTableColumnarStatisticsOptions().FetcherMode(fetchMode));
YQL_ENSURE(pathMap.size() == columnStats.size());
for (size_t i: xrange(columnStats.size())) {
for (size_t i: xrange(columnStats.size())) {
auto& columnStat = columnStats[i];
const ui64 weight = columnStat.LegacyChunksDataWeight +
Accumulate(columnStat.ColumnDataWeight.begin(), columnStat.ColumnDataWeight.end(), 0ull,
[](ui64 sum, decltype(*columnStat.ColumnDataWeight.begin())& v) { return sum + v.second; });

if (extended) {
PopulatePathStatResult(res, pathMap[i], columnStat);
}

res.DataSize[pathMap[i]] += weight;
entry->UpdateColumnarStat(ytPaths[i], columnStat);
entry->UpdateColumnarStat(ytPaths[i], columnStat, extended);
YQL_CLOG(INFO, ProviderYt) << "Stat for " << execCtx->Options_.Paths()[pathMap[i]].Path().Path_ << ": " << weight << " (fetched)";
}
}

res.SetSuccess();
return res;
} catch (...) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#include "yql_yt_transform.h"
#include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h>

#include <ydb/library/yql/providers/yt/lib/skiff/yql_skiff_schema.h>
#include <ydb/library/yql/providers/yt/common/yql_names.h>
#include <ydb/library/yql/providers/yt/common/yql_configuration.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>
#include <ydb/library/yql/providers/yt/codec/yt_codec.h>
#include <ydb/library/yql/providers/yt/gateway/lib/yt_helpers.h>
#include <ydb/library/yql/providers/yt/expr_nodes/yql_yt_expr_nodes.h>
#include <ydb/library/yql/providers/common/codec/yql_codec_type_flags.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <ydb/library/yql/core/yql_type_helpers.h>
#include <ydb/library/yql/core/yql_opt_utils.h>
#include <ydb/library/yql/providers/common/provider/yql_provider.h>

#include <ydb/library/yql/utils/log/log.h>

Expand Down Expand Up @@ -321,6 +322,7 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::EarlyMergeJoin(TExprBas

TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBase node, TExprContext& ctx) const {
auto equiJoin = node.Cast<TYtEquiJoin>();
auto cluster = equiJoin.DataSink().Cluster().StringValue();

const bool tryReorder = State_->Types->CostBasedOptimizer != ECostBasedOptimizerType::Disable
&& equiJoin.Input().Size() > 2
Expand All @@ -338,10 +340,19 @@ TMaybeNode<TExprBase> TYtPhysicalOptProposalTransformer::RuntimeEquiJoin(TExprBa
}
}
}

const auto tree = ImportYtEquiJoin(equiJoin, ctx);

const TMaybe<ui64> maxChunkCountExtendedStats = State_->Configuration->ExtendedStatsMaxChunkCount.Get();

if (tryReorder && waitAllInputs && maxChunkCountExtendedStats) {
YQL_CLOG(INFO, ProviderYt) << "Collecting cbo stats for equiJoin";
auto collectStatus = CollectCboStats(cluster, *tree, State_, ctx);
if (collectStatus == TStatus::Repeat) {
return ExportYtEquiJoin(equiJoin, *tree, ctx, State_);
}
}
if (tryReorder) {
const auto optimizedTree = OrderJoins(tree, State_, ctx);
const auto optimizedTree = OrderJoins(tree, State_, cluster, ctx);
if (optimizedTree != tree) {
return ExportYtEquiJoin(equiJoin, *optimizedTree, ctx, State_);
}
Expand Down
Loading

0 comments on commit 153e432

Please sign in to comment.