Skip to content

Commit

Permalink
toremove
Browse files Browse the repository at this point in the history
  • Loading branch information
lll-phill-lll committed May 30, 2024
1 parent d1a6793 commit 6453f97
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 25 deletions.
7 changes: 5 additions & 2 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -863,26 +863,29 @@ EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const*
if (IsRestoringSpilledBuckets()) return EFetchResult::Yield;

if (LeftPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
std::cerr << std::format("[MISHA][LEFT][{}] Extracting bucket\n", NextBucketToJoin);
LeftPacker->TablePtr->PrepareBucket(NextBucketToJoin);
}

if (RightPacker->TablePtr->IsSpilledBucketWaitingForExtraction(NextBucketToJoin)) {
std::cerr << std::format("[MISHA][RIGHT][{}] Extracting bucket\n", NextBucketToJoin);
RightPacker->TablePtr->PrepareBucket(NextBucketToJoin);
}

if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
std::cerr << std::format("[MISHA][LEFT][{}] Loading bucket\n", NextBucketToJoin);
LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
}

if (!RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
std::cerr << std::format("[MISHA][RIGHT][{}] Loading bucket\n", NextBucketToJoin);
RightPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
}

if (LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
if (*PartialJoinCompleted) {
while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) {
while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData, NextBucketToJoin + 1)) {
UnpackJoinedData(output);

return EFetchResult::One;
}

Expand Down
40 changes: 20 additions & 20 deletions ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,21 @@ void TTable::ResetIterator() {
}

