Skip to content

Commit

Permalink
Merge 5ec7daa into 356726c
Browse files Browse the repository at this point in the history
  • Loading branch information
lll-phill-lll authored May 24, 2024
2 parents 356726c + 5ec7daa commit 82225ad
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 8 deletions.
2 changes: 1 addition & 1 deletion ydb/library/yql/minikql/aligned_page_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ void TAlignedPagePoolImpl<T>::Free(void* ptr, size_t size) noexcept {
template<typename T>
void TAlignedPagePoolImpl<T>::UpdateMemoryYellowZone() {
if (Limit == 0) return;
if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return;
// if (IncreaseMemoryLimitCallback && !IsMaximumLimitValueReached) return;

ui8 usedMemoryPercent = 100 * GetUsed() / Limit;
if (usedMemoryPercent >= EnableMemoryYellowZoneThreshold) {
Expand Down
13 changes: 9 additions & 4 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "mkql_grace_join.h"
#include "mkql_grace_join_imp.h"

#include <format>
#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/public/udf/udf_value.h>
#include <ydb/library/yql/public/decimal/yql_decimal_serialize.h>
Expand Down Expand Up @@ -631,13 +632,15 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
}

bool IsSwitchToSpillingModeCondition() const {
return false;
// return false;
// TODO: YQL-18033
// return !HasMemoryForProcessing();
return !HasMemoryForProcessing();
}


void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {

std::cerr << std::format("[MISHA] switching mode {}->{}\n", (int)Mode, (int)mode);
switch(mode) {
case EOperatingMode::InMemory: {
MKQL_ENSURE(false, "Internal logic error");
Expand Down Expand Up @@ -772,7 +775,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
}
if (isYield) return EFetchResult::Yield;

if (!*HaveMoreRightRows && !*PartialJoinCompleted && LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize ) {
/*if (!*HaveMoreRightRows && !*PartialJoinCompleted && LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize ) {
*PartialJoinCompleted = true;
JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
JoinedTablePtr->ResetIterator();
Expand All @@ -783,7 +786,7 @@ class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpilli
JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
JoinedTablePtr->ResetIterator();
}
}*/

if (!*HaveMoreRightRows && !*HaveMoreLeftRows && !*PartialJoinCompleted) {
*PartialJoinCompleted = true;
Expand Down Expand Up @@ -855,10 +858,12 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
if (HasRunningAsyncOperation()) return EFetchResult::Yield;

if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
std::cerr << std::format("[MISHA] loading bucket {}\n", NextBucketToJoin);
LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
}

if (!RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
std::cerr << std::format("[MISHA] loading bucket {}\n", NextBucketToJoin);
RightPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
}

Expand Down
10 changes: 8 additions & 2 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "mkql_grace_join_imp.h"

#include <format>
#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/utils/log/log.h>

Expand Down Expand Up @@ -263,6 +264,7 @@ void ResizeHashTable(KeysHashTable &t, ui64 newSlots){
// Joins two tables and returns join result in joined table. Tuples of joined table could be received by
// joined table iterator
void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples, ui32 fromBucket, ui32 toBucket ) {
std::cerr << std::format("[MISHA] joining buckets {}->{}\n", fromBucket, toBucket);
if ( hasMoreLeftTuples )
LeftTableBatch_ = true;

Expand Down Expand Up @@ -297,13 +299,17 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef

for (ui64 bucket = fromBucket; bucket < toBucket; bucket++) {

// std::cerr << std::format("[MISHA] bucket: {}", bucket);

joinResults.clear();
TTableBucket * bucket1 = &JoinTable1->TableBuckets[bucket];
TTableBucket * bucket2 = &JoinTable2->TableBuckets[bucket];

ui64 tuplesNum1 = JoinTable1->TableBucketsStats[bucket].TuplesNum;
ui64 tuplesNum2 = JoinTable2->TableBucketsStats[bucket].TuplesNum;

std::cerr << std::format("[MISHA] bucket: {}, tuplesNum1: {}, tuplesNum2: {}, KIVSize1: {}, KIVSize2: {}\n", bucket, tuplesNum1, tuplesNum2, bucket1->KeyIntVals.size(), bucket2->KeyIntVals.size());

ui64 headerSize1 = JoinTable1->HeaderSize;
ui64 headerSize2 = JoinTable2->HeaderSize;
ui64 nullsSize1 = JoinTable1->NullsBitmapSize_;
Expand Down Expand Up @@ -1154,8 +1160,8 @@ bool TTable::TryToReduceMemoryAndWait() {
}
}

if (largestBucketSize) return false;

if (!largestBucketSize) return false;
std::cerr << std::format("[MISHA] spilling bucket {} of size {}\n", largestBucketIndex, largestBucketSize);
TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex]));
TableBuckets[largestBucketIndex] = TTableBucket{};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ namespace NYql::NDqs {
NDq::NTaskRunnerActor::ITaskRunnerActorFactory::TPtr TaskRunnerActorFactory;
THashMap<TString, TString> ClusterNamesMapping;

ui64 MkqlInitialMemoryLimit = 8_GB;
ui64 MkqlInitialMemoryLimit = 50_MB;
ui64 MkqlTotalMemoryLimit = 0;
ui64 MkqlMinAllocSize = 30_MB;
ui64 MkqlProgramHardMemoryLimit = 0;
Expand Down

0 comments on commit 82225ad

Please sign in to comment.