Skip to content

Commit

Permalink
apacheGH-40345: [FlightRPC][C++][Java][Go] Add URI scheme to reuse co…
Browse files Browse the repository at this point in the history
…nnection (apache#40084)

### Rationale for this change

https://docs.google.com/document/d/1g9M9FmsZhkewlT1mLibuceQO8ugI0-fqumVAXKFjVGg/edit?usp=sharing

### What changes are included in this PR?

Base implementation sans integration test

### Are these changes tested?

Yes

### Are there any user-facing changes?
No
* GitHub Issue: apache#40345

Lead-authored-by: David Li <li.davidm96@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
3 people authored and thisisnic committed Mar 8, 2024
1 parent 163aa99 commit 36364c0
Show file tree
Hide file tree
Showing 20 changed files with 469 additions and 90 deletions.
5 changes: 5 additions & 0 deletions cpp/src/arrow/flight/flight_internals_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,11 @@ TEST(FlightTypes, LocationUnknownScheme) {
ASSERT_OK(Location::Parse("https://example.com/foo"));
}

// Checks the special "reuse connection" location: it must serialize to the
// canonical URI (including the trailing empty query string) and report the
// arrow-flight-reuse-connection scheme.
TEST(FlightTypes, LocationFallback) {
EXPECT_EQ("arrow-flight-reuse-connection://?", Location::ReuseConnection().ToString());
EXPECT_EQ("arrow-flight-reuse-connection", Location::ReuseConnection().scheme());
}

TEST(FlightTypes, RoundtripStatus) {
// Make sure status codes round trip through our conversions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ TEST(FlightIntegration, ExpirationTimeRenewFlightEndpoint) {
ASSERT_OK(RunScenario("expiration_time:renew_flight_endpoint"));
}

// Runs the "location:reuse_connection" integration scenario and requires it
// to finish with an OK status.
TEST(FlightIntegration, LocationReuseConnection) {
ASSERT_OK(RunScenario("location:reuse_connection"));
}

// Runs the "session_options" integration scenario and requires an OK status.
TEST(FlightIntegration, SessionOptions) { ASSERT_OK(RunScenario("session_options")); }

// Runs the "poll_flight_info" integration scenario and requires an OK status.
TEST(FlightIntegration, PollFlightInfo) { ASSERT_OK(RunScenario("poll_flight_info")); }
Expand Down
47 changes: 47 additions & 0 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2079,6 +2079,50 @@ class FlightSqlExtensionScenario : public FlightSqlScenario {
return Status::OK();
}
};

/// \brief Flight server backing the arrow-flight-reuse-connection:// scenario.
///
/// GetFlightInfo ignores the call context and always answers with a single
/// endpoint whose ticket is "reuse" and whose only location is
/// Location::ReuseConnection().
class ReuseConnectionServer : public FlightServerBase {
 public:
  /// \brief Build a FlightInfo advertising only the reuse-connection location.
  /// \param[in] context unused
  /// \param[in] descriptor echoed back in the returned FlightInfo
  /// \param[out] info receives the newly created FlightInfo
  /// \return the error from FlightInfo::Make if it fails, otherwise OK
  Status GetFlightInfo(const ServerCallContext& context,
                       const FlightDescriptor& descriptor,
                       std::unique_ptr<FlightInfo>* info) override {
    // The one endpoint clients receive: ticket "reuse", reachable by reusing
    // the connection they already hold.
    const FlightEndpoint reuse_endpoint{{"reuse"}, {Location::ReuseConnection()}};
    // Empty schema; both trailing counters are passed as -1.
    ARROW_ASSIGN_OR_RAISE(
        auto flight_info,
        FlightInfo::Make(arrow::Schema({}), descriptor, {reuse_endpoint}, -1, -1));
    *info = std::make_unique<FlightInfo>(std::move(flight_info));
    return Status::OK();
  }
};

