Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhancement](nereids) speedup sql cache with use variable as partition predicate #37915

Merged
merged 3 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.DataMaskPolicy;
import org.apache.doris.mysql.privilege.RowFilterPolicy;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.SqlCacheContext;
import org.apache.doris.nereids.SqlCacheContext.CacheKeyType;
import org.apache.doris.nereids.SqlCacheContext.FullColumnName;
import org.apache.doris.nereids.SqlCacheContext.FullTableName;
import org.apache.doris.nereids.SqlCacheContext.ScanTable;
Expand Down Expand Up @@ -125,7 +127,9 @@ public void tryAddFeSqlCache(ConnectContext connectContext, String sql) {

SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = currentUserIdentity.toString() + ":" + sql.trim();
String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
? currentUserIdentity.toString() + ":" + sql.trim()
: currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null
&& sqlCacheContext.getResultSetInFe().isPresent()) {
sqlCaches.put(key, sqlCacheContext);
Expand All @@ -143,7 +147,9 @@ public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyz
}
SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = currentUserIdentity.toString() + ":" + sql.trim();
String key = sqlCacheContext.getCacheKeyType() == CacheKeyType.SQL
? currentUserIdentity.toString() + ":" + sql.trim()
: currentUserIdentity.toString() + ":" + DebugUtil.printId(sqlCacheContext.getOrComputeCacheKeyMd5());
if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null) {
SqlCache cache = (SqlCache) analyzer.getCache();
sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum());
Expand All @@ -163,15 +169,44 @@ public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyz
/** tryParseSql */
public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, String sql) {
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
Env env = connectContext.getEnv();
String key = currentUserIdentity.toString() + ":" + sql.trim();
String key = currentUserIdentity + ":" + sql.trim();
SqlCacheContext sqlCacheContext = sqlCaches.getIfPresent(key);
if (sqlCacheContext == null) {
return Optional.empty();
}

// LOG.info("Total size: " + GraphLayout.parseInstance(sqlCacheContext).totalSize());

List<Variable> currentVariables = resolveUserVariables(sqlCacheContext);
if (usedVariablesChanged(currentVariables, sqlCacheContext)) {
String md5 = DebugUtil.printId(
sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables)));

String md5CacheKey = currentUserIdentity + ":" + md5;
SqlCacheContext sqlCacheContextWithVariable = sqlCaches.getIfPresent(md5CacheKey);

// already exist cache in the fe, but the variable is different to this query,
// we should create another cache context in fe, use another cache key
connectContext.getStatementContext()
.getSqlCacheContext().ifPresent(ctx -> ctx.setCacheKeyType(CacheKeyType.MD5));

if (sqlCacheContextWithVariable != null) {
return tryParseSqlWithoutCheckVariable(
connectContext, md5CacheKey, sqlCacheContextWithVariable, currentUserIdentity
);
} else {
return Optional.empty();
}
} else {
return tryParseSqlWithoutCheckVariable(connectContext, key, sqlCacheContext, currentUserIdentity);
}
}

private Optional<LogicalSqlCache> tryParseSqlWithoutCheckVariable(
ConnectContext connectContext, String key,
SqlCacheContext sqlCacheContext, UserIdentity currentUserIdentity) {
Env env = connectContext.getEnv();

// check table and view and their columns authority
if (privilegeChanged(connectContext, env, sqlCacheContext)) {
return invalidateCache(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class SqlCacheContext {
private volatile PUniqueId cacheKeyMd5;
private volatile ResultSet resultSetInFe;

private volatile CacheKeyType cacheKeyType = CacheKeyType.SQL;

public SqlCacheContext(UserIdentity userIdentity, TUniqueId queryId) {
this.userIdentity = Objects.requireNonNull(userIdentity, "userIdentity cannot be null");
this.queryId = Objects.requireNonNull(queryId, "queryId cannot be null");
Expand Down Expand Up @@ -392,6 +394,14 @@ public void setResultSetInFe(ResultSet resultSetInFe) {
this.resultSetInFe = resultSetInFe;
}

public CacheKeyType getCacheKeyType() {
return cacheKeyType;
}

public void setCacheKeyType(CacheKeyType cacheKeyType) {
this.cacheKeyType = cacheKeyType;
}

/** FullTableName */
@lombok.Data
@lombok.AllArgsConstructor
Expand Down Expand Up @@ -433,4 +443,12 @@ public void addScanPartition(Long partitionId) {
this.scanPartitions.add(partitionId);
}
}

/** CacheKeyType */
public enum CacheKeyType {
// use `userIdentity`:`sql`.trim() as Cache key in FE
SQL,
// use MD5 as Cache key in FE
MD5
}
}
14 changes: 14 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.apache.doris.mysql.MysqlPacket;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.mysql.MysqlServerStatusFlag;
import org.apache.doris.nereids.SqlCacheContext;
import org.apache.doris.nereids.SqlCacheContext.CacheKeyType;
import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.exceptions.ParseException;
Expand Down Expand Up @@ -262,9 +264,15 @@ public void executeQuery(MysqlCommand mysqlCommand, String originStmt) throws Ex
// TODO: after implemented full prepared, we could remove this flag
boolean nereidsUseServerPrep = sessionVariable.enableServeSidePreparedStatement
|| mysqlCommand == MysqlCommand.COM_QUERY;
CacheKeyType cacheKeyType = null;
if (nereidsUseServerPrep && sessionVariable.isEnableNereidsPlanner()) {
if (wantToParseSqlFromSqlCache) {
cachedStmts = parseFromSqlCache(originStmt);
Optional<SqlCacheContext> sqlCacheContext = ConnectContext.get()
.getStatementContext().getSqlCacheContext();
if (sqlCacheContext.isPresent()) {
cacheKeyType = sqlCacheContext.get().getCacheKeyType();
}
if (cachedStmts != null) {
stmts = cachedStmts;
}
Expand Down Expand Up @@ -367,6 +375,12 @@ public void executeQuery(MysqlCommand mysqlCommand, String originStmt) throws Ex
executor.getProfile().getSummaryProfile().setParseSqlFinishTime(parseSqlFinishTime);
ctx.setExecutor(executor);

if (cacheKeyType != null) {
SqlCacheContext sqlCacheContext =
executor.getContext().getStatementContext().getSqlCacheContext().get();
sqlCacheContext.setCacheKeyType(cacheKeyType);
}

try {
executor.execute();
if (connectType.equals(ConnectType.MYSQL)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,26 @@ suite("parse_sql_from_sql_cache") {
assertHasCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1"
def result1 = sql "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1"
assertTrue(result1.size() == 1 && result1[0][0].toString().toInteger() == 10)


sql "set @custom_variable2=1"
assertNoCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
def res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
assertTrue(res[0][0] == 1)
assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"

sql "set @custom_variable2=2"
assertNoCache "select* from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
// should not invalidate cache with @custom_variable2=1
res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
assertTrue(res[0][0] == 2)
assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"

sql "set @custom_variable2=1"
// should reuse cache
assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1"
assertTrue(res[0][0] == 1)
}
}),
extraThread("test_udf", {
Expand Down
Loading