// Checks if there are more tuples and sets bucketId and tupleId to next valid.
inline bool HasMoreTuples(std::vector<TTableBucketStats> & tableBucketsStats, ui64 & bucketId, ui64 & tupleId ) {
inline bool HasMoreTuples(std::vector<TTableBucketStats> & tableBucketsStats, ui64 & bucketId, ui64 & tupleId, ui64 bucketLimit ) {

if (bucketId >= tableBucketsStats.size()) return false;
if (bucketId >= bucketLimit) return false;

if ( tupleId >= tableBucketsStats[bucketId].TuplesNum ) {
tupleId = 0;
bucketId ++;

if (bucketId == tableBucketsStats.size()) {
if (bucketId == bucketLimit) {
return false;
}

while( tableBucketsStats[bucketId].TuplesNum == 0 ) {
bucketId ++;
if (bucketId == tableBucketsStats.size()) {
if (bucketId == bucketLimit) {
return false;
}
}
Expand All @@ -181,7 +181,7 @@ inline bool HasMoreTuples(std::vector<TTableBucketStats> & tableBucketsStats, ui

// Returns value of next tuple. Returs true if there are more tuples
bool TTable::NextTuple(TupleData & td){
if (HasMoreTuples(TableBucketsStats, CurrIterBucket, CurrIterIndex )) {
if (HasMoreTuples(TableBucketsStats, CurrIterBucket, CurrIterIndex, TableBucketsStats.size())) {
GetTupleData(CurrIterBucket, CurrIterIndex, td);
CurrIterIndex++;
return true;
Expand Down Expand Up @@ -767,15 +767,15 @@ inline bool HasRightIdMatch(ui64 currId, ui64 & rightIdIter, const std::vector<u
}


bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
bool TTable::NextJoinedData( TupleData & td1, TupleData & td2, ui64 bucketLimit) {

if (JoinKind == EJoinKind::Cross) {

if (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex))
if (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit))
{
JoinTable1->GetTupleData(JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, td1);

if (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex))
if (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, bucketLimit))
{
JoinTable2->GetTupleData(JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, td2);
JoinTable2->CurrIterIndex++;
Expand All @@ -786,15 +786,15 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
JoinTable2->CurrIterBucket = 0;
JoinTable2->CurrIterIndex = 0;
JoinTable1->CurrIterIndex++;
return NextJoinedData(td1, td2);
return NextJoinedData(td1, td2, bucketLimit);
}
}
else
return false;
}

if ( JoinKind == EJoinKind::Inner ) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;
if (HasJoinedTupleId(JoinTable1, tupleId2))
{
Expand All @@ -810,7 +810,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
}

if ( JoinKind == EJoinKind::Left ) {
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;
if (HasJoinedTupleId(JoinTable1, tupleId2))
{
Expand Down Expand Up @@ -845,7 +845,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
}

if ( JoinKind == EJoinKind::Right ) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;
if (HasJoinedTupleId(JoinTable1, tupleId2))
{
Expand Down Expand Up @@ -886,7 +886,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
if ( RightTableBatch_ && HasMoreRightTuples_ )
return false;

while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;

bool globalMatchedId = false;
Expand Down Expand Up @@ -917,7 +917,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
if (LeftTableBatch_ && HasMoreLeftTuples_ )
return false;

while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;

bool globalMatchedId = false;
Expand Down Expand Up @@ -949,7 +949,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
if (RightTableBatch_ && HasMoreRightTuples_ )
return false;

while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;

if ( !RightTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2))
Expand Down Expand Up @@ -983,7 +983,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
if (LeftTableBatch_ && HasMoreLeftTuples_ )
return false;

while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;
if ( !LeftTableBatch_ && HasJoinedTupleId(JoinTable1, tupleId2))
{
Expand All @@ -1010,7 +1010,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
}

if ( JoinKind == EJoinKind::Full ) {
if(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
if(HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;
if (HasJoinedTupleId(JoinTable1, tupleId2))
{
Expand All @@ -1036,7 +1036,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
Table2Initialized_ = true;
}

while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) {
while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, bucketLimit)) {

if (CurrIterBucket != JoinTable2->CurrIterBucket) {
CurrIterBucket = JoinTable2->CurrIterBucket;
Expand All @@ -1060,7 +1060,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
}

if ( JoinKind == EJoinKind::Exclusion ) {
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex)) {
while (HasMoreTuples(JoinTable1->TableBucketsStats, JoinTable1->CurrIterBucket, JoinTable1->CurrIterIndex, bucketLimit)) {
ui32 tupleId2;
if (HasJoinedTupleId(JoinTable1, tupleId2))
{
Expand All @@ -1078,7 +1078,7 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {

td1.AllNulls = true;

while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex)) {
while (HasMoreTuples(JoinTable2->TableBucketsStats, JoinTable2->CurrIterBucket, JoinTable2->CurrIterIndex, bucketLimit)) {

if (CurrIterBucket != JoinTable2->CurrIterBucket) {
CurrIterBucket = JoinTable2->CurrIterBucket;
Expand Down
6 changes: 5 additions & 1 deletion ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,11 @@ class TTable {
void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner, bool hasMoreLeftTuples = false, bool hasMoreRightTuples = false, ui32 fromBucket = 0, ui32 toBucket = NumberOfBuckets);

// Returns next jointed tuple data. Returs true if there are more tuples
bool NextJoinedData(TupleData& td1, TupleData& td2);
bool NextJoinedData(TupleData& td1, TupleData& td2, ui64 bucketLimit);

bool NextJoinedData(TupleData& td1, TupleData& td2) {
return NextJoinedData(td1, td2, JoinTable1->TableBucketsStats.size());
}

// Creates buckets that support spilling.
void InitializeBucketSpillers(ISpiller::TPtr spiller);
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/minikql/comp_nodes/mkql_wide_top_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,8 @@ using TBase = TComputationValue<TSpillingSupportState<Sort, HasCount>>;
EOperatingMode GetMode() const { return Mode; }

bool HasMemoryForProcessing() const {
return !TlsAllocState->IsMemoryYellowZoneEnabled();
return true;
// return !TlsAllocState->IsMemoryYellowZoneEnabled();
}

bool IsReadFromChannelFinished() const {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once

#include <format>
#include <queue>

#include <ydb/library/yql/minikql/defs.h>
Expand Down Expand Up @@ -115,7 +116,7 @@ class TVectorSpillerAdapter {
///State will be changed to DataREady without any async read operation. ExtractVector is expected
///to be called immediately.
void RequestNextVector() {
MKQL_ENSURE(State == EState::AcceptingDataRequests, "Internal logic error");
MKQL_ENSURE(State == EState::AcceptingDataRequests, std::format("[MISHA] Actual {}\n", (int)State));
MKQL_ENSURE(CurrentVector.empty(), "Internal logic error");
MKQL_ENSURE(!StoredChunksElementsCount.empty(), "Internal logic error");

Expand Down

0 comments on commit 6453f97

Please sign in to comment.