Skip to content

Commit

Permalink
WIP: [FlightRPC][C++][Java] Add fallback URI scheme
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
lidavidm and alamb committed Feb 20, 2024
1 parent a03d957 commit 1ee1d0a
Show file tree
Hide file tree
Showing 13 changed files with 330 additions and 17 deletions.
5 changes: 5 additions & 0 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ TEST(FlightTypes, LocationUnknownScheme) {
ASSERT_OK(Location::Parse("https://example.com/foo"));
}

TEST(FlightTypes, LocationFallback) {
EXPECT_EQ("arrow-flight-reuse-connection://?", Location::ReuseConnection().ToString());
EXPECT_EQ("arrow-flight-reuse-connection", Location::ReuseConnection().scheme());
}

TEST(FlightTypes, RoundtripStatus) {
// Make sure status codes round trip through our conversions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ TEST(FlightIntegration, ExpirationTimeRenewFlightEndpoint) {
ASSERT_OK(RunScenario("expiration_time:renew_flight_endpoint"));
}

TEST(FlightIntegration, LocationReuseConnection) {
ASSERT_OK(RunScenario("location:reuse_connection"));
}

TEST(FlightIntegration, PollFlightInfo) { ASSERT_OK(RunScenario("poll_flight_info")); }

TEST(FlightIntegration, AppMetadataFlightInfoEndpoint) {
Expand Down
47 changes: 47 additions & 0 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1928,6 +1928,50 @@ class FlightSqlExtensionScenario : public FlightSqlScenario {
return Status::OK();
}
};

/// \brief The server for testing arrow-flight-reuse-connection://.
class ReuseConnectionServer : public FlightServerBase {
public:
Status GetFlightInfo(const ServerCallContext& context,
const FlightDescriptor& descriptor,
std::unique_ptr<FlightInfo>* info) override {
auto location = Location::ReuseConnection();
auto endpoint = FlightEndpoint{{"reuse"}, {location}};
ARROW_ASSIGN_OR_RAISE(auto info_data, FlightInfo::Make(arrow::Schema({}), descriptor,
{endpoint}, -1, -1));
*info = std::make_unique<FlightInfo>(std::move(info_data));
return Status::OK();
}
};

/// \brief A scenario for testing arrow-flight-reuse-connection://.
class ReuseConnectionScenario : public Scenario {
Status MakeServer(std::unique_ptr<FlightServerBase>* server,
FlightServerOptions* options) override {
*server = std::make_unique<ReuseConnectionServer>();
return Status::OK();
}

Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }

