Skip to content

Commit

Permalink
Revert "KQP computation pattern cache serialized program (ydb-platfor…
Browse files Browse the repository at this point in the history
…m#634)" (ydb-platform#750)

This reverts commit d4ee658.
  • Loading branch information
kitaisreal authored and adameat committed Dec 29, 2023
1 parent a943835 commit a5462e4
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class TKqpCompileComputationPatternService : public TActorBootstrapped<TKqpCompi
return;
}

THashMap<NMiniKQL::TSerializedProgram, std::shared_ptr<NMiniKQL::TPatternCacheEntry>> patternsToCompile;
THashMap<TString, std::shared_ptr<NMiniKQL::TPatternCacheEntry>> patternsToCompile;
patternCache->GetPatternsToCompile(patternsToCompile);

TVector<std::pair<TPatternToCompile, size_t>> patternsToCompileWithAccessTimes;
Expand Down Expand Up @@ -115,7 +115,7 @@ class TKqpCompileComputationPatternService : public TActorBootstrapped<TKqpCompi
TIntrusivePtr<TKqpCounters> Counters;

struct TPatternToCompile {
NMiniKQL::TSerializedProgram SerializedProgram;
TString SerializedProgram;
std::shared_ptr<NMiniKQL::TPatternCacheEntry> Entry;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
return CurrentPatternsCompiledCodeSizeInBytes;
}