/// \brief Integration scenario for arrow-flight-reuse-connection://?.
///
/// The server side is a ReuseConnectionServer; the client side issues one
/// GetFlightInfo call and verifies that the response carries exactly one
/// endpoint with exactly one location equal to the reuse-connection URI.
class ReuseConnectionScenario : public Scenario {
  /// \brief Install a ReuseConnectionServer; the server options are left untouched.
  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
                    FlightServerOptions* options) override {
    *server = std::make_unique<ReuseConnectionServer>();
    return Status::OK();
  }

  /// \brief No client-side options are needed for this scenario.
  Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }

  /// \brief Fetch the FlightInfo for command "reuse" and validate its endpoint.
  /// \return Invalid if the endpoint count, location count, or location URI
  ///   differs from what is expected; otherwise OK
  Status RunClient(std::unique_ptr<FlightClient> client) override {
    const auto reuse_descriptor = FlightDescriptor::Command("reuse");
    ARROW_ASSIGN_OR_RAISE(auto flight_info, client->GetFlightInfo(reuse_descriptor));

    const auto& endpoints = flight_info->endpoints();
    if (endpoints.size() != 1) {
      return Status::Invalid("Expected 1 endpoint, got ", endpoints.size());
    }

    const auto& locations = endpoints.front().locations;
    if (locations.size() != 1) {
      return Status::Invalid("Expected 1 location, got ", locations.size());
    }

    // Compare the serialized form so the trailing "?" is checked as well.
    const auto actual_uri = locations.front().ToString();
    if (actual_uri != "arrow-flight-reuse-connection://?") {
      return Status::Invalid("Expected arrow-flight-reuse-connection://?, got ",
                             actual_uri);
    }
    return Status::OK();
  }
};
} // namespace

Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>* out) {
Expand All @@ -2103,6 +2147,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
} else if (scenario_name == "expiration_time:renew_flight_endpoint") {
*out = std::make_shared<ExpirationTimeRenewFlightEndpointScenario>();
return Status::OK();
} else if (scenario_name == "location:reuse_connection") {
*out = std::make_shared<ReuseConnectionScenario>();
return Status::OK();
} else if (scenario_name == "session_options") {
*out = std::make_shared<SessionOptionsScenario>();
return Status::OK();
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,12 @@ arrow::Result<Location> Location::Parse(const std::string& uri_string) {
return location;
}

// Returns the location for the special arrow-flight-reuse-connection://? URI.
// The value is parsed on first use and kept in a function-local static, so
// every call hands back a reference to the same object. ValueOrDie() is used
// because the URI is a fixed literal that is expected to parse.
const Location& Location::ReuseConnection() {
  static const Location kReuseConnection =
      Location::Parse("arrow-flight-reuse-connection://?").ValueOrDie();
  return kReuseConnection;
}

arrow::Result<Location> Location::ForGrpcTcp(const std::string& host, const int port) {
std::stringstream uri_string;
uri_string << "grpc+tcp://" << host << ':' << port;
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,14 @@ struct ARROW_FLIGHT_EXPORT Location {
/// \brief Initialize a location by parsing a URI string
static arrow::Result<Location> Parse(const std::string& uri_string);

/// \brief Get the special "reuse connection" location
/// (arrow-flight-reuse-connection://?).
///
/// This location means that a client may attempt to reuse an existing
/// connection to a Flight service to fetch data instead of creating a new
/// connection to one of the other locations listed in a FlightEndpoint
/// response.
///
/// \return a reference to the location for this special URI
static const Location& ReuseConnection();

/// \brief Initialize a location for a non-TLS, gRPC-based Flight
/// service from a host and port
/// \param[in] host The hostname to connect to
Expand Down
5 changes: 5 additions & 0 deletions dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,6 +608,11 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
"RenewFlightEndpoint are working as expected."),
skip_testers={"JS", "C#", "Rust"},
),
Scenario(
"location:reuse_connection",
description="Ensure arrow-flight-reuse-connection is accepted.",
skip_testers={"JS", "C#", "Rust"},
),
Scenario(
"session_options",
description="Ensure Flight SQL Sessions work as expected.",
Expand Down
70 changes: 53 additions & 17 deletions docs/source/format/Flight.rst
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ A client that wishes to download the data would:
connection to the original server to fetch data. Otherwise, the
client must connect to one of the indicated locations.

The server may list "itself" as a location alongside other server
locations. Normally this requires the server to know its public
address, but it may instead use the special URI string
``arrow-flight-reuse-connection://?`` to tell clients that they may
reuse an existing connection to the same server, so the server does
not need to know its own address. See `Connection Reuse`_ below.

In this way, the locations inside an endpoint can also be thought
of as performing look-aside load balancing or service discovery
functions. And the endpoints can represent data that is partitioned
Expand Down Expand Up @@ -307,29 +314,58 @@ well, in which case any `authentication method supported by gRPC

.. _Mutual TLS (mTLS): https://grpc.io/docs/guides/auth/#supported-auth-mechanisms

Transport Implementations
=========================
Location URIs
=============

Flight is primarily defined in terms of its Protobuf and gRPC
specification below, but Arrow implementations may also support
alternative transports (see :ref:`status-flight-rpc`). In that case,
implementations should use the following URI schemes for the given
transport implementations:

+----------------------------+----------------------------+
| Transport | URI Scheme |
+============================+============================+
| gRPC (plaintext) | grpc: or grpc+tcp: |
+----------------------------+----------------------------+
| gRPC (TLS) | grpc+tls: |
+----------------------------+----------------------------+
| gRPC (Unix domain socket) | grpc+unix: |
+----------------------------+----------------------------+
| UCX_ (plaintext) | ucx: |
+----------------------------+----------------------------+
alternative transports (see :ref:`status-flight-rpc`). Clients and
servers need to know which transport to use for a given URI in a
Location, so Flight implementations should use the following URI
schemes for the given transports:

+----------------------------+--------------------------------+
| Transport | URI Scheme |
+============================+================================+
| gRPC (plaintext) | grpc: or grpc+tcp: |
+----------------------------+--------------------------------+
| gRPC (TLS) | grpc+tls: |
+----------------------------+--------------------------------+
| gRPC (Unix domain socket) | grpc+unix: |
+----------------------------+--------------------------------+
| (reuse connection) | arrow-flight-reuse-connection: |
+----------------------------+--------------------------------+
| UCX_ (plaintext) | ucx: |
+----------------------------+--------------------------------+

.. _UCX: https://openucx.org/

Connection Reuse
----------------

"Reuse connection" above is not a particular transport. Instead, it
means that the client may try to execute DoGet against the same server
(and through the same connection) that it originally obtained the
FlightInfo from (i.e., that it called GetFlightInfo against). This is
interpreted the same way as when no specific ``Location`` values are
returned.

This allows the server to return "itself" as one possible location to
fetch data without having to know its own public address, which can be
useful in deployments where knowing this would be difficult or
impossible. For example, a developer may forward a remote service in
a cloud environment to their local machine; in this case, the remote
service would have no way to know the local hostname and port that it
is being accessed over.

For compatibility reasons, the URI should always be
``arrow-flight-reuse-connection://?``, with the trailing empty query
string. Java's URI implementation does not accept ``scheme:`` or
``scheme://``, and C++'s implementation does not accept an empty
string, so the obvious candidates are not compatible. The chosen
representation can be parsed by both implementations, as well as Go's
``net/url`` and Python's ``urllib.parse``.

Error Handling
==============

Expand Down
20 changes: 12 additions & 8 deletions format/Flight.proto
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ message FlightInfo {

/*
* Application-defined metadata.
*
*
* There is no inherent or required relationship between this
* and the app_metadata fields in the FlightEndpoints or resulting
* FlightData messages. Since this metadata is application-defined,
Expand Down Expand Up @@ -440,11 +440,15 @@ message FlightEndpoint {
* be redeemed on the current service where the ticket was
* generated.
*
* If the list is not empty, the expectation is that the ticket can
* be redeemed at any of the locations, and that the data returned
* will be equivalent. In this case, the ticket may only be redeemed
* at one of the given locations, and not (necessarily) on the
* current service.
* If the list is not empty, the expectation is that the ticket can be
* redeemed at any of the locations, and that the data returned will be
* equivalent. In this case, the ticket may only be redeemed at one of the
* given locations, and not (necessarily) on the current service. If one
* of the given locations is "arrow-flight-reuse-connection://?", the
* client may redeem the ticket on the service where the ticket was
* generated (i.e., the same as above), in addition to the other
* locations. (This URI was chosen to maximize compatibility, as 'scheme:'
* or 'scheme://' are not accepted by Java's java.net.URI.)
*
* In other words, an application can use multiple locations to
* represent redundant and/or load balanced services.
Expand All @@ -460,7 +464,7 @@ message FlightEndpoint {

/*
* Application-defined metadata.
*
*
* There is no inherent or required relationship between this
* and the app_metadata fields in the FlightInfo or resulting
* FlightData messages. Since this metadata is application-defined,
Expand Down Expand Up @@ -587,7 +591,7 @@ message SetSessionOptionsResult {
message Error {
ErrorValue value = 1;
}

map<string, Error> errors = 1;
}

Expand Down
8 changes: 8 additions & 0 deletions go/arrow/flight/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ const (
CancelStatusNotCancellable = flight.CancelStatus_CANCEL_STATUS_NOT_CANCELLABLE
)

// Constants for Location
const (
// LocationReuseConnection is a special location that tells clients
// they may fetch the data from the same service that they obtained
// the FlightEndpoint response from. It is a URI string, meant to be
// used as the Uri of a Location; the trailing "?" (empty query
// string) is part of the value.
LocationReuseConnection = "arrow-flight-reuse-connection://?"
)

// RegisterFlightServiceServer registers an existing flight server onto an
// existing grpc server, or anything that is a grpc service registrar.
func RegisterFlightServiceServer(s *grpc.Server, srv FlightServer) {
Expand Down
52 changes: 52 additions & 0 deletions go/arrow/internal/flight_integration/scenario.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ func GetScenario(name string, args ...string) Scenario {
return &expirationTimeCancelFlightInfoScenarioTester{}
case "expiration_time:renew_flight_endpoint":
return &expirationTimeRenewFlightEndpointScenarioTester{}
case "location:reuse_connection":
return &locationReuseConnectionScenarioTester{}
case "poll_flight_info":
return &pollFlightInfoScenarioTester{}
case "app_metadata_flight_info_endpoint":
Expand Down Expand Up @@ -1139,6 +1141,56 @@ func (tester *expirationTimeRenewFlightEndpointScenarioTester) RunClient(addr st
return nil
}

// locationReuseConnectionScenarioTester implements the
// "location:reuse_connection" integration scenario. It embeds
// flight.BaseFlightServer and acts as both the server (GetFlightInfo,
// MakeServer) and the client (RunClient) for the scenario.
type locationReuseConnectionScenarioTester struct {
flight.BaseFlightServer
}

// GetFlightInfo answers every request with a FlightInfo that has an empty
// schema, echoes the given descriptor, and carries a single endpoint whose
// ticket is "reuse" and whose only location is flight.LocationReuseConnection.
// TotalRecords and TotalBytes are both set to -1.
func (tester *locationReuseConnectionScenarioTester) GetFlightInfo(ctx context.Context, desc *flight.FlightDescriptor) (*flight.FlightInfo, error) {
	emptySchema := arrow.NewSchema([]arrow.Field{}, nil)

	reuseEndpoint := &flight.FlightEndpoint{
		Ticket:   &flight.Ticket{Ticket: []byte("reuse")},
		Location: []*flight.Location{{Uri: flight.LocationReuseConnection}},
	}

	info := &flight.FlightInfo{
		Schema:           flight.SerializeSchema(emptySchema, memory.DefaultAllocator),
		FlightDescriptor: desc,
		Endpoint:         []*flight.FlightEndpoint{reuseEndpoint},
		TotalRecords:     -1,
		TotalBytes:       -1,
	}
	return info, nil
}

// MakeServer creates a Flight server without middleware, registers this
// tester as its Flight service, passes it to initServer together with the
// given port, and returns it.
func (tester *locationReuseConnectionScenarioTester) MakeServer(port int) flight.Server {
	flightServer := flight.NewServerWithMiddleware(nil)
	flightServer.RegisterFlightService(tester)
	initServer(port, flightServer)
	return flightServer
}

// RunClient connects to addr, requests the FlightInfo for the command
// descriptor "reuse", and checks that the response holds exactly one endpoint
// with exactly one location whose URI equals flight.LocationReuseConnection.
// Any connection or RPC error is returned as-is; a mismatch is reported as a
// formatted error.
func (tester *locationReuseConnectionScenarioTester) RunClient(addr string, opts ...grpc.DialOption) error {
	flightClient, err := flight.NewClientWithMiddleware(addr, nil, nil, opts...)
	if err != nil {
		return err
	}
	defer flightClient.Close()

	descriptor := &flight.FlightDescriptor{Type: flight.DescriptorCMD, Cmd: []byte("reuse")}
	info, err := flightClient.GetFlightInfo(context.Background(), descriptor)
	if err != nil {
		return err
	}

	if count := len(info.Endpoint); count != 1 {
		return fmt.Errorf("expected 1 endpoint, got %d", count)
	}

	locations := info.Endpoint[0].Location
	if count := len(locations); count != 1 {
		return fmt.Errorf("expected 1 location, got %d", count)
	}
	if uri := locations[0].Uri; uri != flight.LocationReuseConnection {
		return fmt.Errorf("expected %s, got %s", flight.LocationReuseConnection, uri)
	}

	return nil
}

type pollFlightInfoScenarioTester struct {
flight.BaseFlightServer
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ Flight.Location toProtocol() {
return Flight.Location.newBuilder().setUri(uri.toString()).build();
}

/**
 * Create the special location telling clients that they may fetch data over a connection
 * they already have to a Flight RPC server, rather than opening a new one.
 *
 * @return a location whose URI uses the {@link LocationSchemes#REUSE_CONNECTION} scheme with
 *     an empty authority, path, and query
 */
public static Location reuseConnection() {
  try {
    final URI reuseUri = new URI(LocationSchemes.REUSE_CONNECTION, "", "", "", null);
    return new Location(reuseUri);
  } catch (URISyntaxException e) {
    // Not expected: every URI component above is a fixed value.
    throw new IllegalArgumentException(e);
  }
}

/**
* Construct a URI for a Flight+gRPC server without transport security.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public final class LocationSchemes {
public static final String GRPC_INSECURE = "grpc+tcp";
public static final String GRPC_DOMAIN_SOCKET = "grpc+unix";
public static final String GRPC_TLS = "grpc+tls";
public static final String REUSE_CONNECTION = "arrow-flight-reuse-connection";

private LocationSchemes() {
throw new AssertionError("Do not instantiate this class.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ public void fastPathDefaults() {
Assertions.assertFalse(ArrowMessage.ENABLE_ZERO_COPY_WRITE);
}

/**
 * The reuse-connection location must render as the canonical URI, including the trailing
 * empty query string.
 */
@Test
public void fallbackLocation() {
  final String actualUri = Location.reuseConnection().getUri().toString();
  Assertions.assertEquals("arrow-flight-reuse-connection://?", actualUri);
}

/**
* ARROW-6017: we should be able to construct locations for unknown schemes.
*/
Expand Down
Loading

0 comments on commit 36364c0

Please sign in to comment.