Skip to content

Commit

Permalink
Add temporary usage examples to Flight/FlightSQL
Browse files Browse the repository at this point in the history
  • Loading branch information
benibus committed Mar 29, 2024
1 parent 872537b commit d571b33
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 0 deletions.
6 changes: 6 additions & 0 deletions cpp/src/arrow/flight/sql/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
#include "arrow/flight/sql/sql_info_internal.h"
#include "arrow/type.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging_v2.h"

#define ARROW_FLIGHT_SQL_LOG(LEVEL, ...) \
ARROW_LOG_WITH("FlightSqlServer", LEVEL, __VA_ARGS__)

#define PROPERTY_TO_OPTIONAL(COMMAND, PROPERTY) \
COMMAND.has_##PROPERTY() ? std::make_optional(COMMAND.PROPERTY()) : std::nullopt
Expand Down Expand Up @@ -723,6 +727,7 @@ Status FlightSqlServerBase::DoPut(const ServerCallContext& context,
if (!any.ParseFromArray(request.cmd.data(), static_cast<int>(request.cmd.size()))) {
return Status::Invalid("Unable to parse command");
}
ARROW_FLIGHT_SQL_LOG(INFO, "[DoPut] command: ", request.cmd);

if (any.Is<pb::sql::CommandStatementUpdate>()) {
ARROW_ASSIGN_OR_RAISE(StatementUpdate internal_command,
Expand Down Expand Up @@ -794,6 +799,7 @@ Status FlightSqlServerBase::DoAction(const ServerCallContext& context,
const Action& action,
std::unique_ptr<ResultStream>* result_stream) {
std::vector<Result> results;
ARROW_FLIGHT_SQL_LOG(INFO, "[DoAction] action.type: ", action.type);
if (action.type == ActionType::kCancelFlightInfo.type) {
std::string_view body(*action.body);
ARROW_ASSIGN_OR_RAISE(auto request, CancelFlightInfoRequest::Deserialize(body));
Expand Down
51 changes: 51 additions & 0 deletions cpp/src/arrow/flight/sql/server_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "arrow/array/array_binary.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/array_primitive.h"
#include "arrow/flight/server_tracing_middleware.h"
#include "arrow/flight/sql/client.h"
#include "arrow/flight/sql/column_metadata.h"
#include "arrow/flight/sql/example/sqlite_server.h"
Expand All @@ -37,11 +38,59 @@
#include "arrow/table.h"
#include "arrow/testing/builder.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/logging_v2.h"

#ifdef ARROW_TELEMETRY
#include "arrow/telemetry/logging.h"
#include "arrow/util/tracing_internal.h"

#include <opentelemetry/context/propagation/global_propagator.h>
#include <opentelemetry/context/propagation/text_map_propagator.h>
#include <opentelemetry/sdk/trace/processor.h>
#include <opentelemetry/sdk/trace/tracer_provider.h>
#include <opentelemetry/trace/propagation/http_trace_context.h>
#endif

using arrow::internal::checked_cast;

namespace arrow::flight::sql {

#ifdef ARROW_TELEMETRY
class OtelEnvironment : public ::testing::Environment {
public:
void SetUp() override {
std::vector<std::unique_ptr<opentelemetry::sdk::trace::SpanProcessor>> processors;
auto tracer_provider =
opentelemetry::nostd::shared_ptr<opentelemetry::sdk::trace::TracerProvider>(
new opentelemetry::sdk::trace::TracerProvider(std::move(processors)));
opentelemetry::trace::Provider::SetTracerProvider(std::move(tracer_provider));

opentelemetry::context::propagation::GlobalTextMapPropagator::SetGlobalPropagator(
opentelemetry::nostd::shared_ptr<
opentelemetry::context::propagation::TextMapPropagator>(
new opentelemetry::trace::propagation::HttpTraceContext()));

auto provider_options = telemetry::LoggerProviderOptions::Defaults();
ASSERT_OK(telemetry::GlobalLoggerProvider::Initialize(provider_options));
auto logging_options = telemetry::LoggingOptions::Defaults();
logging_options.severity_threshold = telemetry::LogLevel::ARROW_TRACE;
logging_options.flush_severity = telemetry::LogLevel::ARROW_TRACE;
std::shared_ptr<telemetry::Logger> logger1, logger2;
ASSERT_OK_AND_ASSIGN(logger1, telemetry::GlobalLoggerProvider::MakeLogger(
"FlightGrpcServer", logging_options));
ASSERT_OK_AND_ASSIGN(logger2, telemetry::GlobalLoggerProvider::MakeLogger(
"FlightSqlServer", logging_options));
ASSERT_OK(util::LoggerRegistry::RegisterLogger(logger1->name(), logger1));
ASSERT_OK(util::LoggerRegistry::RegisterLogger(logger2->name(), logger2));
}

void TearDown() override { EXPECT_TRUE(telemetry::GlobalLoggerProvider::ShutDown()); }
};

static ::testing::Environment* kOtelEnvironment =
::testing::AddGlobalTestEnvironment(new OtelEnvironment);
#endif

/// \brief Auxiliary variant visitor used to assert that GetSqlInfo's values are
/// correctly placed on its DenseUnionArray
class SqlInfoDenseUnionValidator {
Expand Down Expand Up @@ -150,6 +199,8 @@ class TestFlightSqlServer : public ::testing::Test {
void SetUp() override {
ASSERT_OK_AND_ASSIGN(auto location, Location::ForGrpcTcp("0.0.0.0", 0));
arrow::flight::FlightServerOptions options(location);
options.middleware.push_back({"FlightSqlTracingMiddleware",
arrow::flight::MakeTracingServerMiddlewareFactory()});
ASSERT_OK_AND_ASSIGN(server, example::SQLiteFlightSqlServer::Create());
ASSERT_OK(server->Init(options));

Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/flight/transport/grpc/grpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,12 @@
#include "arrow/flight/transport_server.h"
#include "arrow/flight/types.h"
#include "arrow/util/logging.h"
#include "arrow/util/logging_v2.h"
#include "arrow/util/uri.h"

#define ARROW_FLIGHT_LOG(LEVEL, ...) \
ARROW_LOG_WITH("FlightGrpcServer", LEVEL, __VA_ARGS__)

namespace arrow {
namespace flight {
namespace transport {
Expand Down Expand Up @@ -335,6 +339,8 @@ class GrpcServiceHandler final : public FlightService::Service {
return flight_context.FinishRequest(result);
}
if (instance != nullptr) {
ARROW_FLIGHT_LOG(
INFO, "[MakeCallContext] Started call for middleware: ", instance->name());
flight_context.middleware_.push_back(instance);
flight_context.middleware_map_.insert({factory.first, instance});
}
Expand Down Expand Up @@ -531,6 +537,7 @@ class GrpcServiceHandler final : public FlightService::Service {
CHECK_ARG_NOT_NULL(flight_context, request, "Action cannot be null");
Action action;
SERVICE_RETURN_NOT_OK(flight_context, internal::FromProto(*request, &action));
ARROW_FLIGHT_LOG(INFO, "[DoAction] action.type=", action.type);

std::unique_ptr<ResultStream> results;
SERVICE_RETURN_NOT_OK(flight_context,
Expand Down

0 comments on commit d571b33

Please sign in to comment.