Skip to content

Commit

Permalink
wideflow => stream
Browse files Browse the repository at this point in the history
  • Loading branch information
MrLolthe1st committed Dec 21, 2023
1 parent 62c4d4c commit 4f60da1
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 36 deletions.
12 changes: 7 additions & 5 deletions ydb/library/yql/providers/dq/opt/dqs_opt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <ydb/library/yql/core/type_ann/type_ann_core.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
#include <ydb/library/yql/core/yql_type_annotation.h>
#include <ydb/library/yql/core/yql_opt_utils.h>

#include <ydb/library/yql/dq/opt/dq_opt.h>
#include <ydb/library/yql/dq/opt/dq_opt_phy.h>
Expand Down Expand Up @@ -92,12 +93,13 @@ namespace NYql::NDqs {
}

YQL_CLOG(INFO, ProviderDq) << "DqsRewritePhyBlockReadOnDqIntegration";

return Build<TCoWideFromBlocks>(ctx, node->Pos())
.Input(Build<TDqReadBlockWideWrap>(ctx, node->Pos())
.Input(readWideWrap.Input())
.Flags(readWideWrap.Flags())
.Token(readWideWrap.Token())
.Input(Build<TCoToFlow>(ctx, node->Pos())
.Input(Build<TDqReadBlockWideWrap>(ctx, node->Pos())
.Input(readWideWrap.Input())
.Flags(readWideWrap.Flags())
.Token(readWideWrap.Token())
.Done())
.Done())
.Done().Ptr();
}, ctx, optSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ class TDqsDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase {
}

types.push_back(ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64)));
input->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TMultiExprType>(types)));
return TStatus::Ok;
}

input->SetTypeAnn(ctx.MakeType<TFlowExprType>(ctx.MakeType<TMultiExprType>(types)));
Expand Down
54 changes: 23 additions & 31 deletions ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,10 @@ class TSource : public TNonCopyable {
using TPtr = std::shared_ptr<TSource>;
TSource(std::unique_ptr<TSettingsHolder>&& settings,
size_t inflight, TType* type, std::shared_ptr<std::vector<std::shared_ptr<arrow::DataType>>> types, const THolderFactory& holderFactory, NKikimr::NMiniKQL::IStatsRegistry* jobStats)
: Settings_(std::move(settings))
: HolderFactory(holderFactory)
, Settings_(std::move(settings))
, Inputs_(std::move(Settings_->RawInputs))
, Listener_(std::make_shared<TListener>(Inputs_.size(), inflight))
, HolderFactory_(holderFactory)
, JobStats_(jobStats)
{
auto structType = AS_TYPE(TStructType, type);
Expand All @@ -356,7 +356,7 @@ class TSource : public TNonCopyable {
LocalListeners_.back()->Init(LocalListeners_.back());
}
BlockBuilder_.Init(ptr, *Settings_->Pool, Settings_->PgBuilder);
FallbackReader_.SetSpecs(*Settings_->Specs, HolderFactory_);
FallbackReader_.SetSpecs(*Settings_->Specs, HolderFactory);
}

void RunRead() {
Expand Down Expand Up @@ -471,7 +471,7 @@ class TSource : public TNonCopyable {
while (FallbackReader_.IsValid()) {
auto currentRow = std::move(FallbackReader_.GetRow());
if (!Settings_->Specs->InputGroups.empty()) {
currentRow = std::move(HolderFactory_.CreateVariantHolder(currentRow.Release(), Settings_->Specs->InputGroups.at(Settings_->OriginalIndexes[idx])));
currentRow = std::move(HolderFactory.CreateVariantHolder(currentRow.Release(), Settings_->Specs->InputGroups.at(Settings_->OriginalIndexes[idx])));
}
BlockBuilder_.Add(currentRow);
FallbackReader_.Next();
Expand All @@ -497,6 +497,7 @@ class TSource : public TNonCopyable {
}
}

const THolderFactory& HolderFactory;
private:
NYT::NConcurrency::IThreadPoolPtr Pool_;
std::mutex Mtx_;
Expand All @@ -513,7 +514,6 @@ class TSource : public TNonCopyable {
TListener::TPtr Listener_;
TPtr Self_;
size_t Inflight_;
const THolderFactory& HolderFactory_;
NKikimr::NMiniKQL::IStatsRegistry* JobStats_;
};

Expand All @@ -525,52 +525,48 @@ class TReaderState: public TComputationValue<TReaderState> {
, Source_(std::move(source))
, Width_(width)
, Types_(arrowTypes)
, Result_(width)
{
}

EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
if (GotFinish_) {
return EFetchResult::Finish;
return NUdf::EFetchStatus::Finish;
}
YQL_ENSURE(width == Width_ + 1);
auto batch = Source_->Next();
if (!batch) {
GotFinish_ = 1;
Source_->Finish();
return EFetchResult::Finish;
return NUdf::EFetchStatus::Finish;
}

for (size_t i = 0; i < Width_; ++i) {
if (!output[i]) {
continue;
}
if(!batch->Columns[i].type()->Equals(Types_->at(i))) {
*(output[i]) = ctx.HolderFactory.CreateArrowBlock(ARROW_RESULT(arrow::compute::Cast(batch->Columns[i], Types_->at(i))));
continue;
}
*(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(batch->Columns[i]));
}
if (output[Width_]) {
*(output[Width_]) = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(ui64(batch->RowsCnt)));
YQL_ENSURE(batch->Columns[i].type()->Equals(Types_->at(i)));
output[i] = Source_->HolderFactory.CreateArrowBlock(std::move(batch->Columns[i]));
}
return EFetchResult::One;
output[Width_] = Source_->HolderFactory.CreateArrowBlock(arrow::Datum(ui64(batch->RowsCnt)));
return NUdf::EFetchStatus::Ok;
}

private:
TSource::TPtr Source_;
const size_t Width_;
std::shared_ptr<std::vector<std::shared_ptr<arrow::DataType>>> Types_;
std::vector<NUdf::TUnboxedValue*> Result_;
bool GotFinish_ = 0;
};
};

