Skip to content

Commit

Permalink
streamlookup: fix KeysForLookup reallocation (#10283)
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam authored Oct 11, 2024
1 parent 80e5c94 commit 4fe6505
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class TInputTransformStreamLookupBase
auto [lookupSource, lookupSourceActor] = Factory->CreateDqLookupSource(Settings.GetRightSource().GetProviderName(), std::move(lookupSourceArgs));
MaxKeysInRequest = lookupSource->GetMaxSupportedKeysInRequest();
LookupSourceId = RegisterWithSameMailbox(lookupSourceActor);
KeysForLookup = std::make_shared<IDqAsyncLookupSource::TUnboxedValueMap>(MaxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual());
}
protected:
virtual NUdf::EFetchStatus FetchWideInputValue(NUdf::TUnboxedValue* inputRowItems) = 0;
Expand Down Expand Up @@ -141,6 +142,8 @@ class TInputTransformStreamLookupBase
}

void Handle(IDqAsyncLookupSource::TEvLookupResult::TPtr ev) {
if (!KeysForLookup)
return;
auto guard = BindAllocator();
const auto now = std::chrono::steady_clock::now();
auto lookupResult = ev->Get()->Result.lock();
Expand All @@ -156,7 +159,7 @@ class TInputTransformStreamLookupBase
for (auto&& [k, v]: *lookupResult) {
LruCache->Update(NUdf::TUnboxedValue(const_cast<NUdf::TUnboxedValue&&>(k)), std::move(v), now + CacheTtl);
}
KeysForLookup.reset();
KeysForLookup->clear();
Send(ComputeActorId, new TEvNewAsyncInputDataArrived{InputIndex});
}

Expand Down Expand Up @@ -199,11 +202,10 @@ class TInputTransformStreamLookupBase

DrainReadyQueue(batch);

if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && !KeysForLookup) {
if (InputFlowFetchStatus != NUdf::EFetchStatus::Finish && KeysForLookup->empty()) {
NUdf::TUnboxedValue* inputRowItems;
NUdf::TUnboxedValue inputRow = HolderFactory.CreateDirectArrayHolder(InputRowType->GetElementsCount(), inputRowItems);
const auto now = std::chrono::steady_clock::now();
KeysForLookup = std::make_shared<IDqAsyncLookupSource::TUnboxedValueMap>(MaxKeysInRequest, KeyTypeHelper->GetValueHash(), KeyTypeHelper->GetValueEqual());
LruCache->Prune(now);
while (
(KeysForLookup->size() < MaxKeysInRequest) &&
Expand All @@ -227,8 +229,6 @@ class TInputTransformStreamLookupBase
}
if (!KeysForLookup->empty()) {
Send(LookupSourceId, new IDqAsyncLookupSource::TEvLookupRequest(KeysForLookup));
} else {
KeysForLookup.reset();
}
DrainReadyQueue(batch);
}
Expand Down

0 comments on commit 4fe6505

Please sign in to comment.