Support query table function in MongoDB #14535

Merged
merged 1 commit on Oct 14, 2022
30 changes: 30 additions & 0 deletions docs/src/main/sphinx/connector/mongodb.rst
@@ -469,3 +469,33 @@ ALTER TABLE
The connector supports ``ALTER TABLE RENAME TO``, ``ALTER TABLE ADD COLUMN``
and ``ALTER TABLE DROP COLUMN`` operations.
Other uses of ``ALTER TABLE`` are not supported.

Table functions
---------------

The connector provides specific :doc:`table functions </functions/table>` to
access MongoDB.

.. _mongodb-query-function:

``query(database, collection, filter) -> table``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The ``query`` function allows you to query the underlying MongoDB database
directly. It requires syntax native to MongoDB, because the full query is pushed
down and processed by MongoDB. This can be useful for accessing native features
that are not available in Trino, or for improving query performance in cases
where running the query natively is faster.

For example, get all rows where the ``regionkey`` field is 0::

    SELECT
      *
    FROM
      TABLE(
        mongodb.system.query(
          database => 'tpch',
          collection => 'region',
          filter => '{ regionkey: 0 }'
        )
      );
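
The ``filter`` argument accepts any predicate written in the native MongoDB
query language. As a minimal sketch, assuming the same ``tpch.region``
collection, a native comparison operator such as ``$gt`` returns all rows where
``regionkey`` is greater than 0::

    SELECT
      *
    FROM
      TABLE(
        mongodb.system.query(
          database => 'tpch',
          collection => 'region',
          filter => '{ regionkey: { $gt: 0 } }'
        )
      );
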
@@ -21,10 +21,13 @@
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import io.trino.plugin.mongodb.ptf.Query;
import io.trino.spi.ptf.ConnectorTableFunction;
import io.trino.spi.type.TypeManager;

import javax.inject.Singleton;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

@@ -40,6 +43,7 @@ public void configure(Binder binder)
binder.bind(MongoPageSinkProvider.class).in(Scopes.SINGLETON);

configBinder(binder).bindConfig(MongoClientConfig.class);
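// Register the query table function; the bound set is injected into MongoConnector and exposed through getTableFunctions()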
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON);
}

@Singleton
@@ -13,17 +13,20 @@
*/
package io.trino.plugin.mongodb;

import com.google.common.collect.ImmutableSet;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorPageSinkProvider;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.ptf.ConnectorTableFunction;
import io.trino.spi.transaction.IsolationLevel;

import javax.inject.Inject;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@@ -39,6 +42,7 @@ public class MongoConnector
private final MongoSplitManager splitManager;
private final MongoPageSourceProvider pageSourceProvider;
private final MongoPageSinkProvider pageSinkProvider;
private final Set<ConnectorTableFunction> connectorTableFunctions;

private final ConcurrentMap<ConnectorTransactionHandle, MongoMetadata> transactions = new ConcurrentHashMap<>();

@@ -47,12 +51,14 @@ public MongoConnector(
MongoSession mongoSession,
MongoSplitManager splitManager,
MongoPageSourceProvider pageSourceProvider,
MongoPageSinkProvider pageSinkProvider)
MongoPageSinkProvider pageSinkProvider,
Set<ConnectorTableFunction> connectorTableFunctions)
{
this.mongoSession = mongoSession;
this.splitManager = requireNonNull(splitManager, "splitManager is null");
this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
this.pageSinkProvider = requireNonNull(pageSinkProvider, "pageSinkProvider is null");
this.connectorTableFunctions = ImmutableSet.copyOf(requireNonNull(connectorTableFunctions, "connectorTableFunctions is null"));
}

@Override
@@ -104,6 +110,12 @@ public ConnectorPageSinkProvider getPageSinkProvider()
return pageSinkProvider;
}

@Override
public Set<ConnectorTableFunction> getTableFunctions()
{
return connectorTableFunctions;
}

@Override
public void shutdown()
{
@@ -18,9 +18,11 @@
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.plugin.mongodb.MongoIndex.MongodbIndexKey;
import io.trino.plugin.mongodb.ptf.Query.QueryFunctionHandle;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ColumnSchema;
import io.trino.spi.connector.ConnectorInsertTableHandle;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorOutputMetadata;
@@ -30,6 +32,7 @@
import io.trino.spi.connector.ConnectorTableLayout;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTableProperties;
import io.trino.spi.connector.ConnectorTableSchema;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.LimitApplicationResult;
@@ -39,9 +42,11 @@
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SchemaTablePrefix;
import io.trino.spi.connector.SortingProperty;
import io.trino.spi.connector.TableFunctionApplicationResult;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.ptf.ConnectorTableFunctionHandle;
import io.trino.spi.security.TrinoPrincipal;
import io.trino.spi.statistics.ComputedStatistics;
import io.trino.spi.type.Type;
@@ -331,7 +336,7 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect
}

return Optional.of(new LimitApplicationResult<>(
new MongoTableHandle(handle.getSchemaTableName(), handle.getConstraint(), OptionalInt.of(toIntExact(limit))),
new MongoTableHandle(handle.getSchemaTableName(), handle.getFilter(), handle.getConstraint(), OptionalInt.of(toIntExact(limit))),
true,
false));
}
@@ -362,12 +367,32 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C

handle = new MongoTableHandle(
handle.getSchemaTableName(),
handle.getFilter(),
newDomain,
handle.getLimit());

return Optional.of(new ConstraintApplicationResult<>(handle, constraint.getSummary(), false));
}

@Override
public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle)
{
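// Called at planning time: when the handle comes from the query table function, return its table handle and visible columns so the invocation is executed as a regular MongoDB scan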
if (!(handle instanceof QueryFunctionHandle)) {
return Optional.empty();
}

ConnectorTableHandle tableHandle = ((QueryFunctionHandle) handle).getTableHandle();
ConnectorTableSchema tableSchema = getTableSchema(session, tableHandle);
Map<String, ColumnHandle> columnHandlesByName = getColumnHandles(session, tableHandle);
List<ColumnHandle> columnHandles = tableSchema.getColumns().stream()
.filter(column -> !column.isHidden())
.map(ColumnSchema::getName)
.map(columnHandlesByName::get)
.collect(toImmutableList());

return Optional.of(new TableFunctionApplicationResult<>(tableHandle, columnHandles));
}

private void setRollback(Runnable action)
{
checkState(rollbackAction.compareAndSet(null, action), "rollback action is already set");
@@ -115,6 +115,7 @@ public class MongoSession
private static final String FIELDS_TYPE_KEY = "type";
private static final String FIELDS_HIDDEN_KEY = "hidden";

private static final String AND_OP = "$and";
private static final String OR_OP = "$or";

private static final String EQ_OP = "$eq";
@@ -347,7 +348,7 @@ private MongoTable loadTableSchema(SchemaTableName schemaTableName)
columnHandles.add(columnHandle);
}

MongoTableHandle tableHandle = new MongoTableHandle(schemaTableName);
MongoTableHandle tableHandle = new MongoTableHandle(schemaTableName, Optional.empty());
return new MongoTable(tableHandle, columnHandles.build(), getIndexes(schemaName, tableName), getComment(tableMeta));
}

@@ -405,10 +406,10 @@ public MongoCursor<Document> execute(MongoTableHandle tableHandle, List<MongoCol
output.append(column.getName(), 1);
}
MongoCollection<Document> collection = getCollection(tableHandle.getSchemaTableName());
Document query = buildQuery(tableHandle.getConstraint());
FindIterable<Document> iterable = collection.find(query).projection(output);
Document filter = buildFilter(tableHandle);
FindIterable<Document> iterable = collection.find(filter).projection(output);
tableHandle.getLimit().ifPresent(iterable::limit);
log.debug("Find documents: collection: %s, filter: %s, projection: %s", tableHandle.getSchemaTableName(), query.toJson(), output.toJson());
log.debug("Find documents: collection: %s, filter: %s, projection: %s", tableHandle.getSchemaTableName(), filter.toJson(), output.toJson());

if (cursorBatchSize != 0) {
iterable.batchSize(cursorBatchSize);
@@ -417,6 +418,15 @@
return iterable.iterator();
}

static Document buildFilter(MongoTableHandle table)
{
// Use $and operator because Document.putAll method overwrites existing entries where the key already exists
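// e.g. a raw filter {regionkey: 1} and a pushed-down constraint {name: {$eq: 'AMERICA'}} combine into {$and: [{regionkey: 1}, {name: {$eq: 'AMERICA'}}]}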
ImmutableList.Builder<Document> filter = ImmutableList.builder();
table.getFilter().ifPresent(filter::add);
filter.add(buildQuery(table.getConstraint()));
return andPredicate(filter.build());
}

@VisibleForTesting
static Document buildQuery(TupleDomain<ColumnHandle> tupleDomain)
{
@@ -565,6 +575,15 @@ private static Document orPredicate(List<Document> values)
return new Document(OR_OP, values);
}

private static Document andPredicate(List<Document> values)
{
checkState(!values.isEmpty());
if (values.size() == 1) {
return values.get(0);
}
return new Document(AND_OP, values);
}

private static Document isNullPredicate()
{
return documentOf(EQ_OP, null);
@@ -19,8 +19,10 @@
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;
import org.bson.Document;

import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;

import static java.util.Objects.requireNonNull;
@@ -30,20 +32,23 @@ public class MongoTableHandle
{
private final SchemaTableName schemaTableName;
private final TupleDomain<ColumnHandle> constraint;
private final Optional<Document> filter;
private final OptionalInt limit;

public MongoTableHandle(SchemaTableName schemaTableName)
public MongoTableHandle(SchemaTableName schemaTableName, Optional<Document> filter)
{
this(schemaTableName, TupleDomain.all(), OptionalInt.empty());
this(schemaTableName, filter, TupleDomain.all(), OptionalInt.empty());
}

@JsonCreator
public MongoTableHandle(
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("filter") Optional<Document> filter,
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint,
@JsonProperty("limit") OptionalInt limit)
{
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.filter = requireNonNull(filter, "filter is null");
this.constraint = requireNonNull(constraint, "constraint is null");
this.limit = requireNonNull(limit, "limit is null");
}
@@ -54,6 +59,12 @@ public SchemaTableName getSchemaTableName()
return schemaTableName;
}

@JsonProperty
public Optional<Document> getFilter()
{
return filter;
}

@JsonProperty
public TupleDomain<ColumnHandle> getConstraint()
{
@@ -69,7 +80,7 @@ public OptionalInt getLimit()
@Override
public int hashCode()
{
return Objects.hash(schemaTableName, constraint, limit);
return Objects.hash(schemaTableName, filter, constraint, limit);
}

@Override
@@ -83,6 +94,7 @@ public boolean equals(Object obj)
}
MongoTableHandle other = (MongoTableHandle) obj;
return Objects.equals(this.schemaTableName, other.schemaTableName) &&
Objects.equals(this.filter, other.filter) &&
Objects.equals(this.constraint, other.constraint) &&
Objects.equals(this.limit, other.limit);
}