Skip to content

Commit

Permalink
GH-34532: [Java][FlightSQL] Change JDBC to handle multi-endpoints (#3…
Browse files Browse the repository at this point in the history
…8521)

### Rationale for this change
The Flight SQL JDBC Driver currently doesn't fetch at multiple endpoints correctly when the data is not at the same location as the original connection.

### What changes are included in this PR?
- Create new clients to connect to new locations in endpoints.
- If no location is reported using the current connection.
- Make ArrowFlightSqlClientHandler's builder's build() function to be idempodent.
- Add functionality to clone ArrowFlightSqClientHandler's builder so that it can be used for temporary connections to locations returned by getFlightInfo().
- Add utility classes in unit tests for constructing a distributed Flight SQL Server

### Are these changes tested?
Yes.

### Are there any user-facing changes?
The behavior for when there are reported endpoints from getFlightInfo is now fixed. However if users relied on the previous behavior of just getting streams from the same node, and their server only ever reported the original node, they may observe more Flight client connections opening and closing than before (since new connections get spawned for each partition that has at least one Location now).

* Closes: #34532

Authored-by: James Duong <james.duong@improving.com>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
jduo authored Nov 3, 2023
1 parent cd6e635 commit 2fb7fd9
Show file tree
Hide file tree
Showing 8 changed files with 439 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.arrow.driver.jdbc;

