Skip to content

Commit

Permalink
YDB FQ: introduce new TGenTable expr node (#14228)
Browse files Browse the repository at this point in the history
  • Loading branch information
vitalyisaev2 authored Feb 6, 2025
1 parent 877eef3 commit 0ee8cb7
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,23 @@
{"Index": 1, "Name": "DataSource", "Type": "TGenDataSource"}
]
},
{
"Name": "TGenTable",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "GenTable"},
"Children": [
{"Index": 0, "Name": "Name", "Type": "TCoAtom"},
{"Index": 1, "Name": "Splits", "Type": "TExprBase"}
]
},
{
"Name": "TGenReadTable",
"Base": "TCallable",
"Match": {"Type": "Callable", "Name": "GenReadTable!"},
"Children": [
{"Index": 0, "Name": "World", "Type": "TExprBase"},
{"Index": 1, "Name": "DataSource", "Type": "TGenDataSource"},
{"Index": 2, "Name": "Table", "Type": "TCoAtom"},
{"Index": 2, "Name": "Table", "Type": "TGenTable"},
{"Index": 3, "Name": "Columns", "Type": "TExprBase"},
{"Index": 4, "Name": "FilterPredicate", "Type": "TCoLambda"}
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ namespace NYql {
auto cluster = dataSource.Cast().Cluster();
tableNameBuilder << cluster.Value() << ".";
}
tableNameBuilder << '`' << maybeTable.Cast().Value() << '`';
tableNameBuilder << '`' << maybeTable.Cast().Name().Value() << '`';
inputs.push_back(
TPinInfo(maybeRead.DataSource().Raw(), nullptr, maybeTable.Cast().Raw(), tableNameBuilder, false));
return 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace NYql {
{
using TSelf = TGenericDataSourceTypeAnnotationTransformer;
AddHandler({TCoConfigure::CallableName()}, Hndl(&TSelf::HandleConfig));
AddHandler({TGenTable::CallableName()}, Hndl(&TSelf::HandleTable));
AddHandler({TGenReadTable::CallableName()}, Hndl(&TSelf::HandleReadTable));
AddHandler({TGenSourceSettings::CallableName()}, Hndl(&TSelf::HandleSourceSettings));
}
Expand All @@ -48,6 +49,19 @@ namespace NYql {
return TStatus::Ok;
}

TStatus HandleTable(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureArgsCount(*input, 2, ctx)) {
return TStatus::Error;
}

if (!EnsureAtom(*input->Child(TGenTable::idx_Name), ctx)) {
return TStatus::Error;
}

input->SetTypeAnn(ctx.MakeType<TUnitExprType>());
return TStatus::Ok;
}

TStatus HandleSourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
if (!EnsureArgsCount(*input, 6, ctx)) {
return TStatus::Error;
Expand Down Expand Up @@ -131,7 +145,7 @@ namespace NYql {
return TStatus::Error;
}

if (!EnsureAtom(*input->Child(TGenReadTable::idx_Table), ctx)) {
if (!EnsureCallable(*input->Child(TGenReadTable::idx_Table), ctx)) {
return TStatus::Error;
}

Expand All @@ -157,9 +171,21 @@ namespace NYql {
}
}

// Determine cluster name
TString clusterName{input->Child(TGenReadTable::idx_DataSource)->Child(1)->Content()};
TString tableName{input->Child(TGenReadTable::idx_Table)->Content()};

// Determine table name
const auto tableNode = input->Child(TGenReadTable::idx_Table);
if (!TGenTable::Match(tableNode)) {
ctx.AddError(TIssue(ctx.GetPosition(tableNode->Pos()),
TStringBuilder() << "Expected " << TGenTable::CallableName()));
return TStatus::Error;
}

TGenTable table(tableNode);
const auto tableName = table.Name().StringValue();

// Extract table metadata
auto [tableMeta, issue] = State_->GetTable(clusterName, tableName, ctx.GetPosition(input->Pos()));
if (issue.has_value()) {
ctx.AddError(issue.value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ namespace NYql {
.Input<TGenSourceSettings>()
.World(genReadTable.World())
.Cluster(genReadTable.DataSource().Cluster())
.Table(genReadTable.Table())
.Table(genReadTable.Table().Name())
.Token<TCoSecureParam>()
.Name().Build(token)
.Build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,17 @@ namespace NYql {
using namespace NKikimr;
using namespace NKikimr::NMiniKQL;

struct TGenericTableDescription {
using TPtr = std::shared_ptr<TGenericTableDescription>;

NYql::TGenericDataSourceInstance DataSourceInstance;
std::optional<NConnector::NApi::TDescribeTableResponse> Response;
};

class TGenericLoadTableMetadataTransformer: public TGraphTransformerBase {
struct TTableDescription {
using TPtr = std::shared_ptr<TTableDescription>;

NYql::TGenericDataSourceInstance DataSourceInstance;
std::optional<NConnector::NApi::TDescribeTableResponse> Response;
};

using TMapType =
std::unordered_map<TGenericState::TTableAddress, TGenericTableDescription::TPtr, THash<TGenericState::TTableAddress>>;
std::unordered_map<TGenericState::TTableAddress, TTableDescription::TPtr, THash<TGenericState::TTableAddress>>;

public:
TGenericLoadTableMetadataTransformer(TGenericState::TPtr state)
Expand Down Expand Up @@ -101,7 +102,7 @@ namespace NYql {
handles.emplace_back(promise.GetFuture());

// preserve data source instance for the further usage
auto emplaceIt = Results_.emplace(std::make_pair(item, std::make_shared<TGenericTableDescription>()));
auto emplaceIt = Results_.emplace(std::make_pair(item, std::make_shared<TTableDescription>()));
auto desc = emplaceIt.first->second;
desc->DataSourceInstance = request.data_source_instance();

Expand Down Expand Up @@ -171,17 +172,22 @@ namespace NYql {
auto row = Build<TCoArgument>(ctx, read.Pos())
.Name("row")
.Done();

auto emptyPredicate = Build<TCoLambda>(ctx, read.Pos())
.Args({row})
.Body<TCoBool>()
.Literal().Build("true")
.Build()
.Done().Ptr();

auto table = Build<TGenTable>(ctx, read.Pos())
.Name().Value(tableName).Build()
.Splits<TCoVoid>().Build().Done();

ins.first->second = Build<TGenReadTable>(ctx, read.Pos())
.World(read.World())
.DataSource(read.DataSource())
.Table().Value(tableName).Build()
.Table(table)
.Columns<TCoVoid>().Build()
.FilterPredicate(emptyPredicate)
.Done().Ptr();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ namespace NYql {
// Get table metadata
const auto [tableMeta, issue] = State_->GetTable(
read.DataSource().Cluster().Value(),
read.Table().Value(),
read.Table().Name().Value(),
ctx.GetPosition(node.Pos()));
if (issue.has_value()) {
ctx.AddError(issue.value());
Expand Down

0 comments on commit 0ee8cb7

Please sign in to comment.