Status RunClient(std::unique_ptr<FlightClient> client) override {
auto descriptor = FlightDescriptor::Command("reuse");
ARROW_ASSIGN_OR_RAISE(auto info, client->GetFlightInfo(descriptor));
if (info->endpoints().size() != 1) {
return Status::Invalid("Expected 1 endpoint, got ", info->endpoints().size());
}
const auto& endpoint = info->endpoints().front();
if (endpoint.locations.size() != 1) {
return Status::Invalid("Expected 1 location, got ",
info->endpoints().front().locations.size());
} else if (endpoint.locations.front().ToString() !=
"arrow-flight-reuse-connection://?") {
return Status::Invalid("Expected arrow-flight-reuse-connection://?, got ",
endpoint.locations.front().ToString());
}
return Status::OK();
}
};
} // namespace

Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>* out) {
Expand All @@ -1952,6 +1996,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
} else if (scenario_name == "expiration_time:renew_flight_endpoint") {
*out = std::make_shared<ExpirationTimeRenewFlightEndpointScenario>();
return Status::OK();
} else if (scenario_name == "location:reuse_connection") {
*out = std::make_shared<ReuseConnectionScenario>();
return Status::OK();
} else if (scenario_name == "poll_flight_info") {
*out = std::make_shared<PollFlightInfoScenario>();
return Status::OK();
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,12 @@ arrow::Result<Location> Location::Parse(const std::string& uri_string) {
return location;
}

const Location& Location::ReuseConnection() {
static Location kFallback =
Location::Parse("arrow-flight-reuse-connection://?").ValueOrDie();
return kFallback;
}

arrow::Result<Location> Location::ForGrpcTcp(const std::string& host, const int port) {
std::stringstream uri_string;
uri_string << "grpc+tcp://" << host << ':' << port;
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,14 @@ struct ARROW_FLIGHT_EXPORT Location {
/// \brief Initialize a location by parsing a URI string
static arrow::Result<Location> Parse(const std::string& uri_string);

/// \brief Get the fallback URI.
///
/// arrow-flight-reuse-connection:// means that a client may attempt to
/// reuse an existing connection to a Flight service to fetch data instead
/// of creating a new connection to one of the other locations listed in a
/// FlightEndpoint response.
static const Location& ReuseConnection();

/// \brief Initialize a location for a non-TLS, gRPC-based Flight
/// service from a host and port
/// \param[in] host The hostname to connect to
Expand Down
5 changes: 5 additions & 0 deletions dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,11 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
"RenewFlightEndpoint are working as expected."),
skip_testers={"JS", "C#", "Rust"},
),
Scenario(
"location:reuse_connection",
description="Ensure arrow-flight-reuse-connection is accepted.",
skip_testers={"JS", "C#", "Rust"},
),
Scenario(
"poll_flight_info",
description="Ensure PollFlightInfo is supported.",
Expand Down
70 changes: 53 additions & 17 deletions docs/source/format/Flight.rst
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ A client that wishes to download the data would:
connection to the original server to fetch data. Otherwise, the
client must connect to one of the indicated locations.

The server may list "itself" as a location alongside other server
locations. Normally this requires the server to know its public
address, but it may also use the special URI string
``arrow-flight-reuse-connection://?`` to tell clients that they may
reuse an existing connection to the same server, without having to
be able to name itself. See `Connection Reuse`_ below.

In this way, the locations inside an endpoint can also be thought
of as performing look-aside load balancing or service discovery
functions. And the endpoints can represent data that is partitioned
Expand Down Expand Up @@ -307,29 +314,58 @@ well, in which case any `authentication method supported by gRPC

.. _Mutual TLS (mTLS): https://grpc.io/docs/guides/auth/#supported-auth-mechanisms

Transport Implementations
=========================
Location URIs
=============

Flight is primarily defined in terms of its Protobuf and gRPC
specification below, but Arrow implementations may also support
alternative transports (see :ref:`status-flight-rpc`). In that case,
implementations should use the following URI schemes for the given
transport implementations:

+----------------------------+----------------------------+
| Transport | URI Scheme |
+============================+============================+
| gRPC (plaintext) | grpc: or grpc+tcp: |
+----------------------------+----------------------------+
| gRPC (TLS) | grpc+tls: |
+----------------------------+----------------------------+
| gRPC (Unix domain socket) | grpc+unix: |
+----------------------------+----------------------------+
| UCX_ (plaintext) | ucx: |
+----------------------------+----------------------------+
alternative transports (see :ref:`status-flight-rpc`). Clients and
servers need to know which transport to use for a given URI in a
Location, so Flight implementations should use the following URI
schemes for the given transports:

+----------------------------+--------------------------------+
| Transport | URI Scheme |
+============================+================================+
| gRPC (plaintext) | grpc: or grpc+tcp: |
+----------------------------+--------------------------------+
| gRPC (TLS) | grpc+tls: |
+----------------------------+--------------------------------+
| gRPC (Unix domain socket) | grpc+unix: |
+----------------------------+--------------------------------+
| (reuse connection) | arrow-flight-reuse-connection: |
+----------------------------+--------------------------------+
| UCX_ (plaintext) | ucx: |
+----------------------------+--------------------------------+

.. _UCX: https://openucx.org/

Connection Reuse
----------------

"Reuse connection" above is not a particular transport. Instead, it
means that the client may try to execute DoGet against the same server
(and through the same connection) that it originally obtained the
FlightInfo from (i.e., that it called GetFlightInfo against). This is
interpreted the same way as when no specific ``Location`` are
returned.

This allows the server to return "itself" as one possible location to
fetch data without having to know its own public address, which can be
useful in deployments where knowing this would be difficult or
impossible. For example, a developer may forward a remote service in
a cloud environment to their local machine; in this case, the remote
service would have no way to know the local hostname and port that it
is being accessed over.

For compatibility reasons, the URI should always be
``arrow-flight-reuse-connection://?``, with the trailing empty query
string. Java's URI implementation does not accept ``scheme:`` or
``scheme://``, and C++'s implementation does not accept an empty
string, so the obvious candidates are not compatible. The chosen
representation can be parsed by both implementations, as well as Go's
``net/url`` and Python's ``urllib.parse``.

Error Handling
==============

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ Flight.Location toProtocol() {
return Flight.Location.newBuilder().setUri(uri.toString()).build();
}

/**
* Construct a special URI to indicate to clients that they may fetch data by reusing
* an existing connection to a Flight RPC server.
*/
public static Location reuseConnection() {
try {
return new Location(new URI(LocationSchemes.REUSE_CONNECTION, "", "", "", null));
} catch (URISyntaxException e) {
// This should never happen.
throw new IllegalArgumentException(e);
}
}

/**
* Construct a URI for a Flight+gRPC server without transport security.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public final class LocationSchemes {
public static final String GRPC_INSECURE = "grpc+tcp";
public static final String GRPC_DOMAIN_SOCKET = "grpc+unix";
public static final String GRPC_TLS = "grpc+tls";
public static final String REUSE_CONNECTION = "arrow-flight-reuse-connection";

private LocationSchemes() {
throw new AssertionError("Do not instantiate this class.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ public void fastPathDefaults() {
Assertions.assertFalse(ArrowMessage.ENABLE_ZERO_COPY_WRITE);
}

@Test
public void fallbackLocation() {
Assertions.assertEquals("arrow-flight-reuse-connection://?",
Location.reuseConnection().getUri().toString());
}

/**
* ARROW-6017: we should be able to construct locations for unknown schemes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,13 @@ public List<CloseableEndpointStreamPair> getStreams(final FlightInfo flightInfo)
// It would also be good to identify when the reported location is the same as the original connection's
// Location and skip creating a FlightClient in that scenario.
final URI endpointUri = endpoint.getLocations().get(0).getUri();

if (endpointUri.getScheme().equals(LocationSchemes.REUSE_CONNECTION)) {
endpoints.add(new CloseableEndpointStreamPair(
sqlClient.getStream(endpoint.getTicket(), getOptions()), null));
continue;
}

final Builder builderForEndpoint = new Builder(ArrowFlightSqlClientHandler.this.builder)
.withHost(endpointUri.getHost())
.withPort(endpointUri.getPort())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,15 @@
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;

import org.apache.arrow.driver.jdbc.utils.CoreMockedSqlProducers;
import org.apache.arrow.driver.jdbc.utils.FallbackFlightSqlProducer;
import org.apache.arrow.driver.jdbc.utils.PartitionedFlightSqlProducer;
import org.apache.arrow.flight.FlightEndpoint;
import org.apache.arrow.flight.FlightProducer;
Expand All @@ -55,6 +57,7 @@
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
Expand All @@ -63,6 +66,7 @@
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.rules.ErrorCollector;

import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -455,6 +459,69 @@ allocator, forGrpcInsecure("localhost", 0), rootProducer)
}
}

@Test
public void testFallbackFlightServer() throws Exception {
final Schema schema = new Schema(
Collections.singletonList(Field.nullable("int_column", Types.MinorType.INT.getType())));
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
VectorSchemaRoot resultData = VectorSchemaRoot.create(schema, allocator)) {
resultData.setRowCount(1);
((IntVector) resultData.getVector(0)).set(0, 1);

try (final FallbackFlightSqlProducer rootProducer = new FallbackFlightSqlProducer(resultData);
FlightServer rootServer = FlightServer.builder(
allocator, forGrpcInsecure("localhost", 0), rootProducer)
.build()
.start();
Connection newConnection = DriverManager.getConnection(String.format(
"jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
rootServer.getLocation().getUri().getHost(), rootServer.getPort()));
Statement newStatement = newConnection.createStatement();
ResultSet result = newStatement.executeQuery("fallback")) {
List<Integer> actualData = new ArrayList<>();
while (result.next()) {
actualData.add(result.getInt(1));
}

// Assert
assertEquals(resultData.getRowCount(), actualData.size());
assertTrue(actualData.contains(((IntVector) resultData.getVector(0)).get(0)));
}
}
}

@Test
public void testFallbackSecondFlightServer() throws Exception {
final Schema schema = new Schema(
Collections.singletonList(Field.nullable("int_column", Types.MinorType.INT.getType())));
try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
VectorSchemaRoot resultData = VectorSchemaRoot.create(schema, allocator)) {
resultData.setRowCount(1);
((IntVector) resultData.getVector(0)).set(0, 1);

try (final FallbackFlightSqlProducer rootProducer = new FallbackFlightSqlProducer(resultData);
FlightServer rootServer = FlightServer.builder(
allocator, forGrpcInsecure("localhost", 0), rootProducer)
.build()
.start();
Connection newConnection = DriverManager.getConnection(String.format(
"jdbc:arrow-flight-sql://%s:%d/?useEncryption=false",
rootServer.getLocation().getUri().getHost(), rootServer.getPort()));
Statement newStatement = newConnection.createStatement()) {

// TODO(https://github.com/apache/arrow/issues/38573)
// XXX: we could try to assert more structure but then we'd have to hardcode
// a particular exception chain which may be fragile
Assertions.assertThrows(SQLException.class, () -> {
try (ResultSet result = newStatement.executeQuery("fallback with error")) {
// Empty body
}
});

}
}
}

@Test
public void testShouldRunSelectQueryWithEmptyVectorsEmbedded() throws Exception {
try (Statement statement = connection.createStatement();
Expand Down
Loading

0 comments on commit 1ee1d0a

Please sign in to comment.