diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 0122b27b150a..7edcd4afe91e 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -212,41 +212,26 @@ struct IDqAsyncLookupSource { NKikimr::NMiniKQL::TMKQLAllocator> >; struct TEvLookupRequest: NActors::TEventLocal { - TEvLookupRequest(std::shared_ptr alloc, TUnboxedValueMap&& request) - : Alloc(alloc) - , Request(std::move(request)) + TEvLookupRequest(std::weak_ptr request) + : Request(std::move(request)) { } - ~TEvLookupRequest() { - auto guard = Guard(*Alloc); - TKeyTypeHelper empty; - Request = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()}; - } - std::shared_ptr Alloc; - TUnboxedValueMap Request; + std::weak_ptr Request; }; struct TEvLookupResult: NActors::TEventLocal { - TEvLookupResult(std::shared_ptr alloc, TUnboxedValueMap&& result) - : Alloc(alloc) - , Result(std::move(result)) + TEvLookupResult(std::weak_ptr result) + : Result(std::move(result)) { } - ~TEvLookupResult() { - auto guard = Guard(*Alloc.get()); - TKeyTypeHelper empty; - Result = TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()}; - } - - std::shared_ptr Alloc; - TUnboxedValueMap Result; + std::weak_ptr Result; }; virtual size_t GetMaxSupportedKeysInRequest() const = 0; //Initiate lookup for requested keys //Only one request at a time is allowed. Request must contain no more than GetMaxSupportedKeysInRequest() keys - //Upon completion, results are sent in TEvLookupResult event to the preconfigured actor - virtual void AsyncLookup(TUnboxedValueMap&& request) = 0; + //Upon completion, TEvLookupResult event is sent to the preconfigured actor + virtual void AsyncLookup(std::weak_ptr request) = 0; protected: ~IDqAsyncLookupSource() {} }; @@ -280,6 +265,7 @@ struct IDqAsyncIoFactory : public TThrRefBase { std::shared_ptr Alloc; std::shared_ptr KeyTypeHelper; NActors::TActorId ParentId; + ::NMonitoring::TDynamicCounterPtr TaskCounters; google::protobuf::Any LookupSource; //provider specific data source const NKikimr::NMiniKQL::TStructType* KeyType; const NKikimr::NMiniKQL::TStructType* PayloadType; @@ -312,6 +298,7 @@ struct IDqAsyncIoFactory : public TThrRefBase { const THashMap& SecureParams; const THashMap& TaskParams; const NActors::TActorId& ComputeActorId; + ::NMonitoring::TDynamicCounterPtr TaskCounters; const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; const NKikimr::NMiniKQL::THolderFactory& HolderFactory; std::shared_ptr Alloc; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index bcc9cc89784b..84b982755fbf 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -203,6 +203,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped MkqlMemoryQuota = taskCounters->GetCounter("MkqlMemoryQuota"); OutputChannelSize = taskCounters->GetCounter("OutputChannelSize"); SourceCpuTimeMs = taskCounters->GetCounter("SourceCpuTimeMs", true); + InputTransformCpuTimeMs = taskCounters->GetCounter("InputTransformCpuTimeMs", true); } } @@ -1308,6 +1309,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped .SecureParams = secureParams, .TaskParams = taskParams, .ComputeActorId = this->SelfId(), + .TaskCounters = TaskCounters, .TypeEnv = typeEnv, .HolderFactory = holderFactory, .Alloc = Alloc, @@ -1397,11 +1399,20 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped void OnNewAsyncInputDataArrived(const IDqComputeActorAsyncInput::TEvNewAsyncInputDataArrived::TPtr& ev) { Y_ABORT_UNLESS(SourcesMap.FindPtr(ev->Get()->InputIndex) || InputTransformsMap.FindPtr(ev->Get()->InputIndex)); - auto cpuTimeDelta = TakeSourceCpuTimeDelta(); - if (SourceCpuTimeMs) { - SourceCpuTimeMs->Add(cpuTimeDelta.MilliSeconds()); + { + auto cpuTimeDelta = TakeSourceCpuTimeDelta(); + if (SourceCpuTimeMs) { + SourceCpuTimeMs->Add(cpuTimeDelta.MilliSeconds()); + } + CpuTimeSpent += cpuTimeDelta; + } + { + auto cpuTimeDelta = TakeInputTransformCpuTimeDelta(); + if (InputTransformCpuTimeMs) { + InputTransformCpuTimeMs->Add(cpuTimeDelta.MilliSeconds()); + } + CpuTimeSpent += cpuTimeDelta; } - CpuTimeSpent += cpuTimeDelta; ContinueExecute(EResumeSource::CANewAsyncInput); } @@ -1596,6 +1607,21 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped return result; } + TDuration GetInputTransformCpuTime() const { + auto result = TDuration::Zero(); + for (auto& [inputIndex, sourceInfo] : InputTransformsMap) { + result += sourceInfo.AsyncInput->GetCpuTime(); + } + return result; + } + + TDuration TakeInputTransformCpuTimeDelta() { + auto newInputTransformCpuTime = GetInputTransformCpuTime(); + auto result = newInputTransformCpuTime - InputTransformCpuTime; + InputTransformCpuTime = newInputTransformCpuTime; + return result; + } + void FillStats(NDqProto::TDqComputeActorStats* dst, bool last) { if (RuntimeSettings.CollectNone()) { return; @@ -1605,7 +1631,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ReportEventElapsedTime(); } - dst->SetCpuTimeUs(CpuTime.MicroSeconds()); + dst->SetCpuTimeUs(CpuTime.MicroSeconds() + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds()); dst->SetMaxMemoryUsage(MemoryLimits.MemoryQuotaManager->GetMaxMemorySize()); if (auto memProfileStats = GetMemoryProfileStats(); memProfileStats) { @@ -1638,10 +1664,13 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped } FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, RuntimeSettings.GetCollectStatsLevel()); - // More accurate cpu time counter: + auto cpuTimeUs = taskStats->ComputeCpuTime.MicroSeconds() + taskStats->BuildCpuTime.MicroSeconds(); if (TDerived::HasAsyncTaskRunner) { - protoTask->SetCpuTimeUs(CpuTime.MicroSeconds() + taskStats->ComputeCpuTime.MicroSeconds() + taskStats->BuildCpuTime.MicroSeconds()); + // Async TR is another actor, summarize CPU usage + cpuTimeUs += CpuTime.MicroSeconds(); } + // CpuTimeUs does include SourceCpuTime + protoTask->SetCpuTimeUs(cpuTimeUs + SourceCpuTime.MicroSeconds() + InputTransformCpuTime.MicroSeconds()); protoTask->SetSourceCpuTimeUs(SourceCpuTime.MicroSeconds()); ui64 ingressBytes = 0; @@ -1901,6 +1930,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped TDqComputeActorMetrics MetricsReporter; NWilson::TSpan ComputeActorSpan; TDuration SourceCpuTime; + TDuration InputTransformCpuTime; private: bool Running = true; TInstant LastSendStatsTime; @@ -1910,6 +1940,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped ::NMonitoring::TDynamicCounters::TCounterPtr MkqlMemoryQuota; ::NMonitoring::TDynamicCounters::TCounterPtr OutputChannelSize; ::NMonitoring::TDynamicCounters::TCounterPtr SourceCpuTimeMs; + ::NMonitoring::TDynamicCounters::TCounterPtr InputTransformCpuTimeMs; THolder Stat; TDuration CpuTimeSpent; }; diff --git a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp index 98aabb3a9314..ccec56621dfc 100644 --- a/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp +++ b/ydb/library/yql/dq/actors/input_transforms/dq_input_transform_lookup.cpp @@ -33,6 +33,7 @@ class TInputTransformStreamLookupBase ui64 inputIndex, NUdf::TUnboxedValue inputFlow, NActors::TActorId computeActorId, + ::NMonitoring::TDynamicCounterPtr taskCounters, IDqAsyncIoFactory* factory, NDqProto::TDqInputTransformLookupSettings&& settings, TVector&& lookupInputIndexes, @@ -42,6 +43,7 @@ class TInputTransformStreamLookupBase const NMiniKQL::TStructType* lookupPayloadType, const NMiniKQL::TMultiType* outputRowType, TOutputRowColumnOrder&& outputRowColumnOrder, + size_t maxDelayedRows, size_t cacheLimit, std::chrono::seconds cacheTtl ) @@ -51,6 +53,7 @@ class TInputTransformStreamLookupBase , InputIndex(inputIndex) , InputFlow(std::move(inputFlow)) , ComputeActorId(std::move(computeActorId)) + , TaskCounters(taskCounters) , Factory(factory) , Settings(std::move(settings)) , LookupInputIndexes(std::move(lookupInputIndexes)) @@ -63,9 +66,9 @@ class TInputTransformStreamLookupBase , OutputRowColumnOrder(std::move(outputRowColumnOrder)) , InputFlowFetchStatus(NUdf::EFetchStatus::Yield) , LruCache(std::make_unique(cacheLimit, lookupKeyType)) + , MaxDelayedRows(maxDelayedRows) , CacheTtl(cacheTtl) , ReadyQueue(OutputRowType) - , WaitingForLookupResults(false) { Y_ABORT_UNLESS(Alloc); for (size_t i = 0; i != LookupInputIndexes.size(); ++i) { @@ -75,6 +78,7 @@ class TInputTransformStreamLookupBase Y_DEBUG_ABORT_UNLESS(OtherInputIndexes[i] < InputRowType->GetElementsCount()); } Y_DEBUG_ABORT_UNLESS(LookupInputIndexes.size() == LookupKeyType->GetMembersCount()); + InitMonCounters(taskCounters); } void Bootstrap() { @@ -83,6 +87,7 @@ class TInputTransformStreamLookupBase .Alloc = Alloc, .KeyTypeHelper = KeyTypeHelper, .ParentId = SelfId(), + .TaskCounters = TaskCounters, .LookupSource = Settings.GetRightSource().GetLookupSource(), .KeyType = LookupKeyType, .PayloadType = LookupPayloadType, @@ -91,8 +96,10 @@ class TInputTransformStreamLookupBase .MaxKeysInRequest = 1000 // TODO configure me }; auto guard = Guard(*Alloc); - LookupSource = Factory->CreateDqLookupSource(Settings.GetRightSource().GetProviderName(), std::move(lookupSourceArgs)); - RegisterWithSameMailbox(LookupSource.second); + auto [lookupSource, lookupSourceActor] = Factory->CreateDqLookupSource(Settings.GetRightSource().GetProviderName(), std::move(lookupSourceArgs)); + MaxKeysInRequest = lookupSource->GetMaxSupportedKeysInRequest(); + LookupSourceId = RegisterWithSameMailbox(lookupSourceActor); + KeysForLookup = std::make_shared(MaxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual()); } protected: virtual NUdf::EFetchStatus FetchWideInputValue(NUdf::TUnboxedValue* inputRowItems) = 0; @@ -101,8 +108,17 @@ class TInputTransformStreamLookupBase private: //events STRICT_STFUNC(StateFunc, hFunc(IDqAsyncLookupSource::TEvLookupResult, Handle); + hFunc(IDqComputeActorAsyncInput::TEvAsyncInputError, Handle); ) + void Handle(IDqComputeActorAsyncInput::TEvAsyncInputError::TPtr ev) { + auto evptr = ev->Get(); + Send(ComputeActorId, new IDqComputeActorAsyncInput::TEvAsyncInputError( + InputIndex, + evptr->Issues, + evptr->FatalCode)); + } + void AddReadyQueue(NUdf::TUnboxedValue& lookupKey, NUdf::TUnboxedValue& inputOther, NUdf::TUnboxedValue *lookupPayload) { NUdf::TUnboxedValue* outputRowItems; NUdf::TUnboxedValue outputRow = HolderFactory.CreateDirectArrayHolder(OutputRowColumnOrder.size(), outputRowItems); @@ -132,21 +148,31 @@ class TInputTransformStreamLookupBase } void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) { + auto startCycleCount = GetCycleCountFast(); + if (!KeysForLookup) { + return; + } auto guard = BindAllocator(); const auto now = std::chrono::steady_clock::now(); - auto lookupResult = std::move(ev->Get()->Result); + auto lookupResult = ev->Get()->Result.lock(); + Y_ABORT_UNLESS(lookupResult == KeysForLookup); for (; !AwaitingQueue.empty(); AwaitingQueue.pop_front()) { auto& [lookupKey, inputOther] = AwaitingQueue.front(); - auto lookupPayload = lookupResult.FindPtr(lookupKey); + auto lookupPayload = lookupResult->FindPtr(lookupKey); if (lookupPayload == nullptr) { continue; } AddReadyQueue(lookupKey, inputOther, lookupPayload); } - for (auto&& [k, v]: lookupResult) { + for (auto&& [k, v]: *lookupResult) { LruCache->Update(NUdf::TUnboxedValue(const_cast(k)), std::move(v), now + CacheTtl); } - WaitingForLookupResults = false; + KeysForLookup->clear(); + auto deltaTime = GetCpuTimeDelta(startCycleCount); + CpuTime += deltaTime; + if (CpuTimeUs) { + CpuTimeUs->Add(deltaTime.MicroSeconds()); + } Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex}); } @@ -165,9 +191,10 @@ class TInputTransformStreamLookupBase } void PassAway() final { - Send(LookupSource.second->SelfId(), new NActors::TEvents::TEvPoison{}); + Send(LookupSourceId, new NActors::TEvents::TEvPoison{}); auto guard = BindAllocator(); //All resources, held by this class, that have been created with mkql allocator, must be deallocated here + KeysForLookup.reset(); InputFlow.Clear(); KeyTypeHelper.reset(); decltype(AwaitingQueue){}.swap(AwaitingQueue); @@ -184,47 +211,89 @@ class TInputTransformStreamLookupBase i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& batch, TMaybe&, bool& finished, i64 freeSpace) final { Y_UNUSED(freeSpace); + auto startCycleCount = GetCycleCountFast(); auto guard = BindAllocator(); DrainReadyQueue(batch); - if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && !WaitingForLookupResults) { - NUdf::TUnboxedValue* inputRowItems; - NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems); + if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && KeysForLookup->empty()) { + Y_DEBUG_ABORT_UNLESS(AwaitingQueue.empty()); + NUdf::TUnboxedValue* inputRowItems; + NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems); const auto now = std::chrono::steady_clock::now(); - const auto maxKeysInRequest = LookupSource.first->GetMaxSupportedKeysInRequest(); - IDqAsyncLookupSource::TUnboxedValueMap keysForLookup{maxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual()}; LruCache->Prune(now); + size_t rowLimit = std::numeric_limits::max(); + size_t row = 0; while ( - (keysForLookup.size() < maxKeysInRequest) && + row < rowLimit && + (KeysForLookup->size() < MaxKeysInRequest) && ((InputFlowFetchStatus = FetchWideInputValue(inputRowItems)) == NUdf::EFetchStatus::Ok)) { NUdf::TUnboxedValue* keyItems; NUdf::TUnboxedValue key = HolderFactory.CreateDirectArrayHolder(LookupInputIndexes.size(), keyItems); NUdf::TUnboxedValue* otherItems; NUdf::TUnboxedValue other = HolderFactory.CreateDirectArrayHolder(OtherInputIndexes.size(), otherItems); + bool nullsInKey = false; for (size_t i = 0; i != LookupInputIndexes.size(); ++i) { keyItems[i] = inputRowItems[LookupInputIndexes[i]]; + if (!keyItems[i]) { + nullsInKey = true; + } } for (size_t i = 0; i != OtherInputIndexes.size(); ++i) { otherItems[i] = inputRowItems[OtherInputIndexes[i]]; } - if (auto lookupPayload = LruCache->Get(key, now)) { + if (nullsInKey) { + AddReadyQueue(key, other, nullptr); + } else if (auto lookupPayload = LruCache->Get(key, now)) { AddReadyQueue(key, other, &*lookupPayload); } else { + if (AwaitingQueue.empty()) { + // look ahead at most MaxDelayedRows after first missing + rowLimit = row + MaxDelayedRows; + } AwaitingQueue.emplace_back(key, std::move(other)); - keysForLookup.emplace(std::move(key), NUdf::TUnboxedValue{}); + KeysForLookup->emplace(std::move(key), NUdf::TUnboxedValue{}); } + ++row; } - if (!keysForLookup.empty()) { - LookupSource.first->AsyncLookup(std::move(keysForLookup)); - WaitingForLookupResults = true; + if (Batches && (!KeysForLookup->empty() || !ReadyQueue.RowCount())) { + Batches->Inc(); + LruHits->Add(ReadyQueue.RowCount()); + LruMiss->Add(AwaitingQueue.size()); + } + if (!KeysForLookup->empty()) { + Send(LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest(KeysForLookup)); } DrainReadyQueue(batch); } + auto deltaTime = GetCpuTimeDelta(startCycleCount); + CpuTime += deltaTime; + if (CpuTimeUs) { + CpuTimeUs->Add(deltaTime.MicroSeconds()); + } finished = IsFinished(); return AwaitingQueue.size(); } + void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) { + if (!taskCounters) { + return; + } + auto component = taskCounters->GetSubgroup("component", "Lookup"); + LruHits = component->GetCounter("Hits"); + LruMiss = component->GetCounter("Miss"); + CpuTimeUs = component->GetCounter("CpuUs"); + Batches = component->GetCounter("Batches"); + } + + static TDuration GetCpuTimeDelta(ui64 startCycleCount) { + return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - startCycleCount)); + } + + TDuration GetCpuTime() override { + return CpuTime; + } + TMaybe ExtraData() override { google::protobuf::Any result; //TODO fill me @@ -256,10 +325,12 @@ class TInputTransformStreamLookupBase ui64 InputIndex; // NYql::NDq::IDqComputeActorAsyncInput NUdf::TUnboxedValue InputFlow; const NActors::TActorId ComputeActorId; + ::NMonitoring::TDynamicCounterPtr TaskCounters; IDqAsyncIoFactory::TPtr Factory; NDqProto::TDqInputTransformLookupSettings Settings; protected: - std::pair LookupSource; + NActors::TActorId LookupSourceId; + size_t MaxKeysInRequest; const TVector LookupInputIndexes; const TVector OtherInputIndexes; const NMiniKQL::TMultiType* const InputRowType; @@ -271,13 +342,20 @@ class TInputTransformStreamLookupBase NUdf::EFetchStatus InputFlowFetchStatus; std::unique_ptr LruCache; + size_t MaxDelayedRows; std::chrono::seconds CacheTtl; using TInputKeyOtherPair = std::pair; using TAwaitingQueue = std::deque>; //input row split in two parts: key columns and other columns TAwaitingQueue AwaitingQueue; NKikimr::NMiniKQL::TUnboxedValueBatch ReadyQueue; - std::atomic WaitingForLookupResults; NYql::NDq::TDqAsyncStats IngressStats; + std::shared_ptr KeysForLookup; + + ::NMonitoring::TDynamicCounters::TCounterPtr LruHits; + ::NMonitoring::TDynamicCounters::TCounterPtr LruMiss; + ::NMonitoring::TDynamicCounters::TCounterPtr CpuTimeUs; + ::NMonitoring::TDynamicCounters::TCounterPtr Batches; + TDuration CpuTime; }; class TInputTransformStreamLookupWide: public TInputTransformStreamLookupBase { @@ -524,6 +602,7 @@ std::pair CreateInputTransformStre args.InputIndex, args.TransformInput, args.ComputeActorId, + args.TaskCounters, factory, std::move(settings), std::move(lookupKeyInputIndexes), @@ -533,6 +612,7 @@ std::pair CreateInputTransformStre lookupPayloadType, outputRowType, std::move(outputColumnsOrder), + settings.GetMaxDelayedRows(), settings.GetCacheLimit(), std::chrono::seconds(settings.GetCacheTtlSeconds()) ) : @@ -543,6 +623,7 @@ std::pair CreateInputTransformStre args.InputIndex, args.TransformInput, args.ComputeActorId, + args.TaskCounters, factory, std::move(settings), std::move(lookupKeyInputIndexes), @@ -552,6 +633,7 @@ std::pair CreateInputTransformStre lookupPayloadType, outputRowType, std::move(outputColumnsOrder), + settings.GetMaxDelayedRows(), settings.GetCacheLimit(), std::chrono::seconds(settings.GetCacheTtlSeconds()) ); diff --git a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json index 0d29ed67202f..6040ff143ea7 100644 --- a/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json +++ b/ydb/library/yql/dq/expr_nodes/dq_expr_nodes.json @@ -188,7 +188,7 @@ {"Index": 6, "Name": "LeftJoinKeyNames", "Type": "TCoAtomList"}, {"Index": 7, "Name": "RightJoinKeyNames", "Type": "TCoAtomList"}, {"Index": 8, "Name": "TTL", "Type": "TCoAtom"}, - {"Index": 9, "Name": "MaxDelay", "Type": "TCoAtom"}, + {"Index": 9, "Name": "MaxDelayedRows", "Type": "TCoAtom"}, {"Index": 10, "Name": "MaxCachedRows", "Type": "TCoAtom"} ] }, diff --git a/ydb/library/yql/dq/proto/dq_tasks.proto b/ydb/library/yql/dq/proto/dq_tasks.proto index da8bdf36122e..76006b2f6049 100644 --- a/ydb/library/yql/dq/proto/dq_tasks.proto +++ b/ydb/library/yql/dq/proto/dq_tasks.proto @@ -186,6 +186,7 @@ message TDqInputTransformLookupSettings { bytes NarrowOutputRowType = 8; //Serialized struct type uint64 CacheLimit = 9; uint64 CacheTtlSeconds = 10; + uint64 MaxDelayedRows = 11; } message TDqTask { diff --git a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp index 0eba2f3be213..6406527a9576 100644 --- a/ydb/library/yql/providers/dq/opt/physical_optimize.cpp +++ b/ydb/library/yql/providers/dq/opt/physical_optimize.cpp @@ -227,7 +227,7 @@ class TDqsPhysicalOptProposalTransformer : public TOptimizeTransformerBase { .RightJoinKeyNames(join.RightJoinKeyNames()) .TTL(ctx.NewAtom(pos, 300)) //TODO configure me .MaxCachedRows(ctx.NewAtom(pos, 1'000'000)) //TODO configure me - .MaxDelay(ctx.NewAtom(pos, 1'000'000)) //Configure me + .MaxDelayedRows(ctx.NewAtom(pos, 1'000'000)) //Configure me .Done(); auto lambda = Build(ctx, pos) diff --git a/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp b/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp index d7b77b8bbda6..619f10a99bcf 100644 --- a/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp +++ b/ydb/library/yql/providers/generic/actors/ut/yql_generic_lookup_actor_ut.cpp @@ -30,21 +30,31 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { return result; } - //Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread + // Simple actor to call IDqAsyncLookupSource::AsyncLookup from an actor system's thread class TCallLookupActor: public TActorBootstrapped { public: TCallLookupActor( std::shared_ptr alloc, - NYql::NDq::IDqAsyncLookupSource* lookupSource, - NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request) + const NActors::TActorId& lookupActor, + std::shared_ptr request) : Alloc(alloc) - , LookupSource(lookupSource) - , Request(std::move(request)) + , LookupActor(lookupActor) + , Request(request) { } void Bootstrap() { - LookupSource->AsyncLookup(std::move(Request)); + auto ev = new NYql::NDq::IDqAsyncLookupSource::TEvLookupRequest(Request); + TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(LookupActor, SelfId(), ev)); + } + + void PassAway() override { + auto guard = Guard(*Alloc); + Request.reset(); + } + + ~TCallLookupActor() { + PassAway(); } private: @@ -52,8 +62,8 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { private: std::shared_ptr Alloc; - NYql::NDq::IDqAsyncLookupSource* LookupSource; - NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap Request; + const NActors::TActorId LookupActor; + std::shared_ptr Request; }; Y_UNIT_TEST(Lookup) { @@ -136,7 +146,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { .AddResponse( MakeRecordBatch( MakeArray("id", {0, 1, 2}, arrow::uint64()), - MakeArray("optional_id", {100, 101, 103}, arrow::uint64()), //the last value is intentially wrong + MakeArray("optional_id", {100, 101, 103}, arrow::uint64()), // the last value is intentially wrong MakeArray("string_value", {"a", "b", "c"}, arrow::utf8()) ), NewSuccess() @@ -166,6 +176,7 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { connectorMock, std::make_shared(), edge, + nullptr, alloc, keyTypeHelper, std::move(lookupSourceSettings), @@ -174,44 +185,45 @@ Y_UNIT_TEST_SUITE(GenericProviderLookupActor) { typeEnv, holderFactory, 1'000'000); - runtime.Register(actor); + auto lookupActor = runtime.Register(actor); - NYql::NDq::IDqAsyncLookupSource::TUnboxedValueMap request(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual()); + auto request = std::make_shared(3, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual()); for (size_t i = 0; i != 3; ++i) { NYql::NUdf::TUnboxedValue* keyItems; auto key = holderFactory.CreateDirectArrayHolder(2, keyItems); keyItems[0] = NYql::NUdf::TUnboxedValuePod(ui64(i)); keyItems[1] = NYql::NUdf::TUnboxedValuePod(ui64(100 + i)); - request.emplace(std::move(key), NYql::NUdf::TUnboxedValue{}); + request->emplace(std::move(key), NYql::NUdf::TUnboxedValue{}); } - guard.Release(); //let actors use alloc + guard.Release(); // let actors use alloc - auto callLookupActor = new TCallLookupActor(alloc, lookupSource, std::move(request)); + auto callLookupActor = new TCallLookupActor(alloc, lookupActor, request); runtime.Register(callLookupActor); auto ev = runtime.GrabEdgeEventRethrow(edge); auto guard2 = Guard(*alloc.get()); - auto lookupResult = std::move(ev->Get()->Result); + auto lookupResult = ev->Get()->Result.lock(); + UNIT_ASSERT(lookupResult); - UNIT_ASSERT_EQUAL(3, lookupResult.size()); + UNIT_ASSERT_EQUAL(3, lookupResult->size()); { - const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {0, 100})); + const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {0, 100})); UNIT_ASSERT(v); NYql::NUdf::TUnboxedValue val = v->GetElement(0); UNIT_ASSERT(val.AsStringRef() == TStringBuf("a")); } { - const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {1, 101})); + const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {1, 101})); UNIT_ASSERT(v); NYql::NUdf::TUnboxedValue val = v->GetElement(0); UNIT_ASSERT(val.AsStringRef() == TStringBuf("b")); } { - const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {2, 102})); + const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {2, 102})); UNIT_ASSERT(v); UNIT_ASSERT(!*v); } } -} //Y_UNIT_TEST_SUITE(GenericProviderLookupActor) +} // Y_UNIT_TEST_SUITE(GenericProviderLookupActor) diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h index c38fa9c9f0d4..ec7810ad163e 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_base_actor.h @@ -10,7 +10,7 @@ namespace NYql::NDq { template class TGenericBaseActor: public NActors::TActorBootstrapped { - protected: //Events + protected: // Events // Event ids enum EEventIds: ui32 { EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), @@ -89,7 +89,7 @@ namespace NYql::NDq { NConnector::NApi::TError Error; }; - protected: //TODO move common logic here + protected: // TODO move common logic here }; } // namespace NYql::NDq diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp index 21090f61e5b8..d4c8b242882a 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.cpp @@ -44,11 +44,11 @@ namespace NYql::NDq { template T ExtractFromConstFuture(const NThreading::TFuture& f) { - //We want to avoid making a copy of data stored in a future. - //But there is no direct way to extract data from a const future5 - //So, we make a copy of the future, that is cheap. Then, extract the value from this copy. - //It destructs the value in the original future, but this trick is legal and documented here: - //https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency + // We want to avoid making a copy of data stored in a future. + // But there is no direct way to extract data from a const future5 + // So, we make a copy of the future, that is cheap. Then, extract the value from this copy. + // It destructs the value in the original future, but this trick is legal and documented here: + // https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency return NThreading::TFuture(f).ExtractValueSync(); } @@ -64,6 +64,7 @@ namespace NYql::NDq { NConnector::IClient::TPtr connectorClient, TGenericTokenProvider::TPtr tokenProvider, NActors::TActorId&& parentId, + ::NMonitoring::TDynamicCounterPtr taskCounters, std::shared_ptr alloc, std::shared_ptr keyTypeHelper, NYql::Generic::TLookupSource&& lookupSource, @@ -84,19 +85,34 @@ namespace NYql::NDq { , HolderFactory(holderFactory) , ColumnDestinations(CreateColumnDestination()) , MaxKeysInRequest(maxKeysInRequest) - , Request( - 0, - KeyTypeHelper->GetValueHash(), - KeyTypeHelper->GetValueEqual()) { + InitMonCounters(taskCounters); } ~TGenericLookupActor() { + Free(); + } + + private: + void Free() { auto guard = Guard(*Alloc); + Request.reset(); KeyTypeHelper.reset(); - TKeyTypeHelper empty; - Request = IDqAsyncLookupSource::TUnboxedValueMap(0, empty.GetValueHash(), empty.GetValueEqual()); } + void InitMonCounters(const ::NMonitoring::TDynamicCounterPtr& taskCounters) { + if (!taskCounters) { + return; + } + auto component = taskCounters->GetSubgroup("component", "LookupSrc"); + Count = component->GetCounter("Reqs"); + Keys = component->GetCounter("Keys"); + ResultChunks = component->GetCounter("Chunks"); + ResultRows = component->GetCounter("Rows"); + ResultBytes = component->GetCounter("Bytes"); + AnswerTime = component->GetCounter("AnswerMs"); + CpuTime = component->GetCounter("CpuUs"); + } + public: void Bootstrap() { auto dsi = LookupSource.data_source_instance(); @@ -112,17 +128,22 @@ namespace NYql::NDq { static constexpr char ActorName[] = "GENERIC_PROVIDER_LOOKUP_ACTOR"; - private: //IDqAsyncLookupSource + private: // IDqAsyncLookupSource size_t GetMaxSupportedKeysInRequest() const override { return MaxKeysInRequest; } - void AsyncLookup(IDqAsyncLookupSource::TUnboxedValueMap&& request) override { + void AsyncLookup(std::weak_ptr request) override { auto guard = Guard(*Alloc); - CreateRequest(std::move(request)); + CreateRequest(request.lock()); + } + void PassAway() override { + Free(); + TBase::PassAway(); } - private: //events + private: // events STRICT_STFUNC(StateFunc, + hFunc(TEvLookupRequest, Handle); hFunc(TEvListSplitsIterator, Handle); hFunc(TEvListSplitsPart, Handle); hFunc(TEvReadSplitsIterator, Handle); @@ -189,19 +210,43 @@ namespace NYql::NDq { FinalizeRequest(); } - void Handle(TEvError::TPtr) { - FinalizeRequest(); + void Handle(TEvError::TPtr ev) { + auto actorSystem = TActivationContext::ActorSystem(); + auto error = ev->Get()->Error; + auto errEv = std::make_unique( + -1, + NConnector::ErrorToIssues(error), + NConnector::ErrorToDqStatus(error)); + actorSystem->Send(new NActors::IEventHandle(ParentId, SelfId(), errEv.release())); } void Handle(NActors::TEvents::TEvPoison::TPtr) { PassAway(); } + void Handle(TEvLookupRequest::TPtr ev) { + auto guard = Guard(*Alloc); + CreateRequest(ev->Get()->Request.lock()); + } + private: - void CreateRequest(IDqAsyncLookupSource::TUnboxedValueMap&& request) { - YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request.size() << " keys"; - Y_ABORT_IF(InProgress); - Y_ABORT_IF(request.size() == 0 || request.size() > MaxKeysInRequest); + static TDuration GetCpuTimeDelta(ui64 startCycleCount) { + return TDuration::Seconds(NHPTimer::GetSeconds(GetCycleCountFast() - startCycleCount)); + } + + void CreateRequest(std::shared_ptr request) { + if (!request) { + return; + } + auto startCycleCount = GetCycleCountFast(); + SentTime = TInstant::Now(); + YQL_CLOG(DEBUG, ProviderGeneric) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys"; + Y_ABORT_IF(request->size() == 0 || request->size() > MaxKeysInRequest); + + if (Count) { + Count->Inc(); + Keys->Add(request->size()); + } Request = std::move(request); NConnector::NApi::TListSplitsRequest splitRequest; @@ -224,6 +269,9 @@ namespace NYql::NDq { SendError(actorSystem, selfId, result.Status); } }); + if (CpuTime) { + CpuTime->Add(GetCpuTimeDelta(startCycleCount).MicroSeconds()); + } } void ReadNextData() { @@ -251,9 +299,17 @@ namespace NYql::NDq { } void ProcessReceivedData(const NConnector::NApi::TReadSplitsResponse& resp) { + auto startCycleCount = GetCycleCountFast(); Y_ABORT_UNLESS(resp.payload_case() == NConnector::NApi::TReadSplitsResponse::PayloadCase::kArrowIpcStreaming); + if (ResultChunks) { + ResultChunks->Inc(); + if (resp.has_stats()) { + ResultRows->Add(resp.stats().rows()); + ResultBytes->Add(resp.stats().bytes()); + } + } auto guard = Guard(*Alloc); - NKikimr::NArrow::NSerialization::TSerializerContainer deser = NKikimr::NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); //todo move to class' member + NKikimr::NArrow::NSerialization::TSerializerContainer deser = NKikimr::NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); // todo move to class' member const auto& data = deser->Deserialize(resp.arrow_ipc_streaming()); Y_ABORT_UNLESS(data.ok()); const auto& value = data.ValueOrDie(); @@ -273,20 +329,26 @@ namespace NYql::NDq { for (size_t j = 0; j != columns.size(); ++j) { (ColumnDestinations[j].first == EColumnDestination::Key ? keyItems : outputItems)[ColumnDestinations[j].second] = columns[j][i]; } - if (auto* v = Request.FindPtr(key)) { - *v = std::move(output); //duplicates will be overwritten + if (auto* v = Request->FindPtr(key)) { + *v = std::move(output); // duplicates will be overwritten } } + if (CpuTime) { + CpuTime->Add(GetCpuTimeDelta(startCycleCount).MicroSeconds()); + } } void FinalizeRequest() { - YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request.size() << " keys"; + YQL_CLOG(DEBUG, ProviderGeneric) << "Sending lookup results for " << Request->size() << " keys"; auto guard = Guard(*Alloc); - auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(Request)); + auto ev = new IDqAsyncLookupSource::TEvLookupResult(Request); + if (AnswerTime) { + AnswerTime->Add((TInstant::Now() - SentTime).MilliSeconds()); + } + Request.reset(); TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev)); LookupResult = {}; ReadSplitsIterator = {}; - InProgress = false; } static void SendError(NActors::TActorSystem* actorSystem, const NActors::TActorId& selfId, const NConnector::NApi::TError& error) { @@ -352,7 +414,7 @@ namespace NYql::NDq { select.mutable_from()->Settable(LookupSource.table()); NConnector::NApi::TPredicate_TDisjunction disjunction; - for (const auto& [k, _] : Request) { + for (const auto& [k, _] : *Request) { NConnector::NApi::TPredicate_TConjunction conjunction; for (ui32 c = 0; c != KeyType->GetMembersCount(); ++c) { NConnector::NApi::TPredicate_TComparison eq; @@ -378,20 +440,28 @@ namespace NYql::NDq { const NYql::Generic::TLookupSource LookupSource; const NKikimr::NMiniKQL::TStructType* const KeyType; const NKikimr::NMiniKQL::TStructType* const PayloadType; - const NKikimr::NMiniKQL::TStructType* const SelectResultType; //columns from KeyType + PayloadType + const NKikimr::NMiniKQL::TStructType* const SelectResultType; // columns from KeyType + PayloadType const NKikimr::NMiniKQL::THolderFactory& HolderFactory; const std::vector> ColumnDestinations; const size_t MaxKeysInRequest; - std::atomic_bool InProgress; - IDqAsyncLookupSource::TUnboxedValueMap Request; - NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; //TODO move me to TEvReadSplitsPart + std::shared_ptr Request; + NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator; // TODO move me to TEvReadSplitsPart NKikimr::NMiniKQL::TKeyPayloadPairVector LookupResult; + ::NMonitoring::TDynamicCounters::TCounterPtr Count; + ::NMonitoring::TDynamicCounters::TCounterPtr Keys; + ::NMonitoring::TDynamicCounters::TCounterPtr ResultRows; + ::NMonitoring::TDynamicCounters::TCounterPtr ResultBytes; + ::NMonitoring::TDynamicCounters::TCounterPtr ResultChunks; + ::NMonitoring::TDynamicCounters::TCounterPtr AnswerTime; + ::NMonitoring::TDynamicCounters::TCounterPtr CpuTime; + TInstant SentTime; }; std::pair CreateGenericLookupActor( NConnector::IClient::TPtr connectorClient, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NActors::TActorId parentId, + ::NMonitoring::TDynamicCounterPtr taskCounters, std::shared_ptr alloc, std::shared_ptr keyTypeHelper, NYql::Generic::TLookupSource&& lookupSource, @@ -407,6 +477,7 @@ namespace NYql::NDq { connectorClient, std::move(tokenProvider), std::move(parentId), + taskCounters, alloc, keyTypeHelper, std::move(lookupSource), diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h index 9f8c0c268f23..128964b1553f 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_lookup_actor.h @@ -15,6 +15,7 @@ namespace NYql::NDq { NConnector::IClient::TPtr connectorClient, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NActors::TActorId parentId, + ::NMonitoring::TDynamicCounterPtr taskCounters, std::shared_ptr alloc, std::shared_ptr keyTypeHelper, NYql::Generic::TLookupSource&& lookupSource, diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp index e9b2b8bf8bd5..88c5e656ee82 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_provider_factories.cpp @@ -22,6 +22,7 @@ namespace NYql::NDq { genericClient, credentialsFactory, std::move(args.ParentId), + args.TaskCounters, args.Alloc, args.KeyTypeHelper, std::move(lookupSource), diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp index 4c664f665826..f3b32eb2d27a 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp @@ -27,11 +27,11 @@ namespace NYql::NDq { template T ExtractFromConstFuture(const NThreading::TFuture& f) { - //We want to avoid making a copy of data stored in a future. - //But there is no direct way to extract data from a const future - //So, we make a copy of the future, that is cheap. Then, extract the value from this copy. - //It destructs the value in the original future, but this trick is legal and documented here: - //https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency + // We want to avoid making a copy of data stored in a future. + // But there is no direct way to extract data from a const future + // So, we make a copy of the future, that is cheap. Then, extract the value from this copy. + // It destructs the value in the original future, but this trick is legal and documented here: + // https://docs.yandex-team.ru/arcadia-cpp/cookbook/concurrency return NThreading::TFuture(f).ExtractValueSync(); } diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp index f2651cac0d1c..bbb6e1555c5f 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp +++ b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.cpp @@ -70,4 +70,4 @@ namespace NYql::NDq { } return std::make_unique(); } -} //namespace NYql::NDq +} // namespace NYql::NDq diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h index 6ff0d1fd578d..c656e3a38daf 100644 --- a/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h +++ b/ydb/library/yql/providers/generic/actors/yql_generic_token_provider.h @@ -13,7 +13,7 @@ namespace NYql::NDq { class TGenericTokenProvider { public: using TPtr = std::unique_ptr; - TGenericTokenProvider() = default; //No auth required + TGenericTokenProvider() = default; // No auth required TGenericTokenProvider(const TString& staticIamToken); TGenericTokenProvider( const TString& serviceAccountId, @@ -34,4 +34,4 @@ namespace NYql::NDq { const TString& staticIamToken, const TString& serviceAccountId, const TString& ServiceAccountIdSignature, const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory); -} //namespace NYql::NDq +} // namespace NYql::NDq diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/test_creds.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/test_creds.h index f3025de07053..cefd4a43c98d 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/test_creds.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/test_creds.h @@ -33,4 +33,4 @@ namespace NYql::NTestCreds { } }; -} //namespace NYql::NTestCreds +} // namespace NYql::NTestCreds diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h index 07a19c5ce827..6d1f83c7e407 100644 --- a/ydb/library/yql/providers/generic/provider/yql_generic_settings.h +++ b/ydb/library/yql/providers/generic/provider/yql_generic_settings.h @@ -42,4 +42,4 @@ namespace NYql { THashMap ClusterNamesToClusterConfigs; // cluster name -> cluster config THashMap> DatabaseIdsToClusterNames; // database id -> cluster name }; -} //namespace NYql +} // namespace NYql diff --git a/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp b/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp index 03f77cf2b21c..5edc60ac5547 100644 --- a/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp +++ b/ydb/library/yql/providers/yt/actors/ut/yql_yt_lookup_actor_ut.cpp @@ -49,25 +49,34 @@ class TCallLookupActor: public TActorBootstrapped { TCallLookupActor( std::shared_ptr alloc, const NActors::TActorId& lookupActor, - NDq::IDqAsyncLookupSource::TUnboxedValueMap&& request) + std::shared_ptr request) : Alloc(alloc) , LookupActor(lookupActor) - , Request(std::move(request)) + , Request(request) { } void Bootstrap() { - auto ev = new NDq::IDqAsyncLookupSource::TEvLookupRequest(Alloc, std::move(Request)); + auto ev = new NDq::IDqAsyncLookupSource::TEvLookupRequest(Request); TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(LookupActor, SelfId(), ev)); } + ~TCallLookupActor() { + PassAway(); + } + + void PassAway() override { + auto guard = Guard(*Alloc); + Request.reset(); + } + private: static constexpr char ActorName[] = "TEST"; private: std::shared_ptr Alloc; const NActors::TActorId LookupActor; - NDq::IDqAsyncLookupSource::TUnboxedValueMap Request; + std::shared_ptr Request; }; Y_UNIT_TEST(Lookup) { @@ -138,43 +147,43 @@ Y_UNIT_TEST(Lookup) { typeEnv, holderFactory, 1'000'000); - runtime.Register(lookupActor); + auto lookupActorId = runtime.Register(lookupActor); - NDq::IDqAsyncLookupSource::TUnboxedValueMap request{4, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual()}; - request.emplace(CreateStructValue(holderFactory, {"host1", "vpc1"}), NUdf::TUnboxedValue{}); - request.emplace(CreateStructValue(holderFactory, {"host2", "vpc1"}), NUdf::TUnboxedValue{}); - request.emplace(CreateStructValue(holderFactory, {"host2", "vpc2"}), NUdf::TUnboxedValue{}); //NOT_FOUND expected - request.emplace(CreateStructValue(holderFactory, {"very very long hostname to for test 2", "vpc2"}), NUdf::TUnboxedValue{}); + auto request = std::make_shared(4, keyTypeHelper->GetValueHash(), keyTypeHelper->GetValueEqual()); + request->emplace(CreateStructValue(holderFactory, {"host1", "vpc1"}), NUdf::TUnboxedValue{}); + request->emplace(CreateStructValue(holderFactory, {"host2", "vpc1"}), NUdf::TUnboxedValue{}); + request->emplace(CreateStructValue(holderFactory, {"host2", "vpc2"}), NUdf::TUnboxedValue{}); //NOT_FOUND expected + request->emplace(CreateStructValue(holderFactory, {"very very long hostname to for test 2", "vpc2"}), NUdf::TUnboxedValue{}); guard.Release(); //let actors use alloc - auto callLookupActor = new TCallLookupActor(alloc, lookupActor->SelfId(), std::move(request)); + auto callLookupActor = new TCallLookupActor(alloc, lookupActorId, request); runtime.Register(callLookupActor); auto ev = runtime.GrabEdgeEventRethrow(edge); auto guard2 = Guard(*alloc.get()); - auto lookupResult = std::move(ev->Get()->Result); - UNIT_ASSERT_EQUAL(4, lookupResult.size()); + auto lookupResult = ev->Get()->Result.lock(); + UNIT_ASSERT_EQUAL(4, lookupResult->size()); { - const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {"host1", "vpc1"})); + const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {"host1", "vpc1"})); UNIT_ASSERT(v); UNIT_ASSERT(CheckStructValue(*v, {"host1.vpc1.net", "192.168.1.1"})); } { - const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {"host2", "vpc1"})); + const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {"host2", "vpc1"})); UNIT_ASSERT(v); UNIT_ASSERT(CheckStructValue(*v, {"host2.vpc1.net", "192.168.1.2"})); } { - const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {"host2", "vpc2"})); + const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {"host2", "vpc2"})); UNIT_ASSERT(v); UNIT_ASSERT(!*v); } { - const auto* v = lookupResult.FindPtr(CreateStructValue(holderFactory, {"very very long hostname to for test 2", "vpc2"})); + const auto* v = lookupResult->FindPtr(CreateStructValue(holderFactory, {"very very long hostname to for test 2", "vpc2"})); UNIT_ASSERT(v); UNIT_ASSERT(CheckStructValue(*v, {"very very long fqdn for test 2", "192.168.100.2"})); } } -} //Y_UNIT_TEST_SUITE(GenericProviderLookupActor) \ No newline at end of file +} //Y_UNIT_TEST_SUITE(GenericProviderLookupActor) diff --git a/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp index 35298adb6292..a01cfdbc1c8e 100644 --- a/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp +++ b/ydb/library/yql/providers/yt/actors/yql_yt_lookup_actor.cpp @@ -87,12 +87,17 @@ class TYtLookupActor { } ~TYtLookupActor() { + Free(); + } + +private: + void Free() { auto guard = Guard(*Alloc); KeyTypeHelper.reset(); TKeyTypeHelper empty; Data = IDqAsyncLookupSource::TUnboxedValueMap{0, empty.GetValueHash(), empty.GetValueEqual()}; } - +public: void Bootstrap() { YQL_CLOG(INFO, ProviderYt) << "New Yt proivider lookup source actor(ActorId=" << SelfId() << ") for" @@ -156,21 +161,30 @@ class TYtLookupActor size_t GetMaxSupportedKeysInRequest() const override { return MaxKeysInRequest; } - void AsyncLookup(IDqAsyncLookupSource::TUnboxedValueMap&& request) override { - YQL_CLOG(DEBUG, ProviderYt) << "ActorId=" << SelfId() << " Got LookupRequest for " << request.size() << " keys"; + void AsyncLookup(std::weak_ptr wrequest) override { Y_ABORT_IF(InProgress); - Y_ABORT_IF(request.size() > MaxKeysInRequest); - InProgress = true; auto guard = Guard(*Alloc); - for (const auto& [k, _]: request) { + auto request = wrequest.lock(); + if (!request) { + YQL_CLOG(DEBUG, ProviderYt) << "ActorId=" << SelfId() << " LookupRequest was lost"; + return; + } + YQL_CLOG(DEBUG, ProviderYt) << "ActorId=" << SelfId() << " Got LookupRequest for " << request->size() << " keys"; + InProgress = true; + Y_ABORT_IF(request->size() > MaxKeysInRequest); + for (auto& [k, val]: *request) { if (const auto* v = Data.FindPtr(k)) { - request[k] = *v; + val = *v; } } - auto ev = new IDqAsyncLookupSource::TEvLookupResult(Alloc, std::move(request)); + auto ev = new IDqAsyncLookupSource::TEvLookupResult(request); TActivationContext::ActorSystem()->Send(new NActors::IEventHandle(ParentId, SelfId(), ev)); InProgress = false; } + void PassAway() override { + Free(); + TBase::PassAway(); + } private: //events STRICT_STFUNC(StateFunc, diff --git a/ydb/tests/fq/generic/test_streaming_join.py b/ydb/tests/fq/generic/test_streaming_join.py index e5018d23b1c6..2ee95ff83130 100644 --- a/ydb/tests/fq/generic/test_streaming_join.py +++ b/ydb/tests/fq/generic/test_streaming_join.py @@ -130,6 +130,7 @@ def freeze(json): ('{"id":9,"user":3}', '{"id":9,"user_id":3,"lookup":"ydb30"}'), ('{"id":2,"user":2}', '{"id":2,"user_id":2,"lookup":"ydb20"}'), ('{"id":1,"user":1}', '{"id":1,"user_id":1,"lookup":"ydb10"}'), + ('{"id":10,"user":null}', '{"id":10,"user_id":null,"lookup":null}'), ('{"id":4,"user":3}', '{"id":4,"user_id":3,"lookup":"ydb30"}'), ('{"id":5,"user":3}', '{"id":5,"user_id":3,"lookup":"ydb30"}'), ('{"id":6,"user":1}', '{"id":6,"user_id":1,"lookup":"ydb10"}'), @@ -349,6 +350,10 @@ def freeze(json): '{"id":3,"za":2,"yb":"1","yc":114,"zd":115}', '{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115}', ), + ( + '{"id":3,"za":2,"yb":null,"yc":114,"zd":115}', + '{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":null,"yc":114,"zd":115}', + ), ] ), ), @@ -390,6 +395,10 @@ def freeze(json): '{"id":3,"za":2,"yb":"1","yc":114,"zd":115}', '{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":2,"yb":"1","yc":114,"zd":115}', ), + ( + '{"id":3,"za":null,"yb":"1","yc":114,"zd":115}', + '{"a":null,"b":null,"c":null,"d":null,"e":null,"f":null,"za":null,"yb":"1","yc":114,"zd":115}', + ), ] ), ), @@ -506,6 +515,19 @@ def test_streamlookup( messages_ctr = Counter(map(freeze, map(json.loads, map(itemgetter(1), messages)))) assert read_data_ctr == messages_ctr + for node_index in kikimr.compute_plane.kikimr_cluster.nodes: + sensors = kikimr.compute_plane.get_sensors(node_index, "dq_tasks") + for component in ["Lookup", "LookupSrc"]: + componentSensors = sensors.find_sensors( + labels={"operation": query_id, "component": component}, + key_label="sensor", + ) + for k in componentSensors: + print( + f'node[{node_index}].operation[{query_id}].component[{component}].{k} = {componentSensors[k]}', + file=sys.stderr, + ) + fq_client.abort_query(query_id) fq_client.wait_query(query_id) diff --git a/ydb/tests/tools/fq_runner/kikimr_metrics.py b/ydb/tests/tools/fq_runner/kikimr_metrics.py index 006e95119738..08825289171b 100644 --- a/ydb/tests/tools/fq_runner/kikimr_metrics.py +++ b/ydb/tests/tools/fq_runner/kikimr_metrics.py @@ -31,7 +31,7 @@ def find_sensors(self, labels, key_label): continue v = lbls.get(key_label, None) if v is not None: - result[v] = s["value"] + result[v] = s.get("value", None) return result def collect_non_zeros(self):