Skip to content

Commit

Permalink
[feature](tvf) support query table value function (#34516) (#34640)
Browse files Browse the repository at this point in the history
This PR supports a Table Value Function called `Query`. He can push a query directly to the catalog source for execution by specifying `catalog` and `query` without parsing by Doris. Doris only receives the results returned by the query.
Currently only JDBC Catalog is supported.

Example:

```
Doris > desc function query('catalog' = 'mysql','query' = 'select count(*) as cnt from test.test');           
+-------+--------+------+------+---------+-------+
| Field | Type   | Null | Key  | Default | Extra |
+-------+--------+------+------+---------+-------+
| cnt   | BIGINT | Yes  | true | NULL    | NONE  |
+-------+--------+------+------+---------+-------+

Doris > select * from query('catalog' = 'mysql','query' = 'select count(*) as cnt from test.test');           
+----------+
| cnt      |
+----------+
| 30000000 |
+----------+
```
  • Loading branch information
zy-kkk authored May 10, 2024
1 parent 60e5583 commit 5a31074
Show file tree
Hide file tree
Showing 12 changed files with 432 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
import org.apache.doris.nereids.trees.expressions.functions.table.Query;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;

Expand All @@ -55,7 +56,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
tableValued(S3.class, "s3"),
tableValued(MvInfos.class, "mv_infos"),
tableValued(Jobs.class, "jobs"),
tableValued(Tasks.class, "tasks")
tableValued(Tasks.class, "tasks"),
tableValued(Query.class, "query")
);

public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.datasource.jdbc;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.JdbcTable;
Expand Down Expand Up @@ -288,6 +289,34 @@ public void executeStmt(String stmt) {
jdbcClient.executeStmt(stmt);
}

/**
* Get columns from query
*
* @param query, the query string
* @return the columns
*/
public List<Column> getColumnsFromQuery(String query) {
makeSureInitialized();
return jdbcClient.getColumnsFromQuery(query);
}

public void configureJdbcTable(JdbcTable jdbcTable, String tableName) {
jdbcTable.setCatalogId(this.getId());
jdbcTable.setExternalTableName(tableName);
jdbcTable.setJdbcTypeName(this.getDatabaseTypeName());
jdbcTable.setJdbcUrl(this.getJdbcUrl());
jdbcTable.setJdbcUser(this.getJdbcUser());
jdbcTable.setJdbcPasswd(this.getJdbcPasswd());
jdbcTable.setDriverClass(this.getDriverClass());
jdbcTable.setDriverUrl(this.getDriverUrl());
jdbcTable.setResourceName(this.getResource());
jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime());
jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive());
}