class TDqYtReadBlockWrapper : public TStatefulWideFlowComputationNode<TDqYtReadBlockWrapper> {
using TBaseComputation = TStatefulWideFlowComputationNode<TDqYtReadBlockWrapper>;
class TDqYtReadBlockWrapper : public TMutableComputationNode<TDqYtReadBlockWrapper> {
using TBaseComputation = TMutableComputationNode<TDqYtReadBlockWrapper>;
public:

TDqYtReadBlockWrapper(const TComputationNodeFactoryContext& ctx, const TString& clusterName,
const TString& token, const NYT::TNode& inputSpec, const NYT::TNode& samplingSpec,
const TVector<ui32>& inputGroups,
TType* itemType, const TVector<TString>& tableNames, TVector<std::pair<NYT::TRichYPath, NYT::TFormat>>&& tables, NKikimr::NMiniKQL::IStatsRegistry* jobStats, size_t inflight,
size_t timeout) : TBaseComputation(ctx.Mutables, this, EValueRepresentation::Boxed)
size_t timeout) : TBaseComputation(ctx.Mutables, EValueRepresentation::Boxed)
, Width_(AS_TYPE(TStructType, itemType)->GetMembersCount())
, CodecCtx_(ctx.Env, ctx.FunctionRegistry, &ctx.HolderFactory)
, ClusterName_(clusterName)
Expand All @@ -588,6 +584,9 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TDqYtReadBlockWrapper
}

void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
}

NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
auto settings = CreateInputStreams(true, Token_, ClusterName_, Timeout_, Inflight_ > 1, Tables_, SamplingSpec_);
settings->Specs = &Specs_;
settings->Pool = arrow::default_memory_pool();
Expand All @@ -598,14 +597,7 @@ using TBaseComputation = TStatefulWideFlowComputationNode<TDqYtReadBlockWrapper
}
auto source = std::make_shared<TSource>(std::move(settings), Inflight_, Type_, types, ctx.HolderFactory, JobStats_);
source->SetSelfAndRun(source);
state = ctx.HolderFactory.Create<TReaderState>(source, Width_, types);
}

EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (!state.HasValue()) {
MakeState(ctx, state);
}
return static_cast<TReaderState&>(*state.AsBoxed()).FetchValues(ctx, output);
return ctx.HolderFactory.Create<TReaderState>(source, Width_, types);
}

void RegisterDependencies() const final {}
Expand Down

0 comments on commit 4f60da1

Please sign in to comment.