diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp index d8ae81155dbd..994de78b22b5 100644 --- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp +++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -92,12 +93,13 @@ namespace NYql::NDqs { } YQL_CLOG(INFO, ProviderDq) << "DqsRewritePhyBlockReadOnDqIntegration"; - return Build(ctx, node->Pos()) - .Input(Build(ctx, node->Pos()) - .Input(readWideWrap.Input()) - .Flags(readWideWrap.Flags()) - .Token(readWideWrap.Token()) + .Input(Build(ctx, node->Pos()) + .Input(Build(ctx, node->Pos()) + .Input(readWideWrap.Input()) + .Flags(readWideWrap.Flags()) + .Token(readWideWrap.Token()) + .Done()) .Done()) .Done().Ptr(); }, ctx, optSettings); diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp index b7e18d36a3b0..841e20f13335 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp @@ -155,6 +155,8 @@ class TDqsDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { } types.push_back(ctx.MakeType(ctx.MakeType(EDataSlot::Uint64))); + input->SetTypeAnn(ctx.MakeType(ctx.MakeType(types))); + return TStatus::Ok; } input->SetTypeAnn(ctx.MakeType(ctx.MakeType(types))); diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp index d380ae4ef9f9..777e153e25c5 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp @@ -333,10 +333,10 @@ class TSource : public TNonCopyable { using TPtr = std::shared_ptr; TSource(std::unique_ptr&& settings, size_t inflight, TType* type, std::shared_ptr>> types, const THolderFactory& holderFactory, NKikimr::NMiniKQL::IStatsRegistry* jobStats) - : Settings_(std::move(settings)) + : HolderFactory(holderFactory) + , Settings_(std::move(settings)) , Inputs_(std::move(Settings_->RawInputs)) , Listener_(std::make_shared(Inputs_.size(), inflight)) - , HolderFactory_(holderFactory) , JobStats_(jobStats) { auto structType = AS_TYPE(TStructType, type); @@ -356,7 +356,7 @@ class TSource : public TNonCopyable { LocalListeners_.back()->Init(LocalListeners_.back()); } BlockBuilder_.Init(ptr, *Settings_->Pool, Settings_->PgBuilder); - FallbackReader_.SetSpecs(*Settings_->Specs, HolderFactory_); + FallbackReader_.SetSpecs(*Settings_->Specs, HolderFactory); } void RunRead() { @@ -471,7 +471,7 @@ class TSource : public TNonCopyable { while (FallbackReader_.IsValid()) { auto currentRow = std::move(FallbackReader_.GetRow()); if (!Settings_->Specs->InputGroups.empty()) { - currentRow = std::move(HolderFactory_.CreateVariantHolder(currentRow.Release(), Settings_->Specs->InputGroups.at(Settings_->OriginalIndexes[idx]))); + currentRow = std::move(HolderFactory.CreateVariantHolder(currentRow.Release(), Settings_->Specs->InputGroups.at(Settings_->OriginalIndexes[idx]))); } BlockBuilder_.Add(currentRow); FallbackReader_.Next(); @@ -497,6 +497,7 @@ class TSource : public TNonCopyable { } } + const THolderFactory& HolderFactory; private: NYT::NConcurrency::IThreadPoolPtr Pool_; std::mutex Mtx_; @@ -513,7 +514,6 @@ class TSource : public TNonCopyable { TListener::TPtr Listener_; TPtr Self_; size_t Inflight_; - const THolderFactory& HolderFactory_; NKikimr::NMiniKQL::IStatsRegistry* JobStats_; }; @@ -525,52 +525,48 @@ class TReaderState: public TComputationValue { , Source_(std::move(source)) , Width_(width) , Types_(arrowTypes) + , Result_(width) { } - EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { if (GotFinish_) { - return EFetchResult::Finish; + return NUdf::EFetchStatus::Finish; } + YQL_ENSURE(width == Width_ + 1); auto batch = Source_->Next(); if (!batch) { GotFinish_ = 1; Source_->Finish(); - return EFetchResult::Finish; + return NUdf::EFetchStatus::Finish; } + for (size_t i = 0; i < Width_; ++i) { - if (!output[i]) { - continue; - } - if(!batch->Columns[i].type()->Equals(Types_->at(i))) { - *(output[i]) = ctx.HolderFactory.CreateArrowBlock(ARROW_RESULT(arrow::compute::Cast(batch->Columns[i], Types_->at(i)))); - continue; - } - *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(batch->Columns[i])); - } - if (output[Width_]) { - *(output[Width_]) = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(ui64(batch->RowsCnt))); + YQL_ENSURE(batch->Columns[i].type()->Equals(Types_->at(i))); + output[i] = Source_->HolderFactory.CreateArrowBlock(std::move(batch->Columns[i])); } - return EFetchResult::One; + output[Width_] = Source_->HolderFactory.CreateArrowBlock(arrow::Datum(ui64(batch->RowsCnt))); + return NUdf::EFetchStatus::Ok; } private: TSource::TPtr Source_; const size_t Width_; std::shared_ptr>> Types_; + std::vector Result_; bool GotFinish_ = 0; }; }; -class TDqYtReadBlockWrapper : public TStatefulWideFlowComputationNode { -using TBaseComputation = TStatefulWideFlowComputationNode; +class TDqYtReadBlockWrapper : public TMutableComputationNode { +using TBaseComputation = TMutableComputationNode; public: TDqYtReadBlockWrapper(const TComputationNodeFactoryContext& ctx, const TString& clusterName, const TString& token, const NYT::TNode& inputSpec, const NYT::TNode& samplingSpec, const TVector& inputGroups, TType* itemType, const TVector& tableNames, TVector>&& tables, NKikimr::NMiniKQL::IStatsRegistry* jobStats, size_t inflight, - size_t timeout) : TBaseComputation(ctx.Mutables, this, EValueRepresentation::Boxed) + size_t timeout) : TBaseComputation(ctx.Mutables, EValueRepresentation::Boxed) , Width_(AS_TYPE(TStructType, itemType)->GetMembersCount()) , CodecCtx_(ctx.Env, ctx.FunctionRegistry, &ctx.HolderFactory) , ClusterName_(clusterName) @@ -588,6 +584,9 @@ using TBaseComputation = TStatefulWideFlowComputationNode 1, Tables_, SamplingSpec_); settings->Specs = &Specs_; settings->Pool = arrow::default_memory_pool(); @@ -598,14 +597,7 @@ using TBaseComputation = TStatefulWideFlowComputationNode(std::move(settings), Inflight_, Type_, types, ctx.HolderFactory, JobStats_); source->SetSelfAndRun(source); - state = ctx.HolderFactory.Create(source, Width_, types); - } - - EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - if (!state.HasValue()) { - MakeState(ctx, state); - } - return static_cast(*state.AsBoxed()).FetchValues(ctx, output); + return ctx.HolderFactory.Create(source, Width_, types); } void RegisterDependencies() const final {}