diff --git a/ydb/library/yql/minikql/aligned_page_pool.cpp b/ydb/library/yql/minikql/aligned_page_pool.cpp index 3b489fcbd27b..2caadc25f7c2 100644 --- a/ydb/library/yql/minikql/aligned_page_pool.cpp +++ b/ydb/library/yql/minikql/aligned_page_pool.cpp @@ -504,7 +504,7 @@ void TAlignedPagePoolImpl::Free(void* ptr, size_t size) noexcept { template void TAlignedPagePoolImpl::UpdateMemoryYellowZone() { if (Limit == 0) return; - if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return; + // if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return; ui8 usedMemoryPercent = 100 * GetUsed() / Limit; if (usedMemoryPercent >= EnableMemoryYellowZoneThreshold) { diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index ab5f459070a0..206d01f54091 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -631,9 +631,9 @@ class TGraceJoinSpillingSupportState : public TComputationValueTablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) { + std::cerr << std::format("[MISHA][LEFT][{}] Extracting bucket\n", NextBucketToJoin); LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin); } if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) { + std::cerr << std::format("[MISHA][RIGHT][{}] Extracting bucket\n", NextBucketToJoin); RightPacker->TablePtr->PrepareBucket(NextBucketToJoin); } if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) { + std::cerr << std::format("[MISHA][LEFT][{}] Loading bucket\n", NextBucketToJoin); LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin); } if (!RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) { + std::cerr << std::format("[MISHA][RIGHT][{}] Loading bucket\n", NextBucketToJoin); RightPacker->TablePtr->StartLoadingBucket(NextBucketToJoin); } if (LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) { if (*PartialJoinCompleted) { - while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) { + while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData, NextBucketToJoin + 1)) { UnpackJoinedData(output); - return EFetchResult::One; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp index d6cbdc65ab1a..a0d729b627d7 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -154,21 +154,21 @@ void TTable::ResetIterator() { } // Checks if there are more tuples and sets bucketId and tupleId to next valid. -inline bool HasMoreTuples(std::vector & tableBucketsStats, ui64 & bucketId, ui64 & tupleId ) { +inline bool HasMoreTuples(std::vector & tableBucketsStats, ui64 & bucketId, ui64 & tupleId, ui64 bucketLimit ) { - if (bucketId >= tableBucketsStats.size()) return false; + if (bucketId >= bucketLimit) return false; if ( tupleId >= tableBucketsStats[bucketId].TuplesNum ) { tupleId = 0; bucketId ++; - if (bucketId == tableBucketsStats.size()) { + if (bucketId == bucketLimit) { return false; } while( tableBucketsStats[bucketId].TuplesNum == 0 ) { bucketId ++; - if (bucketId == tableBucketsStats.size()) { + if (bucketId == bucketLimit) { return false; } } @@ -181,7 +181,7 @@ inline bool HasMoreTuples(std::vector & tableBucketsStats, ui // Returns value of next tuple. Returs true if there are more tuples bool TTable::NextTuple(TupleData & td){ - if (HasMoreTuples(TableBucketsStats, CurrIterBucket, CurrIterIndex )) { + if (HasMoreTuples(TableBucketsStats, CurrIterBucket, CurrIterIndex, TableBucketsStats.size())) { GetTupleData(CurrIterBucket, CurrIterIndex, td); CurrIterIndex++; return true; @@ -767,15 +767,15 @@ inline bool HasRightIdMatch(ui64 currId, ui64 & rightIdIter, const std::vectorTableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) + if (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) { JoinTable1->GetTupleData(JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, td1); - if (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) + if (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, bucketLimit)) { JoinTable2->GetTupleData(JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, td2); JoinTable2->CurrIterIndex++; @@ -786,7 +786,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { JoinTable2->CurrIterBucket = 0; JoinTable2->CurrIterIndex = 0; JoinTable1->CurrIterIndex++; - return NextJoinedData(td1, td2); + return NextJoinedData(td1, td2, bucketLimit); } } else @@ -794,7 +794,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { } if ( JoinKind == EJoinKind::Inner ) { - while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { + while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) { ui32 tupleId2; if (HasJoinedTupleId(JoinTable1, tupleId2)) { @@ -810,7 +810,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { } if ( JoinKind == EJoinKind::Left ) { - while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { + while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) { ui32 tupleId2; if (HasJoinedTupleId(JoinTable1, tupleId2)) { @@ -845,7 +845,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { } if ( JoinKind == EJoinKind::Right ) { - while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { + while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) { ui32 tupleId2; if (HasJoinedTupleId(JoinTable1, tupleId2)) { @@ -886,7 +886,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { if ( RightTableBatch_ && HasMoreRightTuples_ ) return false; - while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { + while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) { ui32 tupleId2; bool globalMatchedId = false; @@ -917,7 +917,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { if (LeftTableBatch_ && HasMoreLeftTuples_ ) return false; - while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { + while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) { ui32 tupleId2; bool globalMatchedId = false; @@ -949,7 +949,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { if (RightTableBatch_ && HasMoreRightTuples_ ) return false; - while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { + while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) { ui32 tupleId2; if ( !RightTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2)) @@ -983,7 +983,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { if (LeftTableBatch_ && HasMoreLeftTuples_ ) return false; - while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { + while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) { ui32 tupleId2; if ( !LeftTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2)) { @@ -1010,7 +1010,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { } if ( JoinKind == EJoinKind::Full ) { - if(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { + if(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) { ui32 tupleId2; if (HasJoinedTupleId(JoinTable1, tupleId2)) { @@ -1036,7 +1036,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { Table2Initialized_ = true; } - while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) { + while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, bucketLimit)) { if (CurrIterBucket != JoinTable2->CurrIterBucket) { CurrIterBucket = JoinTable2->CurrIterBucket; @@ -1060,7 +1060,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { } if ( JoinKind == EJoinKind::Exclusion ) { - while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) { + while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) { ui32 tupleId2; if (HasJoinedTupleId(JoinTable1, tupleId2)) { @@ -1078,7 +1078,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { td1.AllNulls = true; - while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) { + while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, bucketLimit)) { if (CurrIterBucket != JoinTable2->CurrIterBucket) { CurrIterBucket = JoinTable2->CurrIterBucket; @@ -1433,71 +1433,85 @@ void TTableBucketSpiller::ProcessBucketRestoration() { while (NextVectorToProcess != ENextVectorToProcess::None) { switch (NextVectorToProcess) { case ENextVectorToProcess::KeyAndVals: - if (StateUi64Adapter.HasRunningAsyncIoOperation()) return; + if (StateUi64Adapter.IsDataReady()) { + AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector()); + NextVectorToProcess = ENextVectorToProcess::DataIntVals; + break; + } - if (!StateUi64Adapter.IsDataReady()) { + if (StateUi64Adapter.IsAcceptingDataRequests()) { StateUi64Adapter.RequestNextVector(); - if (StateUi64Adapter.HasRunningAsyncIoOperation()) return; + break; } - AppendVector(CurrentBucket.KeyIntVals, StateUi64Adapter.ExtractVector()); - NextVectorToProcess = ENextVectorToProcess::DataIntVals; - break; + return; case ENextVectorToProcess::DataIntVals: - if (StateUi64Adapter.HasRunningAsyncIoOperation()) return; + if (StateUi64Adapter.IsDataReady()) { + AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector()); + NextVectorToProcess = ENextVectorToProcess::StringsValues; + break; + } - if (!StateUi64Adapter.IsDataReady()) { + if (StateUi64Adapter.IsAcceptingDataRequests()) { StateUi64Adapter.RequestNextVector(); - if (StateUi64Adapter.HasRunningAsyncIoOperation()) return; + break; } - AppendVector(CurrentBucket.DataIntVals, StateUi64Adapter.ExtractVector()); - NextVectorToProcess = ENextVectorToProcess::StringsValues; - break; + return; case ENextVectorToProcess::StringsValues: - if (StateCharAdapter.HasRunningAsyncIoOperation()) return; + if (StateCharAdapter.IsDataReady()) { + AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector()); + NextVectorToProcess = ENextVectorToProcess::StringsOffsets; + break; + } - if (!StateCharAdapter.IsDataReady()) { + if (StateCharAdapter.IsAcceptingDataRequests()) { StateCharAdapter.RequestNextVector(); - if (StateCharAdapter.HasRunningAsyncIoOperation()) return; + break; } - AppendVector(CurrentBucket.StringsValues, StateCharAdapter.ExtractVector()); - NextVectorToProcess = ENextVectorToProcess::StringsOffsets; - break; + return; case ENextVectorToProcess::StringsOffsets: - if (StateUi32Adapter.HasRunningAsyncIoOperation()) return; + if (StateUi32Adapter.IsDataReady()) { + AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector()); + NextVectorToProcess = ENextVectorToProcess::InterfaceValues; + break; + } - if (!StateUi32Adapter.IsDataReady()) { + if (StateUi32Adapter.IsAcceptingDataRequests()) { StateUi32Adapter.RequestNextVector(); - if (StateUi32Adapter.HasRunningAsyncIoOperation()) return; + break; } - AppendVector(CurrentBucket.StringsOffsets, StateUi32Adapter.ExtractVector()); - NextVectorToProcess = ENextVectorToProcess::InterfaceValues; - break; + return; case ENextVectorToProcess::InterfaceValues: - if (StateCharAdapter.HasRunningAsyncIoOperation()) return; + if (StateCharAdapter.IsDataReady()) { + AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector()); + NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets; + break; + } - if (!StateCharAdapter.IsDataReady()) { + if (StateCharAdapter.IsAcceptingDataRequests()) { StateCharAdapter.RequestNextVector(); - if (StateCharAdapter.HasRunningAsyncIoOperation()) return; + break; } - AppendVector(CurrentBucket.InterfaceValues, StateCharAdapter.ExtractVector()); - NextVectorToProcess = ENextVectorToProcess::InterfaceOffsets; - break; + return; case ENextVectorToProcess::InterfaceOffsets: - if (StateUi32Adapter.HasRunningAsyncIoOperation()) return; + if (StateUi32Adapter.IsDataReady()) { + AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector()); + + SpilledBucketsCount--; + if (SpilledBucketsCount == 0) { + NextVectorToProcess = ENextVectorToProcess::None; + State = EState::WaitingForExtraction; + } else { + NextVectorToProcess = ENextVectorToProcess::KeyAndVals; + } + + break; + } - if (!StateUi32Adapter.IsDataReady()) { + if (StateUi32Adapter.IsAcceptingDataRequests()) { StateUi32Adapter.RequestNextVector(); - if (StateUi32Adapter.HasRunningAsyncIoOperation()) return; + break; } - AppendVector(CurrentBucket.InterfaceOffsets, StateUi32Adapter.ExtractVector()); - SpilledBucketsCount--; - if (SpilledBucketsCount == 0) { - NextVectorToProcess = ENextVectorToProcess::None; - State = EState::WaitingForExtraction; - } else { - NextVectorToProcess = ENextVectorToProcess::KeyAndVals; - } - break; + return; default: return; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h index 9053ac630316..b3c4505546c7 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h @@ -277,7 +277,11 @@ class TTable { void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner, bool hasMoreLeftTuples = false, bool hasMoreRightTuples = false, ui32 fromBucket = 0, ui32 toBucket = NumberOfBuckets); // Returns next jointed tuple data. Returs true if there are more tuples - bool NextJoinedData(TupleData& td1, TupleData& td2); + bool NextJoinedData(TupleData& td1, TupleData& td2, ui64 bucketLimit); + + bool NextJoinedData(TupleData& td1, TupleData& td2) { + return NextJoinedData(td1, td2, JoinTable1->TableBucketsStats.size()); + } // Creates buckets that support spilling. void InitializeBucketSpillers(ISpiller::TPtr spiller); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp index 6dc6ef04ca7a..ed9a26bc9f53 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp @@ -562,7 +562,8 @@ using TBase = TComputationValue>; EOperatingMode GetMode() const { return Mode; } bool HasMemoryForProcessing() const { - return !TlsAllocState->IsMemoryYellowZoneEnabled(); + return true; + // return !TlsAllocState->IsMemoryYellowZoneEnabled(); } bool IsReadFromChannelFinished() const { diff --git a/ydb/library/yql/minikql/computation/mkql_vector_spiller_adapter.h b/ydb/library/yql/minikql/computation/mkql_vector_spiller_adapter.h index 3e5b9a3d2df3..d7397b8c90c0 100644 --- a/ydb/library/yql/minikql/computation/mkql_vector_spiller_adapter.h +++ b/ydb/library/yql/minikql/computation/mkql_vector_spiller_adapter.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include @@ -115,7 +116,7 @@ class TVectorSpillerAdapter { ///State will be changed to DataREady without any async read operation. ExtractVector is expected ///to be called immediately. void RequestNextVector() { - MKQL_ENSURE(State == EState::AcceptingDataRequests, "Internal logic error"); + MKQL_ENSURE(State == EState::AcceptingDataRequests, std::format("[MISHA] Actual {}\n", (int)State)); MKQL_ENSURE(CurrentVector.empty(), "Internal logic error"); MKQL_ENSURE(!StoredChunksElementsCount.empty(), "Internal logic error"); diff --git a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h index 9185c43c485f..6078318b5e34 100644 --- a/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h +++ b/ydb/library/yql/providers/dq/worker_manager/local_worker_manager.h @@ -30,7 +30,7 @@ namespace NYql::NDqs { NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory; THashMap ClusterNamesMapping; - ui64 MkqlInitialMemoryLimit = 8_GB; + ui64 MkqlInitialMemoryLimit = 50_MB; ui64 MkqlTotalMemoryLimit = 0; ui64 MkqlMinAllocSize = 30_MB; ui64 MkqlProgramHardMemoryLimit = 0;