Skip to content

Commit

Permalink
ARROW-16032: [C++] Migrate FlightClient API to Result<>
Browse files Browse the repository at this point in the history
Closes apache#12719 from zagto/flight-api-result-client

Authored-by: Tobias Zagorni <tobias@zagorni.eu>
Signed-off-by: David Li <li.davidm96@gmail.com>
  • Loading branch information
zagto authored and lidavidm committed Apr 5, 2022
1 parent 8eaa995 commit 9e08c50
Show file tree
Hide file tree
Showing 15 changed files with 496 additions and 444 deletions.
19 changes: 8 additions & 11 deletions c_glib/arrow-flight-glib/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,11 @@ gaflight_client_new(GAFlightLocation *location,
arrow::Status status;
if (options) {
const auto flight_options = gaflight_client_options_get_raw(options);
status = arrow::flight::FlightClient::Connect(*flight_location,
*flight_options,
&flight_client);
auto result = arrow::flight::FlightClient::Connect(*flight_location, *flight_options);
status = std::move(result).Value(&flight_client);
} else {
status = arrow::flight::FlightClient::Connect(*flight_location,
&flight_client);
auto result = arrow::flight::FlightClient::Connect(*flight_location);
status = std::move(result).Value(&flight_client);
}
if (garrow::check(error, status, "[flight-client][new]")) {
return gaflight_client_new_raw(flight_client.release());
Expand Down Expand Up @@ -315,9 +314,8 @@ gaflight_client_list_flights(GAFlightClient *client,
flight_options = gaflight_call_options_get_raw(options);
}
std::unique_ptr<arrow::flight::FlightListing> flight_listing;
auto status = flight_client->ListFlights(*flight_options,
*flight_criteria,
&flight_listing);
auto result = flight_client->ListFlights(*flight_options, *flight_criteria);
auto status = std::move(result).Value(&flight_listing);
if (!garrow::check(error,
status,
"[flight-client][list-flights]")) {
Expand Down Expand Up @@ -369,9 +367,8 @@ gaflight_client_do_get(GAFlightClient *client,
flight_options = gaflight_call_options_get_raw(options);
}
std::unique_ptr<arrow::flight::FlightStreamReader> flight_reader;
auto status = flight_client->DoGet(*flight_options,
*flight_ticket,
&flight_reader);
auto result = flight_client->DoGet(*flight_options, *flight_ticket);
auto status = std::move(result).Value(&flight_reader);
if (garrow::check(error,
status,
"[flight-client][do-get]")) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/examples/arrow/flight_sql_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ arrow::Status Main() {

// Set up the Flight SQL client
std::unique_ptr<flight::FlightClient> flight_client;
ARROW_RETURN_NOT_OK(flight::FlightClient::Connect(location, &flight_client));
ARROW_ASSIGN_OR_RAISE(flight_client, flight::FlightClient::Connect(location));
std::unique_ptr<flightsql::FlightSqlClient> client(
new flightsql::FlightSqlClient(std::move(flight_client)));

Expand Down
145 changes: 106 additions & 39 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/util/logging.h"
#include "arrow/util/make_unique.h"

#include "arrow/flight/client_auth.h"
#include "arrow/flight/serialization_internal.h"
Expand Down Expand Up @@ -489,21 +490,32 @@ FlightClient::~FlightClient() {
}
}

arrow::Result<std::unique_ptr<FlightClient>> FlightClient::Connect(
const Location& location) {
return Connect(location, FlightClientOptions::Defaults());
}

Status FlightClient::Connect(const Location& location,
std::unique_ptr<FlightClient>* client) {
return Connect(location, FlightClientOptions::Defaults(), client);
return Connect(location, FlightClientOptions::Defaults()).Value(client);
}

Status FlightClient::Connect(const Location& location, const FlightClientOptions& options,
std::unique_ptr<FlightClient>* client) {
arrow::Result<std::unique_ptr<FlightClient>> FlightClient::Connect(
const Location& location, const FlightClientOptions& options) {
flight::transport::grpc::InitializeFlightGrpcClient();

client->reset(new FlightClient);
(*client)->write_size_limit_bytes_ = options.write_size_limit_bytes;
std::unique_ptr<FlightClient> client(new FlightClient());
client->write_size_limit_bytes_ = options.write_size_limit_bytes;
const auto scheme = location.scheme();
ARROW_ASSIGN_OR_RAISE((*client)->transport_,
ARROW_ASSIGN_OR_RAISE(client->transport_,
internal::GetDefaultTransportRegistry()->MakeClient(scheme));
return (*client)->transport_->Init(options, location, *location.uri_);
RETURN_NOT_OK(client->transport_->Init(options, location, *location.uri_));
return client;
}

Status FlightClient::Connect(const Location& location, const FlightClientOptions& options,
std::unique_ptr<FlightClient>* client) {
return Connect(location, options).Value(client);
}

Status FlightClient::Authenticate(const FlightCallOptions& options,
Expand All @@ -519,23 +531,44 @@ arrow::Result<std::pair<std::string, std::string>> FlightClient::AuthenticateBas
return transport_->AuthenticateBasicToken(options, username, password);
}

arrow::Result<std::unique_ptr<ResultStream>> FlightClient::DoAction(
const FlightCallOptions& options, const Action& action) {
std::unique_ptr<ResultStream> results;
RETURN_NOT_OK(CheckOpen());
RETURN_NOT_OK(transport_->DoAction(options, action, &results));
return results;
}

Status FlightClient::DoAction(const FlightCallOptions& options, const Action& action,
std::unique_ptr<ResultStream>* results) {
return DoAction(options, action).Value(results);
}

arrow::Result<std::vector<ActionType>> FlightClient::ListActions(
const FlightCallOptions& options) {
std::vector<ActionType> actions;
RETURN_NOT_OK(CheckOpen());
return transport_->DoAction(options, action, results);
RETURN_NOT_OK(transport_->ListActions(options, &actions));
return actions;
}

Status FlightClient::ListActions(const FlightCallOptions& options,
std::vector<ActionType>* actions) {
return ListActions(options).Value(actions);
}

arrow::Result<std::unique_ptr<FlightInfo>> FlightClient::GetFlightInfo(
const FlightCallOptions& options, const FlightDescriptor& descriptor) {
std::unique_ptr<FlightInfo> info;
RETURN_NOT_OK(CheckOpen());
return transport_->ListActions(options, actions);
RETURN_NOT_OK(transport_->GetFlightInfo(options, descriptor, &info));
return info;
}

Status FlightClient::GetFlightInfo(const FlightCallOptions& options,
const FlightDescriptor& descriptor,
std::unique_ptr<FlightInfo>* info) {
RETURN_NOT_OK(CheckOpen());
return transport_->GetFlightInfo(options, descriptor, info);
return GetFlightInfo(options, descriptor).Value(info);
}

arrow::Result<std::unique_ptr<SchemaResult>> FlightClient::GetSchema(
Expand All @@ -550,63 +583,97 @@ Status FlightClient::GetSchema(const FlightCallOptions& options,
return GetSchema(options, descriptor).Value(schema_result);
}

arrow::Result<std::unique_ptr<FlightListing>> FlightClient::ListFlights() {
return ListFlights({}, {});
}

Status FlightClient::ListFlights(std::unique_ptr<FlightListing>* listing) {
return ListFlights({}, {}).Value(listing);
}

arrow::Result<std::unique_ptr<FlightListing>> FlightClient::ListFlights(
const FlightCallOptions& options, const Criteria& criteria) {
std::unique_ptr<FlightListing> listing;
RETURN_NOT_OK(CheckOpen());
return ListFlights({}, {}, listing);
RETURN_NOT_OK(transport_->ListFlights(options, criteria, &listing));
return listing;
}

Status FlightClient::ListFlights(const FlightCallOptions& options,
const Criteria& criteria,
std::unique_ptr<FlightListing>* listing) {
return ListFlights(options, criteria).Value(listing);
}

arrow::Result<std::unique_ptr<FlightStreamReader>> FlightClient::DoGet(
const FlightCallOptions& options, const Ticket& ticket) {
RETURN_NOT_OK(CheckOpen());
return transport_->ListFlights(options, criteria, listing);
std::unique_ptr<internal::ClientDataStream> remote_stream;
RETURN_NOT_OK(transport_->DoGet(options, ticket, &remote_stream));
auto stream_reader = arrow::internal::make_unique<ClientStreamReader>(
std::move(remote_stream), options.read_options, options.stop_token,
options.memory_manager);
// Eagerly read the schema
RETURN_NOT_OK(stream_reader->EnsureDataStarted());
return stream_reader;
}

Status FlightClient::DoGet(const FlightCallOptions& options, const Ticket& ticket,
std::unique_ptr<FlightStreamReader>* stream) {
return DoGet(options, ticket).Value(stream);
}

arrow::Result<FlightClient::DoPutResult> FlightClient::DoPut(
const FlightCallOptions& options, const FlightDescriptor& descriptor,
const std::shared_ptr<Schema>& schema) {
RETURN_NOT_OK(CheckOpen());
std::unique_ptr<internal::ClientDataStream> remote_stream;
RETURN_NOT_OK(transport_->DoGet(options, ticket, &remote_stream));
*stream = std::unique_ptr<ClientStreamReader>(
new ClientStreamReader(std::move(remote_stream), options.read_options,
options.stop_token, options.memory_manager));
// Eagerly read the schema
return static_cast<ClientStreamReader*>(stream->get())->EnsureDataStarted();
RETURN_NOT_OK(transport_->DoPut(options, &remote_stream));
std::shared_ptr<internal::ClientDataStream> shared_stream = std::move(remote_stream);
DoPutResult result;
result.reader = arrow::internal::make_unique<ClientMetadataReader>(shared_stream);
result.writer = arrow::internal::make_unique<ClientStreamWriter>(
std::move(shared_stream), options.write_options, write_size_limit_bytes_,
descriptor);
RETURN_NOT_OK(result.writer->Begin(schema, options.write_options));
return result;
}

Status FlightClient::DoPut(const FlightCallOptions& options,
const FlightDescriptor& descriptor,
const std::shared_ptr<Schema>& schema,
std::unique_ptr<FlightStreamWriter>* stream,
std::unique_ptr<FlightStreamWriter>* writer,
std::unique_ptr<FlightMetadataReader>* reader) {
ARROW_ASSIGN_OR_RAISE(auto result, DoPut(options, descriptor, schema));
*writer = std::move(result.writer);
*reader = std::move(result.reader);
return Status::OK();
}

arrow::Result<FlightClient::DoExchangeResult> FlightClient::DoExchange(
const FlightCallOptions& options, const FlightDescriptor& descriptor) {
RETURN_NOT_OK(CheckOpen());
std::unique_ptr<internal::ClientDataStream> remote_stream;
RETURN_NOT_OK(transport_->DoPut(options, &remote_stream));
RETURN_NOT_OK(transport_->DoExchange(options, &remote_stream));
std::shared_ptr<internal::ClientDataStream> shared_stream = std::move(remote_stream);
*reader =
std::unique_ptr<FlightMetadataReader>(new ClientMetadataReader(shared_stream));
*stream = std::unique_ptr<FlightStreamWriter>(
new ClientStreamWriter(std::move(shared_stream), options.write_options,
write_size_limit_bytes_, descriptor));
RETURN_NOT_OK((*stream)->Begin(schema, options.write_options));
return Status::OK();
DoExchangeResult result;
result.reader = arrow::internal::make_unique<ClientStreamReader>(
shared_stream, options.read_options, options.stop_token, options.memory_manager);
auto stream_writer = arrow::internal::make_unique<ClientStreamWriter>(
std::move(shared_stream), options.write_options, write_size_limit_bytes_,
descriptor);
RETURN_NOT_OK(stream_writer->Begin());
result.writer = std::move(stream_writer);
return result;
}

Status FlightClient::DoExchange(const FlightCallOptions& options,
const FlightDescriptor& descriptor,
std::unique_ptr<FlightStreamWriter>* writer,
std::unique_ptr<FlightStreamReader>* reader) {
RETURN_NOT_OK(CheckOpen());
std::unique_ptr<internal::ClientDataStream> remote_stream;
RETURN_NOT_OK(transport_->DoExchange(options, &remote_stream));
std::shared_ptr<internal::ClientDataStream> shared_stream = std::move(remote_stream);
*reader = std::unique_ptr<FlightStreamReader>(new ClientStreamReader(
shared_stream, options.read_options, options.stop_token, options.memory_manager));
auto stream_writer = std::unique_ptr<ClientStreamWriter>(
new ClientStreamWriter(std::move(shared_stream), options.write_options,
write_size_limit_bytes_, descriptor));
RETURN_NOT_OK(stream_writer->Begin());
*writer = std::move(stream_writer);
ARROW_ASSIGN_OR_RAISE(auto result, DoExchange(options, descriptor));
*writer = std::move(result.writer);
*reader = std::move(result.reader);
return Status::OK();
}

Expand Down
Loading

0 comments on commit 9e08c50

Please sign in to comment.