Skip to content

Commit

Permalink
Invalidate query compilation cache entries with outdated VIEWs (#1960)
Browse files Browse the repository at this point in the history
In this PR we add the following algorithm for invalidating cache entries for outdated VIEWs:

1. Store path ids and schema versions of the views that were used in the query in the cache entries, so they can be accessed later.
2. Whenever we retrieve a compilation result from cache, send a request for SchemeCache to check if the schema version of the views used in this query (if any) has not changed since we compiled this query.
3. Send a recompilation request if any view is outdated.

There are two important things to note about this solution:

- We make a SchemeCache request for each repeated query and there is a lot of these in an OLTP-focused database like YDB. However, we have already been sending these request for preliminary (this is not the last check of schema version mismatch (at least for tables)) cache invalidation for tables, so views should not incur an additional performance impact here.
- This solution does not guarantee strong consistency for queries using views, because query cache invalidation will not happen instantly after the view definition is updated. The node should get an update from the SchemeCache, which takes some time.
  • Loading branch information
jepett0 authored Feb 18, 2024
1 parent 212a0c0 commit b92e8dc
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 8 deletions.
12 changes: 12 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,18 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase {
if (!AddCluster(table, res, input, ctx)) {
return TStatus::Error;
}

if (const auto& preparingQuery = SessionCtx->Query().PreparingQuery;
preparingQuery
&& res.Metadata->Kind == EKikimrTableKind::View
) {
const auto& viewMetadata = *res.Metadata;
auto* viewInfo = preparingQuery->MutablePhysicalQuery()->MutableViewInfos()->Add();
auto* pathId = viewInfo->MutableTableId();
pathId->SetOwnerId(viewMetadata.PathId.OwnerId());
pathId->SetTableId(viewMetadata.PathId.TableId());
viewInfo->SetSchemaVersion(viewMetadata.SchemaVersion);
}
} else {
TIssueScopeGuard issueScope(ctx.IssueManager, [input, &table, &ctx]() {
return MakeIntrusive<TIssue>(TIssue(ctx.GetPosition(input->Pos()), TStringBuilder()
Expand Down
11 changes: 11 additions & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,24 @@ bool TKqpQueryState::EnsureTableVersions(const TEvTxProxySchemeCache::TEvNavigat
return true;
}

void TKqpQueryState::FillViews(const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpTableInfo>& views) {
for (const auto& view : views) {
const auto& pathId = view.GetTableId();
const auto schemaVersion = view.GetSchemaVersion();
auto [it, isInserted] = TableVersions.emplace(TTableId(pathId.GetOwnerId(), pathId.GetTableId()), schemaVersion);
if (!isInserted) {
Y_ENSURE(it->second == schemaVersion);
}
}
}

std::unique_ptr<TEvTxProxySchemeCache::TEvNavigateKeySet> TKqpQueryState::BuildNavigateKeySet() {
TableVersions.clear();

for (const auto& tx : PreparedQuery->GetPhysicalQuery().GetTransactions()) {
FillTables(tx);
}
FillViews(PreparedQuery->GetPhysicalQuery().GetViewInfos());

auto navigate = MakeHolder<NSchemeCache::TSchemeCacheNavigate>();
navigate->DatabaseName = Database;
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ class TKqpQueryState : public TNonCopyable {
}
}

void FillViews(const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpTableInfo>& views);

bool NeedCheckTableVersions() const {
return CompileStats.FromCache;
}
Expand Down
94 changes: 86 additions & 8 deletions ydb/core/kqp/ut/view/view_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
#include <ydb/library/yql/sql/sql.h>
#include <ydb/library/yql/utils/log/log.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>

#include <util/folder/filelist.h>

#include <format>

using namespace NKikimr;
using namespace NKikimr::NKqp;
using namespace NYdb;
using namespace NYdb::NTable;

namespace {
Expand Down Expand Up @@ -75,8 +77,37 @@ TDataQueryResult ExecuteDataModificationQuery(TSession& session,
return result;
}

TString GetYsonResults(TSession& session, const TString& query, const TExecDataQuerySettings& settings = {}) {
return FormatResultSetYson(ExecuteDataModificationQuery(session, query, settings).GetResultSet(0));
TValue GetSingleResult(const TDataQueryResult& rawResults) {
auto resultSetParser = rawResults.GetResultSetParser(0);
UNIT_ASSERT(resultSetParser.TryNextRow());
return resultSetParser.GetValue(0);
}

TValue GetSingleResult(TSession& session, const TString& query, const TExecDataQuerySettings& settings = {}) {
return GetSingleResult(ExecuteDataModificationQuery(session, query, settings));
}

TInstant GetTimestamp(const TValue& value) {
return TValueParser(value).GetTimestamp();
}

int GetInteger(const TValue& value) {
return TValueParser(value).GetInt32();
}

TMaybe<bool> GetFromCacheStat(const TQueryStats& stats) {
const auto& proto = TProtoAccessor::GetProto(stats);
if (!proto.Hascompilation()) {
return Nothing();
}
return proto.Getcompilation().Getfrom_cache();
}

void AssertFromCache(const TMaybe<TQueryStats>& stats, bool expectedValue) {
UNIT_ASSERT(stats.Defined());
const auto isFromCache = GetFromCacheStat(*stats);
UNIT_ASSERT_C(isFromCache.Defined(), stats->ToString());
UNIT_ASSERT_VALUES_EQUAL_C(*isFromCache, expectedValue, stats->ToString());
}

void CompareResults(const TDataQueryResult& first, const TDataQueryResult& second) {
Expand Down Expand Up @@ -384,6 +415,53 @@ Y_UNIT_TEST_SUITE(TSelectFromViewTest) {
ExecuteDataDefinitionQuery(session, ReadWholeFile(pathPrefix + "drop_view.sql"));
}
}

Y_UNIT_TEST(QueryCacheIsUpdated) {
TKikimrRunner kikimr(TKikimrSettings().SetWithSampleTables(false));
EnableViewsFeatureFlag(kikimr);
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();

constexpr const char* viewName = "TheView";

const auto getCreationQuery = [&viewName](const char* innerQuery) -> TString {
return std::format(R"(
CREATE VIEW {} WITH (security_invoker = TRUE) AS {};
)",
viewName,
innerQuery
);
};
constexpr const char* firstInnerQuery = "SELECT 1";
ExecuteDataDefinitionQuery(session, getCreationQuery(firstInnerQuery));

const TString selectFromViewQuery = std::format(R"(
SELECT * FROM {};
)",
viewName
);
TExecDataQuerySettings queryExecutionSettings;
queryExecutionSettings.KeepInQueryCache(true);
queryExecutionSettings.CollectQueryStats(ECollectQueryStatsMode::Basic);
ExecuteDataModificationQuery(session, selectFromViewQuery, queryExecutionSettings);
// make sure the server side cache is working by calling the same query twice
const auto cachedQueryRawResult = ExecuteDataModificationQuery(session, selectFromViewQuery, queryExecutionSettings);
AssertFromCache(cachedQueryRawResult.GetStats(), true);
UNIT_ASSERT_VALUES_EQUAL(GetInteger(GetSingleResult(cachedQueryRawResult)), 1);

// recreate the view with a different query inside
ExecuteDataDefinitionQuery(session, std::format(R"(
DROP VIEW {};
)",
viewName
)
);
constexpr const char* secondInnerQuery = "SELECT 2";
ExecuteDataDefinitionQuery(session, getCreationQuery(secondInnerQuery));

const auto secondCallRawResult = ExecuteDataModificationQuery(session, selectFromViewQuery, queryExecutionSettings);
AssertFromCache(secondCallRawResult.GetStats(), false);
UNIT_ASSERT_VALUES_EQUAL(GetInteger(GetSingleResult(secondCallRawResult)), 2);
}
}

Y_UNIT_TEST_SUITE(TEvaluateExprInViewTest) {
Expand Down Expand Up @@ -414,9 +492,9 @@ Y_UNIT_TEST_SUITE(TEvaluateExprInViewTest) {
TExecDataQuerySettings queryExecutionSettings;
queryExecutionSettings.KeepInQueryCache(true);
const auto executeTwice = [&](const TString& query) {
return TVector<TString>{
GetYsonResults(session, query, queryExecutionSettings),
GetYsonResults(session, query, queryExecutionSettings)
return TVector<TInstant>{
GetTimestamp(GetSingleResult(session, query, queryExecutionSettings)),
GetTimestamp(GetSingleResult(session, query, queryExecutionSettings))
};
};
const auto viewResults = executeTwice(selectFromViewQuery);
Expand Down Expand Up @@ -455,9 +533,9 @@ Y_UNIT_TEST_SUITE(TEvaluateExprInViewTest) {
TExecDataQuerySettings queryExecutionSettings;
queryExecutionSettings.KeepInQueryCache(true);
const auto executeTwice = [&](const TString& query) {
return TVector<TString>{
GetYsonResults(session, query, queryExecutionSettings),
GetYsonResults(session, query, queryExecutionSettings)
return TVector<TInstant>{
GetTimestamp(GetSingleResult(session, query, queryExecutionSettings)),
GetTimestamp(GetSingleResult(session, query, queryExecutionSettings))
};
};
const auto viewResults = executeTwice(selectFromViewQuery);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/protos/kqp_physical.proto
Original file line number Diff line number Diff line change
Expand Up @@ -507,4 +507,6 @@ message TKqpPhyQuery {
bool HasUncommittedChangesRead = 9;

string QueryDiagnostics = 10;

repeated TKqpTableInfo ViewInfos = 11;
}

0 comments on commit b92e8dc

Please sign in to comment.