private void testJdbcConnection(boolean isReplay) throws DdlException {
if (FeConstants.runningUnitTest) {
// skip test connection in unit test
Expand Down Expand Up @@ -352,19 +381,11 @@ private void testBeToJdbcConnection() throws DdlException {
private JdbcTable getTestConnectionJdbcTable() throws DdlException {
JdbcTable jdbcTable = new JdbcTable(0, "test_jdbc_connection", Lists.newArrayList(),
TableType.JDBC_EXTERNAL_TABLE);
jdbcTable.setCatalogId(this.getId());
jdbcTable.setJdbcTypeName(this.getDatabaseTypeName());
jdbcTable.setJdbcUrl(this.getJdbcUrl());
jdbcTable.setJdbcUser(this.getJdbcUser());
jdbcTable.setJdbcPasswd(this.getJdbcPasswd());
jdbcTable.setDriverClass(this.getDriverClass());
jdbcTable.setDriverUrl(this.getDriverUrl());
this.configureJdbcTable(jdbcTable, "test_jdbc_connection");

// Special checksum computation
jdbcTable.setCheckSum(JdbcResource.computeObjectChecksum(this.getDriverUrl()));
jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime());
jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive());

return jdbcTable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.Optional;

/**
* Elasticsearch external table.
* Jdbc external table.
*/
public class JdbcExternalTable extends ExternalTable {
private static final Logger LOG = LogManager.getLogger(JdbcExternalTable.class);
Expand Down Expand Up @@ -83,27 +83,13 @@ private JdbcTable toJdbcTable() {
JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog;
String fullDbName = this.dbName + "." + this.name;
JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, TableType.JDBC_EXTERNAL_TABLE);
jdbcTable.setCatalogId(jdbcCatalog.getId());
jdbcTable.setExternalTableName(fullDbName);
jdbcTable.setRemoteDatabaseName(
((JdbcExternalCatalog) catalog).getJdbcClient().getRemoteDatabaseName(this.dbName));
jdbcTable.setRemoteTableName(
((JdbcExternalCatalog) catalog).getJdbcClient().getRemoteTableName(this.dbName, this.name));
jdbcTable.setRemoteColumnNames(((JdbcExternalCatalog) catalog).getJdbcClient().getRemoteColumnNames(this.dbName,
this.name));
jdbcTable.setJdbcTypeName(jdbcCatalog.getDatabaseTypeName());
jdbcTable.setJdbcUrl(jdbcCatalog.getJdbcUrl());
jdbcTable.setJdbcUser(jdbcCatalog.getJdbcUser());
jdbcTable.setJdbcPasswd(jdbcCatalog.getJdbcPasswd());
jdbcTable.setDriverClass(jdbcCatalog.getDriverClass());
jdbcTable.setDriverUrl(jdbcCatalog.getDriverUrl());
jdbcTable.setResourceName(jdbcCatalog.getResource());
jdbcTable.setCheckSum(jdbcCatalog.getCheckSum());
jdbcTable.setConnectionPoolMinSize(jdbcCatalog.getConnectionPoolMinSize());
jdbcTable.setConnectionPoolMaxSize(jdbcCatalog.getConnectionPoolMaxSize());
jdbcTable.setConnectionPoolMaxLifeTime(jdbcCatalog.getConnectionPoolMaxLifeTime());
jdbcTable.setConnectionPoolMaxWaitTime(jdbcCatalog.getConnectionPoolMaxWaitTime());
jdbcTable.setConnectionPoolKeepAlive(jdbcCatalog.isConnectionPoolKeepAlive());
jdbcCatalog.configureJdbcTable(jdbcTable, fullDbName);

// Set remote properties
jdbcTable.setRemoteDatabaseName(jdbcCatalog.getJdbcClient().getRemoteDatabaseName(this.dbName));
jdbcTable.setRemoteTableName(jdbcCatalog.getJdbcClient().getRemoteTableName(this.dbName, this.name));
jdbcTable.setRemoteColumnNames(jdbcCatalog.getJdbcClient().getRemoteColumnNames(this.dbName, this.name));

return jdbcTable;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collections;
Expand Down Expand Up @@ -213,6 +215,58 @@ public void executeStmt(String origStmt) {
}
}

/**
* Execute query via jdbc
*
* @param query, the query string
* @return List<Column>
*/
public List<Column> getColumnsFromQuery(String query) {
Connection conn = getConnection();
List<Column> columns = Lists.newArrayList();
try {
PreparedStatement pstmt = conn.prepareStatement(query);
ResultSetMetaData metaData = pstmt.getMetaData();
if (metaData == null) {
throw new JdbcClientException("Query not supported: Failed to get ResultSetMetaData from query: %s",
query);
} else {
List<JdbcFieldSchema> schemas = getSchemaFromResultSetMetaData(metaData);
for (JdbcFieldSchema schema : schemas) {
columns.add(new Column(schema.getColumnName(), jdbcTypeToDoris(schema), true, null, true, null,
true, -1));
}
}
} catch (SQLException e) {
throw new JdbcClientException("Failed to get columns from query: %s", e, query);
} finally {
close(conn);
}
return columns;
}


/**
* Get schema from ResultSetMetaData
*
* @param metaData, the ResultSetMetaData
* @return List<JdbcFieldSchema>
*/
public List<JdbcFieldSchema> getSchemaFromResultSetMetaData(ResultSetMetaData metaData) throws SQLException {
List<JdbcFieldSchema> schemas = Lists.newArrayList();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
JdbcFieldSchema field = new JdbcFieldSchema();
field.setColumnName(metaData.getColumnName(i));
field.setDataType(metaData.getColumnType(i));
field.setDataTypeName(metaData.getColumnTypeName(i));
field.setColumnSize(metaData.getColumnDisplaySize(i));
field.setDecimalDigits(metaData.getScale(i));
field.setNumPrecRadix(metaData.getPrecision(i));
schemas.add(field);
}
return schemas;
}

// This part used to process meta-information of database, table and column.

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class JdbcScanNode extends ExternalScanNode {
private String tableName;
private TOdbcTableType jdbcType;
private String graphQueryString = "";
private boolean isTableValuedFunction = false;
private String query = "";

private JdbcTable tbl;

Expand All @@ -84,6 +86,15 @@ public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean isJdbcExternalT
tableName = tbl.getProperRemoteFullTableName(jdbcType);
}

public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean isTableValuedFunction, String query) {
super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE, false);
this.isTableValuedFunction = isTableValuedFunction;
this.query = query;
tbl = (JdbcTable) desc.getTable();
jdbcType = tbl.getJdbcTableType();
tableName = tbl.getExternalTableName();
}

