Skip to content

Commit

Permalink
Revert "ARROW-17785: [Java] Suppress flakiness from gRPC in JDBC driv…
Browse files Browse the repository at this point in the history
…er tests (#14210)"

This reverts commit dfdd0ce.
  • Loading branch information
kou committed Sep 25, 2022
1 parent 45f2aaf commit bb39059
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 36 deletions.
6 changes: 6 additions & 0 deletions java/flight/flight-sql-jdbc-driver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>me.alexpanov</groupId>
<artifactId>free-port-finder</artifactId>
<version>1.1.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>commons-io</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightRuntimeException;
import org.apache.arrow.flight.FlightStatusCode;
import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.flight.Location;
import org.apache.arrow.flight.auth2.BearerCredentialWriter;
Expand All @@ -50,14 +49,12 @@
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.calcite.avatica.Meta.StatementType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link FlightSqlClient} handler.
*/
public final class ArrowFlightSqlClientHandler implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(ArrowFlightSqlClientHandler.class);

private final FlightSqlClient sqlClient;
private final Set<CallOption> options = new HashSet<>();

Expand Down Expand Up @@ -192,18 +189,7 @@ public Schema getDataSetSchema() {

@Override
public void close() {
try {
preparedStatement.close(getOptions());
} catch (FlightRuntimeException fre) {
// ARROW-17785: suppress exceptions caused by flaky gRPC layer
if (fre.status().code().equals(FlightStatusCode.UNAVAILABLE) ||
(fre.status().code().equals(FlightStatusCode.INTERNAL) &&
fre.getMessage().contains("Connection closed after GOAWAY"))) {
LOGGER.warn("Supressed error closing PreparedStatement", fre);
return;
}
throw fre;
}
preparedStatement.close(getOptions());
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ public class ArrowFlightJdbcConnectionPoolDataSourceTest {
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.host("localhost")
.randomPort()
.authentication(authentication)
.producer(PRODUCER)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ public class ArrowFlightJdbcDriverTest {
new UserPasswordAuthentication.Builder().user("user1", "pass1").user("user2", "pass2")
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.authentication(authentication)
.producer(PRODUCER)
.build();
FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder().host("localhost").randomPort()
.authentication(authentication).producer(PRODUCER).build();
}

private BufferAllocator allocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@ public class ArrowFlightJdbcFactoryTest {
new UserPasswordAuthentication.Builder().user("user1", "pass1").user("user2", "pass2")
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.authentication(authentication)
.producer(PRODUCER)
.build();
FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder().host("localhost").randomPort()
.authentication(authentication).producer(PRODUCER).build();
}

private BufferAllocator allocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@ public class ConnectionTest {
.user(userTest, passTest)
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.authentication(authentication)
.producer(PRODUCER)
.build();
FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder().host("localhost").randomPort()
.authentication(authentication).producer(PRODUCER).build();
}

private BufferAllocator allocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public class ConnectionTlsTest {
.build();

FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.host("localhost")
.randomPort()
.authentication(authentication)
.useEncryption(certKey.cert, certKey.key)
.producer(PRODUCER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import me.alexpanov.net.FreePortFinder;

/**
* Utility class for unit tests that need to instantiate a {@link FlightServer}
* and interact with it.
Expand Down Expand Up @@ -93,6 +95,8 @@ public static FlightServerTestRule createStandardTestRule(final FlightSqlProduce
.build();

return new Builder()
.host("localhost")
.randomPort()
.authentication(authentication)
.producer(producer)
.build();
Expand All @@ -102,6 +106,11 @@ ArrowFlightJdbcDataSource createDataSource() {
return ArrowFlightJdbcDataSource.createNewDataSource(properties);
}

ArrowFlightJdbcDataSource createDataSource(String token) {
properties.put("token", token);
return ArrowFlightJdbcDataSource.createNewDataSource(properties);
}

public ArrowFlightJdbcConnectionPoolDataSource createConnectionPoolDataSource() {
return ArrowFlightJdbcConnectionPoolDataSource.createNewDataSource(properties);
}
Expand Down Expand Up @@ -150,8 +159,9 @@ public Statement apply(Statement base, Description description) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
try (FlightServer flightServer = getStartServer(location -> initiateServer(location), 3)) {
properties.put("port", flightServer.getPort());
try (FlightServer flightServer =
getStartServer(location ->
initiateServer(location), 3)) {
LOGGER.info("Started " + FlightServer.class.getName() + " as " + flightServer);
base.evaluate();
} finally {
Expand All @@ -164,9 +174,12 @@ public void evaluate() throws Throwable {
private FlightServer getStartServer(CheckedFunction<Location, FlightServer> newServerFromLocation,
int retries)
throws IOException {

final Deque<ReflectiveOperationException> exceptions = new ArrayDeque<>();

for (; retries > 0; retries--) {
final FlightServer server = newServerFromLocation.apply(Location.forGrpcInsecure("localhost", 0));
final Location location = Location.forGrpcInsecure(config.getHost(), config.getPort());
final FlightServer server = newServerFromLocation.apply(location);
try {
Method start = server.getClass().getMethod("start");
start.setAccessible(true);
Expand All @@ -176,7 +189,9 @@ private FlightServer getStartServer(CheckedFunction<Location, FlightServer> newS
exceptions.add(e);
}
}
exceptions.forEach(e -> LOGGER.error("Failed to start FlightServer", e));

exceptions.forEach(
e -> LOGGER.error("Failed to start a new " + FlightServer.class.getName() + ".", e));
throw new IOException(exceptions.pop().getCause());
}

Expand Down Expand Up @@ -208,14 +223,44 @@ public void close() throws Exception {
* Builder for {@link FlightServerTestRule}.
*/
public static final class Builder {
private final Properties properties;
private final Properties properties = new Properties();
private FlightSqlProducer producer;
private Authentication authentication;
private CertKeyPair certKeyPair;

public Builder() {
this.properties = new Properties();
this.properties.put("host", "localhost");
/**
* Sets the host for the server rule.
*
* @param host the host value.
* @return the Builder.
*/
public Builder host(final String host) {
properties.put(ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.HOST.camelName(),
host);
return this;
}

/**
* Sets a random port to be used by the server rule.
*
* @return the Builder.
*/
public Builder randomPort() {
properties.put(ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PORT.camelName(),
FreePortFinder.findFreeLocalPort());
return this;
}

/**
* Sets a specific port to be used by the server rule.
*
* @param port the port value.
* @return the Builder.
*/
public Builder port(final int port) {
properties.put(ArrowFlightConnectionConfigImpl.ArrowFlightConnectionProperty.PORT.camelName(),
port);
return this;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ public void testShouldRunSelectQuerySettingLargeMaxRowLimit() throws Exception {
try (Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(
CoreMockedSqlProducers.LEGACY_REGULAR_SQL_CMD)) {

final long maxRowsLimit = 3;
statement.setLargeMaxRows(maxRowsLimit);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class TokenAuthenticationTest {

static {
FLIGHT_SERVER_TEST_RULE = new FlightServerTestRule.Builder()
.host("localhost")
.randomPort()
.authentication(new TokenAuthentication.Builder()
.token("1234")
.build())
Expand Down

0 comments on commit bb39059

Please sign in to comment.