std::shared_ptr<TPatternCacheEntry>* Find(const TSerializedProgram& serializedProgram) {
std::shared_ptr<TPatternCacheEntry>* Find(const TString& serializedProgram) {
auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
if (it == SerializedProgramToPatternCacheHolder.end()) {
return nullptr;
Expand All @@ -44,7 +44,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
return &it->second.Entry;
}

void Insert(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
void Insert(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
auto [it, inserted] = SerializedProgramToPatternCacheHolder.emplace(std::piecewise_construct,
std::forward_as_tuple(serializedProgram),
std::forward_as_tuple(serializedProgram, entry));
Expand All @@ -69,7 +69,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
ClearIfNeeded();
}

void NotifyPatternCompiled(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
void NotifyPatternCompiled(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry>& entry) {
auto it = SerializedProgramToPatternCacheHolder.find(serializedProgram);
if (it == SerializedProgramToPatternCacheHolder.end()) {
return;
Expand Down Expand Up @@ -108,7 +108,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
* Most recently accessed items are in back of the lists, least recently accessed items are in front of the lists.
*/
struct TPatternCacheHolder : public TIntrusiveListItem<TPatternCacheHolder, TPatternLRUListTag>, TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag> {
TPatternCacheHolder(TSerializedProgram serializedProgram, std::shared_ptr<TPatternCacheEntry> entry)
TPatternCacheHolder(TString serializedProgram, std::shared_ptr<TPatternCacheEntry> entry)
: SerializedProgram(std::move(serializedProgram))
, Entry(std::move(entry))
{}
Expand All @@ -121,7 +121,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
return !TIntrusiveListItem<TPatternCacheHolder, TCompiledPatternLRUListTag>::Empty();
}

TSerializedProgram SerializedProgram;
TString SerializedProgram;
std::shared_ptr<TPatternCacheEntry> Entry;
};

Expand Down Expand Up @@ -195,7 +195,7 @@ class TComputationPatternLRUCache::TLRUPatternCacheImpl
size_t CurrentCompiledPatternsSize = 0;
size_t CurrentPatternsCompiledCodeSizeInBytes = 0;

THashMap<TSerializedProgram, TPatternCacheHolder> SerializedProgramToPatternCacheHolder;
THashMap<TString, TPatternCacheHolder> SerializedProgramToPatternCacheHolder;
TIntrusiveList<TPatternCacheHolder, TPatternLRUListTag> LRUPatternList;
TIntrusiveList<TPatternCacheHolder, TCompiledPatternLRUListTag> LRUCompiledPatternList;
};
Expand Down Expand Up @@ -223,7 +223,7 @@ TComputationPatternLRUCache::~TComputationPatternLRUCache() {
CleanCache();
}

std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TSerializedProgram& serializedProgram) {
std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TString& serializedProgram) {
std::lock_guard<std::mutex> lock(Mutex);
if (auto it = Cache->Find(serializedProgram)) {
++*Hits;
Expand All @@ -238,7 +238,7 @@ std::shared_ptr<TPatternCacheEntry> TComputationPatternLRUCache::Find(const TSer
return {};
}

TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TSerializedProgram& serializedProgram) {
TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscribe(const TString& serializedProgram) {
std::lock_guard lock(Mutex);
if (auto it = Cache->Find(serializedProgram)) {
++*Hits;
Expand All @@ -263,7 +263,7 @@ TComputationPatternLRUCache::TTicket TComputationPatternLRUCache::FindOrSubscrib
return TTicket(serializedProgram, false, promise, nullptr);
}

void TComputationPatternLRUCache::EmplacePattern(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
void TComputationPatternLRUCache::EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
Y_DEBUG_ABORT_UNLESS(patternWithEnv && patternWithEnv->Pattern);
TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;

Expand All @@ -290,7 +290,7 @@ void TComputationPatternLRUCache::EmplacePattern(const TSerializedProgram& seria
}
}

void TComputationPatternLRUCache::NotifyPatternCompiled(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
void TComputationPatternLRUCache::NotifyPatternCompiled(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv) {
std::lock_guard lock(Mutex);
Cache->NotifyPatternCompiled(serializedProgram, patternWithEnv);
}
Expand All @@ -309,7 +309,7 @@ void TComputationPatternLRUCache::CleanCache() {
Cache->Clear();
}

void TComputationPatternLRUCache::AccessPattern(const TSerializedProgram & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) {
void TComputationPatternLRUCache::AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry) {
if (!Configuration.PatternAccessTimesBeforeTryToCompile || entry->Pattern->IsCompiled()) {
return;
}
Expand All @@ -321,11 +321,11 @@ void TComputationPatternLRUCache::AccessPattern(const TSerializedProgram & seria
}
}

void TComputationPatternLRUCache::NotifyMissing(const TSerializedProgram& serializedProgram) {
void TComputationPatternLRUCache::NotifyMissing(const TString& serialized) {
TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>> subscribers;
{
std::lock_guard<std::mutex> lock(Mutex);
auto notifyIt = Notify.find(serializedProgram);
auto notifyIt = Notify.find(serialized);
if (notifyIt != Notify.end()) {
subscribers.swap(notifyIt->second);
Notify.erase(notifyIt);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#pragma once

#include "mkql_computation_node.h"
#include "mkql_serialized_program.h"

#include <ydb/library/yql/minikql/mkql_node.h>
#include <library/cpp/threading/future/future.h>
Expand Down Expand Up @@ -58,16 +57,16 @@ class TComputationPatternLRUCache {
public:
class TTicket : private TNonCopyable {
public:
TTicket(const TSerializedProgram& serializedProgram, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache)
: SerializedProgram(serializedProgram)
TTicket(const TString& serialized, bool isOwned, const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>>& future, TComputationPatternLRUCache* cache)
: Serialized(serialized)
, IsOwned(isOwned)
, Future(future)
, Cache(cache)
{}

~TTicket() {
if (Cache) {
Cache->NotifyMissing(SerializedProgram);
Cache->NotifyMissing(Serialized);
}
}

Expand All @@ -85,7 +84,7 @@ class TComputationPatternLRUCache {
}

private:
const TSerializedProgram SerializedProgram;
const TString Serialized;
const bool IsOwned;
const NThreading::TFuture<std::shared_ptr<TPatternCacheEntry>> Future;
TComputationPatternLRUCache* Cache;
Expand Down Expand Up @@ -125,13 +124,13 @@ class TComputationPatternLRUCache {
return std::make_shared<TPatternCacheEntry>(useAlloc);
}

std::shared_ptr<TPatternCacheEntry> Find(const TSerializedProgram& serializedProgram);
std::shared_ptr<TPatternCacheEntry> Find(const TString& serializedProgram);

TTicket FindOrSubscribe(const TSerializedProgram& serializedProgram);
TTicket FindOrSubscribe(const TString& serializedProgram);

void EmplacePattern(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
void EmplacePattern(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);

void NotifyPatternCompiled(const TSerializedProgram& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);
void NotifyPatternCompiled(const TString& serializedProgram, std::shared_ptr<TPatternCacheEntry> patternWithEnv);

size_t GetSize() const;

Expand Down Expand Up @@ -160,27 +159,27 @@ class TComputationPatternLRUCache {
return PatternsToCompile.size();
}

void GetPatternsToCompile(THashMap<TSerializedProgram, std::shared_ptr<TPatternCacheEntry>> & result) {
void GetPatternsToCompile(THashMap<TString, std::shared_ptr<TPatternCacheEntry>> & result) {
std::lock_guard lock(Mutex);
result.swap(PatternsToCompile);
}

private:
void AccessPattern(const TSerializedProgram & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry);
void AccessPattern(const TString & serializedProgram, std::shared_ptr<TPatternCacheEntry> & entry);

void NotifyMissing(const TSerializedProgram& serializedProgram);
void NotifyMissing(const TString& serialized);

static constexpr size_t CacheMaxElementsSize = 10000;

friend class TTicket;

mutable std::mutex Mutex;
THashMap<TSerializedProgram, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;
THashMap<TString, TMaybe<TVector<NThreading::TPromise<std::shared_ptr<TPatternCacheEntry>>>>> Notify;

class TLRUPatternCacheImpl;
std::unique_ptr<TLRUPatternCacheImpl> Cache;

THashMap<TSerializedProgram, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile;
THashMap<TString, std::shared_ptr<TPatternCacheEntry>> PatternsToCompile;

const Config Configuration;

Expand Down
49 changes: 0 additions & 49 deletions ydb/library/yql/minikql/computation/mkql_serialized_program.h

This file was deleted.

0 comments on commit a5462e4

Please sign in to comment.