@Override
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
Expand Down Expand Up @@ -232,14 +243,19 @@ private String getJdbcQueryStr() {
@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
output.append(prefix).append("TABLE: ").append(tableName).append("\n");
if (detailLevel == TExplainLevel.BRIEF) {
return output.toString();
}
output.append(prefix).append("QUERY: ").append(getJdbcQueryStr()).append("\n");
if (!conjuncts.isEmpty()) {
Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n");
if (isTableValuedFunction) {
output.append(prefix).append("TABLE VALUE FUNCTION\n");
output.append(prefix).append("QUERY: ").append(query).append("\n");
} else {
output.append(prefix).append("TABLE: ").append(tableName).append("\n");
if (detailLevel == TExplainLevel.BRIEF) {
return output.toString();
}
output.append(prefix).append("QUERY: ").append(getJdbcQueryStr()).append("\n");
if (!conjuncts.isEmpty()) {
Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n");
}
}
return output.toString();
}
Expand Down Expand Up @@ -286,7 +302,11 @@ protected void toThrift(TPlanNode msg) {
msg.jdbc_scan_node = new TJdbcScanNode();
msg.jdbc_scan_node.setTupleId(desc.getId().asInt());
msg.jdbc_scan_node.setTableName(tableName);
msg.jdbc_scan_node.setQueryString(getJdbcQueryStr());
if (isTableValuedFunction) {
msg.jdbc_scan_node.setQueryString(query);
} else {
msg.jdbc_scan_node.setQueryString(getJdbcQueryStr());
}
msg.jdbc_scan_node.setTableType(jdbcType);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.trees.expressions.functions.table;

import org.apache.doris.catalog.FunctionSignature;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
import org.apache.doris.nereids.types.coercion.AnyDataType;
import org.apache.doris.tablefunction.QueryTableValueFunction;
import org.apache.doris.tablefunction.TableValuedFunctionIf;

import java.util.Map;

/** query */
public class Query extends TableValuedFunction {
public Query(Properties properties) {
super("query", properties);
}

@Override
public FunctionSignature customSignature() {
return FunctionSignature.of(AnyDataType.INSTANCE_WITHOUT_INDEX, getArgumentsTypes());
}

@Override
protected TableValuedFunctionIf toCatalogFunction() {
try {
Map<String, String> arguments = getTVFProperties().getMap();
return QueryTableValueFunction.createQueryTableValueFunction(arguments);
} catch (Throwable t) {
throw new AnalysisException("Can not build QueryTableValuedFunction by "
+ this + ": " + t.getMessage(), t);
}
}

@Override
public <R, C> R accept(ExpressionVisitor<R, C> visitor, C context) {
return visitor.visitQuery(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.nereids.trees.expressions.functions.table.Local;
import org.apache.doris.nereids.trees.expressions.functions.table.MvInfos;
import org.apache.doris.nereids.trees.expressions.functions.table.Numbers;
import org.apache.doris.nereids.trees.expressions.functions.table.Query;
import org.apache.doris.nereids.trees.expressions.functions.table.S3;
import org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
import org.apache.doris.nereids.trees.expressions.functions.table.Tasks;
Expand Down Expand Up @@ -92,4 +93,8 @@ default R visitNumbers(Numbers numbers, C context) {
default R visitS3(S3 s3, C context) {
return visitTableValuedFunction(s3, context);
}

default R visitQuery(Query query, C context) {
return visitTableValuedFunction(query, context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.tablefunction;

import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;

public class JdbcQueryTableValueFunction extends QueryTableValueFunction {
public static final Logger LOG = LogManager.getLogger(JdbcQueryTableValueFunction.class);

public JdbcQueryTableValueFunction(Map<String, String> params) throws AnalysisException {
super(params);
}

@Override
public List<Column> getTableColumns() throws AnalysisException {
JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf;
return catalog.getColumnsFromQuery(query);
}

@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc) {
JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf;
JdbcTable jdbcTable = new JdbcTable(1, desc.getTable().getName(), desc.getTable().getFullSchema(),
TableType.JDBC);
catalog.configureJdbcTable(jdbcTable, desc.getTable().getName());
desc.setTable(jdbcTable);
return new JdbcScanNode(id, desc, true, query);
}
}
Loading

0 comments on commit 5a31074

Please sign in to comment.