import static org.apache.arrow.driver.jdbc.utils.FlightStreamQueue.createNewQueue;
import static org.apache.arrow.driver.jdbc.utils.FlightEndpointDataQueue.createNewQueue;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
Expand All @@ -26,7 +26,8 @@
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import org.apache.arrow.driver.jdbc.utils.FlightStreamQueue;
import org.apache.arrow.driver.jdbc.client.CloseableEndpointStreamPair;
import org.apache.arrow.driver.jdbc.utils.FlightEndpointDataQueue;
import org.apache.arrow.driver.jdbc.utils.VectorSchemaRootTransformer;
import org.apache.arrow.flight.FlightInfo;
import org.apache.arrow.flight.FlightStream;
Expand All @@ -47,8 +48,8 @@ public final class ArrowFlightJdbcFlightStreamResultSet
extends ArrowFlightJdbcVectorSchemaRootResultSet {

private final ArrowFlightConnection connection;
private FlightStream currentFlightStream;
private FlightStreamQueue flightStreamQueue;
private CloseableEndpointStreamPair currentEndpointData;
private FlightEndpointDataQueue flightEndpointDataQueue;

private VectorSchemaRootTransformer transformer;
private VectorSchemaRoot currentVectorSchemaRoot;
Expand Down Expand Up @@ -102,20 +103,20 @@ static ArrowFlightJdbcFlightStreamResultSet fromFlightInfo(

resultSet.transformer = transformer;

resultSet.execute(flightInfo);
resultSet.populateData(flightInfo);
return resultSet;
}

private void loadNewQueue() {
Optional.ofNullable(flightStreamQueue).ifPresent(AutoCloseables::closeNoChecked);
flightStreamQueue = createNewQueue(connection.getExecutorService());
Optional.ofNullable(flightEndpointDataQueue).ifPresent(AutoCloseables::closeNoChecked);
flightEndpointDataQueue = createNewQueue(connection.getExecutorService());
}

private void loadNewFlightStream() throws SQLException {
if (currentFlightStream != null) {
AutoCloseables.closeNoChecked(currentFlightStream);
if (currentEndpointData != null) {
AutoCloseables.closeNoChecked(currentEndpointData);
}
this.currentFlightStream = getNextFlightStream(true);
this.currentEndpointData = getNextEndpointStream(true);
}

@Override
Expand All @@ -124,24 +125,24 @@ protected AvaticaResultSet execute() throws SQLException {

if (flightInfo != null) {
schema = flightInfo.getSchemaOptional().orElse(null);
execute(flightInfo);
populateData(flightInfo);
}
return this;
}

private void execute(final FlightInfo flightInfo) throws SQLException {
private void populateData(final FlightInfo flightInfo) throws SQLException {
loadNewQueue();
flightStreamQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
flightEndpointDataQueue.enqueue(connection.getClientHandler().getStreams(flightInfo));
loadNewFlightStream();

// Ownership of the root will be passed onto the cursor.
if (currentFlightStream != null) {
executeForCurrentFlightStream();
if (currentEndpointData != null) {
populateDataForCurrentFlightStream();
}
}

private void executeForCurrentFlightStream() throws SQLException {
final VectorSchemaRoot originalRoot = currentFlightStream.getRoot();
private void populateDataForCurrentFlightStream() throws SQLException {
final VectorSchemaRoot originalRoot = currentEndpointData.getStream().getRoot();

if (transformer != null) {
try {
Expand All @@ -154,9 +155,9 @@ private void executeForCurrentFlightStream() throws SQLException {
}

if (schema != null) {
execute(currentVectorSchemaRoot, schema);
populateData(currentVectorSchemaRoot, schema);
} else {
execute(currentVectorSchemaRoot);
populateData(currentVectorSchemaRoot);
}
}

Expand All @@ -179,20 +180,20 @@ public boolean next() throws SQLException {
return true;
}

if (currentFlightStream != null) {
currentFlightStream.getRoot().clear();
if (currentFlightStream.next()) {
executeForCurrentFlightStream();
if (currentEndpointData != null) {
currentEndpointData.getStream().getRoot().clear();
if (currentEndpointData.getStream().next()) {
populateDataForCurrentFlightStream();
continue;
}

flightStreamQueue.enqueue(currentFlightStream);
flightEndpointDataQueue.enqueue(currentEndpointData);
}

currentFlightStream = getNextFlightStream(false);
currentEndpointData = getNextEndpointStream(false);

if (currentFlightStream != null) {
executeForCurrentFlightStream();
if (currentEndpointData != null) {
populateDataForCurrentFlightStream();
continue;
}

Expand All @@ -207,14 +208,14 @@ public boolean next() throws SQLException {
@Override
protected void cancel() {
super.cancel();
final FlightStream currentFlightStream = this.currentFlightStream;
if (currentFlightStream != null) {
currentFlightStream.cancel("Cancel", null);
final CloseableEndpointStreamPair currentEndpoint = this.currentEndpointData;
if (currentEndpoint != null) {
currentEndpoint.getStream().cancel("Cancel", null);
}

if (flightStreamQueue != null) {
if (flightEndpointDataQueue != null) {
try {
flightStreamQueue.close();
flightEndpointDataQueue.close();
} catch (final Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -224,27 +225,28 @@ protected void cancel() {
@Override
public synchronized void close() {
try {
if (flightStreamQueue != null) {
if (flightEndpointDataQueue != null) {
// flightStreamQueue should close currentFlightStream internally
flightStreamQueue.close();
} else if (currentFlightStream != null) {
flightEndpointDataQueue.close();
} else if (currentEndpointData != null) {
// close is only called for currentFlightStream if there's no queue
currentFlightStream.close();
currentEndpointData.close();
}

} catch (final Exception e) {
throw new RuntimeException(e);
} finally {
super.close();
}
}

private FlightStream getNextFlightStream(final boolean isExecution) throws SQLException {
if (isExecution) {
private CloseableEndpointStreamPair getNextEndpointStream(final boolean canTimeout) throws SQLException {
if (canTimeout) {
final int statementTimeout = statement != null ? statement.getQueryTimeout() : 0;
return statementTimeout != 0 ?
flightStreamQueue.next(statementTimeout, TimeUnit.SECONDS) : flightStreamQueue.next();
flightEndpointDataQueue.next(statementTimeout, TimeUnit.SECONDS) : flightEndpointDataQueue.next();
} else {
return flightStreamQueue.next();
return flightEndpointDataQueue.next();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static ArrowFlightJdbcVectorSchemaRootResultSet fromVectorSchemaRoot(
new ArrowFlightJdbcVectorSchemaRootResultSet(null, state, signature, resultSetMetaData,
timeZone, null);

resultSet.execute(vectorSchemaRoot);
resultSet.populateData(vectorSchemaRoot);
return resultSet;
}

Expand All @@ -92,7 +92,7 @@ protected AvaticaResultSet execute() throws SQLException {
throw new RuntimeException("Can only execute with execute(VectorSchemaRoot)");
}

void execute(final VectorSchemaRoot vectorSchemaRoot) {
void populateData(final VectorSchemaRoot vectorSchemaRoot) {
final List<Field> fields = vectorSchemaRoot.getSchema().getFields();
final List<ColumnMetaData> columns = ConvertUtils.convertArrowFieldsToColumnMetaDataList(fields);
signature.columns.clear();
Expand All @@ -102,7 +102,7 @@ void execute(final VectorSchemaRoot vectorSchemaRoot) {
execute2(new ArrowFlightJdbcCursor(vectorSchemaRoot), this.signature.columns);
}

void execute(final VectorSchemaRoot vectorSchemaRoot, final Schema schema) {
void populateData(final VectorSchemaRoot vectorSchemaRoot, final Schema schema) {
final List<ColumnMetaData> columns = ConvertUtils.convertArrowFieldsToColumnMetaDataList(schema.getFields());
signature.columns.clear();
signature.columns.addAll(columns);
Expand Down
Loading

0 comments on commit 2fb7fd9

Please sign in to comment.