Skip to content

Commit

Permalink
Fix Statement leak in Trino JDBC DatabaseMetaData
Browse files Browse the repository at this point in the history
Before the change, Trino JDBC's `DatabaseMetaData` implementation
(`TrinoDatabaseMetaData`) would create `Statement` objects that are
never closed. Since `Connection` (`TrinoConnection`) tracks open
statements to be able to close them upon `Connection.close()` (per JDBC
requirements), this created a memory leak where `Statement` objects are
leaked in `Connection.statements` collection.

The commit fixing this, under the condition that `ResultSet` returned
from `TrinoDatabaseMetaData` is correctly closed.
  • Loading branch information
findepi committed Jan 18, 2022
1 parent 9fe581c commit 8f27eca
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,12 @@ private void unregisterStatement(TrinoStatement statement)
checkState(statements.remove(statement), "Statement is not registered");
}

@VisibleForTesting
int activeStatements()
{
return statements.size();
}

private void checkOpen()
throws SQLException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.sql.RowIdLifetime;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -1488,7 +1489,24 @@ private ResultSet selectEmpty(String sql)
private ResultSet select(String sql)
throws SQLException
{
return getConnection().createStatement().executeQuery(sql);
Statement statement = getConnection().createStatement();
TrinoResultSet resultSet;
try {
resultSet = (TrinoResultSet) statement.executeQuery(sql);
resultSet.setCloseStatementOnClose();
}
catch (Throwable e) {
try {
statement.close();
}
catch (Throwable closeException) {
if (closeException != e) {
e.addSuppressed(closeException);
}
}
throw e;
}
return resultSet;
}

private static void buildFilters(StringBuilder out, List<String> filters)
Expand Down
45 changes: 38 additions & 7 deletions client/trino-jdbc/src/main/java/io/trino/jdbc/TrinoResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.trino.client.QueryStatusInfo;
import io.trino.client.StatementClient;

import javax.annotation.concurrent.GuardedBy;

import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
Expand All @@ -31,7 +33,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Stream;

Expand All @@ -44,10 +45,14 @@
public class TrinoResultSet
extends AbstractTrinoResultSet
{
private final Statement statement;
private final StatementClient client;
private final String queryId;

private final AtomicBoolean closed = new AtomicBoolean();
@GuardedBy("this")
private boolean closed;
@GuardedBy("this")
private boolean closeStatementOnClose;

static TrinoResultSet create(Statement statement, StatementClient client, long maxRows, Consumer<QueryStats> progressCallback, WarningsManager warningsManager)
throws SQLException
Expand All @@ -65,6 +70,7 @@ private TrinoResultSet(Statement statement, StatementClient client, List<Column>
columns,
new AsyncIterator<>(flatten(new ResultsPageIterator(requireNonNull(client, "client is null"), progressCallback, warningsManager), maxRows), client));

this.statement = statement;
this.client = requireNonNull(client, "client is null");
requireNonNull(progressCallback, "progressCallback is null");

Expand All @@ -81,21 +87,46 @@ public QueryStats getStats()
return QueryStats.create(queryId, client.getStats());
}

void setCloseStatementOnClose()
throws SQLException
{
boolean alreadyClosed;
synchronized (this) {
alreadyClosed = closed;
if (!alreadyClosed) {
closeStatementOnClose = true;
}
}
if (alreadyClosed) {
statement.close();
}
}

@Override
public void close()
throws SQLException
{
if (closed.compareAndSet(false, true)) {
((AsyncIterator<?>) results).cancel();
client.close();
boolean closeStatement;
synchronized (this) {
if (closed) {
return;
}
closed = true;
closeStatement = closeStatementOnClose;
}

((AsyncIterator<?>) results).cancel();
client.close();
if (closeStatement) {
statement.close();
}
}

@Override
public boolean isClosed()
public synchronized boolean isClosed()
throws SQLException
{
return closed.get();
return closed;
}

void partialCancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,33 @@ public void testEscapeIfNecessary()
assertEquals(TrinoDatabaseMetaData.escapeIfNecessary(true, "abc\\_def"), "abc\\\\\\_def");
}

@Test
public void testStatementsDoNotLeak()
throws Exception
{
TrinoConnection connection = (TrinoConnection) this.connection;
DatabaseMetaData metaData = connection.getMetaData();

// consumed
try (ResultSet resultSet = metaData.getCatalogs()) {
assertThat(countRows(resultSet)).isEqualTo(5);
}
try (ResultSet resultSet = metaData.getSchemas(TEST_CATALOG, null)) {
assertThat(countRows(resultSet)).isEqualTo(10);
}
try (ResultSet resultSet = metaData.getTables(TEST_CATALOG, "sf%", null, null)) {
assertThat(countRows(resultSet)).isEqualTo(64);
}

// not consumed
metaData.getCatalogs().close();
metaData.getSchemas(TEST_CATALOG, null).close();
metaData.getTables(TEST_CATALOG, "sf%", null, null).close();

assertThat(connection.activeStatements()).as("activeStatements")
.isEqualTo(0);
}

private static void assertColumnSpec(ResultSet rs, int dataType, Long precision, Long numPrecRadix, String typeName)
throws SQLException
{
Expand Down Expand Up @@ -1585,6 +1612,16 @@ private MetaDataCallback<List<List<Object>>> readMetaData(MetaDataCallback<Resul
};
}

private int countRows(ResultSet resultSet)
throws Exception
{
int rows = 0;
while (resultSet.next()) {
rows++;
}
return rows;
}

private Connection createConnection()
throws SQLException
{
Expand Down

0 comments on commit 8f27eca

Please sign in to comment.