From 7cdfb283aa95acc2fa3985881600fac7bcbe7471 Mon Sep 17 00:00:00 2001
From: prmoore77
Date: Wed, 5 Jul 2023 10:48:40 -0400
Subject: [PATCH 1/2] Minor enhancements to README.md. Minor fixes recommended from linter.

---
 README.md                                    | 21 +++--
 pom.xml                                      |  4 +-
 .../spark/FlightColumnarPartitionReader.java | 77 +++++++++----------
 3 files changed, 53 insertions(+), 49 deletions(-)

diff --git a/README.md b/README.md
index b7219fa..514e281 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,8 @@
 Spark source for Flight RPC enabled endpoints
 =========================================
-[![Build Status](https://travis-ci.org/rymurr/flight-spark-source.svg?branch=master)](https://travis-ci.org/rymurr/flight-spark-source)
+[](https://github.com/rymurr/flight-spark-source)
+[![Build Status](https://github.com/rymurr/flight-spark-source/actions/workflows/maven-build.yml/badge.svg)](https://github.com/rymurr/flight-spark-source/actions/workflows/maven-build.yml)
 
 This uses the new [Source V2 Interface](https://databricks.com/session/apache-spark-data-source-v2) to connect to
 [Apache Arrow Flight](https://www.dremio.com/understanding-apache-arrow-flight/) endpoints. It is a prototype of what is
@@ -21,12 +22,15 @@ It currently lacks:
 * leverage the transactional capabilities of the Spark Source V2 interface
 * publish benchmark test
 
-## How to use
-You can choose to build the JAR locally, or use one of the archived JARs from a Github Actions workflow run.
+## Usage
+You can choose to build the JAR locally, or use one of the archived JAR artifacts built from a [Github Actions workflow run](https://github.com/rymurr/flight-spark-source/actions/workflows/maven-build.yml).
 
-1. Take the JAR file named: `flight-spark-source-1.0-SNAPSHOT-shaded.jar` - and copy it to the spark master node. For the sake of this example, we will use the `/tmp` directory
-2. Ensure you have a Flight server running and accessible to your Spark cluster. For example of a Python Flight RPC server - see [this link](https://arrow.apache.org/cookbook/py/flight.html#streaming-parquet-storage-service).
-2. On the Spark master - start an interactive Python (or PySpark) session and run something like:
+1. Take the built JAR file named: `flight-spark-source-1.0-SNAPSHOT-shaded.jar` - and copy it to the spark master node. For the sake of this example, we will use the `/tmp` directory
+2. Ensure you have a Flight server running and accessible to your Spark cluster. For example of a Python Flight RPC server - see [this link](https://arrow.apache.org/cookbook/py/flight.html#streaming-parquet-storage-service).
+   NOTE: you will have to add a `get_schema` end-point to that example server for it to work - with signature:
+```def get_schema(self, context, descriptor) -> pyarrow.flight.SchemaResult```
+   See this [link](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightClient.html#pyarrow.flight.FlightClient.get_schema) for more details.
+3. On the Spark master - start an interactive Python (or PySpark) session and run something like:
 ```
 import os
 from pyspark.sql import SparkSession
@@ -36,7 +40,8 @@ spark = (SparkSession
   .builder
   .appName("flight client")
   .config("spark.jars", "/tmp/flight-spark-source-1.0-SNAPSHOT-shaded.jar")
-  .getOrCreate())```
+  .getOrCreate()
+  )```
 # Read from a Flight RPC server using an arbitrary string containing either a command or path
 # Note - this will call the Flight RPC Server's "get_schema" end-point (which must be present to use the connector)
 df = (spark.read.format('cdap.org.apache.arrow.flight.spark')
@@ -48,7 +53,7 @@
 # -------------------------------------------------------------------
 # Uncomment the following 2 lines to use authentication if your Flight RPC server supports Basic Token auth
 # .option('username', 'flight_user')
-# .option('password', os.environ['FLIGHT_PASSWORD'] # Using an env var containing the password here for better security
+# .option('password', os.environ['FLIGHT_PASSWORD']) # Using an env var containing the password here for better security
 # -------------------------------------------------------------------
 # Uncomment the following 2 lines to use MTLS client certificate verification if your Flight RPC server supports it (MTLS client certs MUST be version 3 or above!!!)
 # .option('clientCertificate', mtls_cert_chain) # In this example, mtls_cert_chain is a str with contents of a PEM-encoded client cert (signed by the servers verification CA)
diff --git a/pom.xml b/pom.xml
index 11d7b76..33c65f4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,7 +82,7 @@
 3.0
-Copyright (C) ${project.inceptionYear} ${owner}
+Copyright (C) ${project.inceptionYear}
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -555,7 +555,7 @@
 ch.qos.logback
 logback-classic
-1.2.3
+1.2.9
 test
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightColumnarPartitionReader.java b/src/main/java/org/apache/arrow/flight/spark/FlightColumnarPartitionReader.java
index 1f019e1..1c27223 100644
--- a/src/main/java/org/apache/arrow/flight/spark/FlightColumnarPartitionReader.java
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightColumnarPartitionReader.java
@@ -27,48 +27,47 @@ import org.apache.spark.sql.vectorized.ColumnVector;
 
 public class FlightColumnarPartitionReader implements PartitionReader<ColumnarBatch> {
-    private final FlightClientFactory clientFactory;;
-    private final FlightClient client;
-    private final CredentialCallOption callOption;
-    private final FlightStream stream;
+  private final FlightClientFactory clientFactory;
+  private final FlightClient client;
+  private final FlightStream stream;
 
-    public FlightColumnarPartitionReader(FlightClientOptions clientOptions, FlightPartition partition) {
-        // TODO - Should we handle multiple locations?
-        clientFactory = new FlightClientFactory(partition.getEndpoint().get().getLocations().get(0), clientOptions);
-        client = clientFactory.apply();
-        callOption = clientFactory.getCallOption();
-        stream = client.getStream(partition.getEndpoint().get().getTicket(), callOption);
-    }
+  public FlightColumnarPartitionReader(FlightClientOptions clientOptions, FlightPartition partition) {
+    // TODO - Should we handle multiple locations?
+    clientFactory = new FlightClientFactory(partition.getEndpoint().get().getLocations().get(0), clientOptions);
+    client = clientFactory.apply();
+    CredentialCallOption callOption = clientFactory.getCallOption();
+    stream = client.getStream(partition.getEndpoint().get().getTicket(), callOption);
+  }
 
-    // This is written this way because the Spark interface iterates in a different way.
-    // E.g., .next() -> .get() vs. .hasNext() -> .next()
-    @Override
-    public boolean next() throws IOException {
-        try {
-            return stream.next();
-        } catch (RuntimeException e) {
-            throw new IOException(e);
-        }
+  // This is written this way because the Spark interface iterates in a different way.
+  // E.g., .next() -> .get() vs. .hasNext() -> .next()
+  @Override
+  public boolean next() throws IOException {
+    try {
+      return stream.next();
+    } catch (RuntimeException e) {
+      throw new IOException(e);
     }
+  }
 
-    @Override
-    public ColumnarBatch get() {
-        ColumnarBatch batch = new ColumnarBatch(
-            stream.getRoot().getFieldVectors()
-                .stream()
-                .map(FlightArrowColumnVector::new)
-                .toArray(ColumnVector[]::new)
-        );
-        batch.setNumRows(stream.getRoot().getRowCount());
-        return batch;
-    }
+  @Override
+  public ColumnarBatch get() {
+    ColumnarBatch batch = new ColumnarBatch(
+      stream.getRoot().getFieldVectors()
+        .stream()
+        .map(FlightArrowColumnVector::new)
+        .toArray(ColumnVector[]::new)
+    );
+    batch.setNumRows(stream.getRoot().getRowCount());
+    return batch;
+  }
 
-    @Override
-    public void close() throws IOException {
-        try {
-            AutoCloseables.close(stream, client, clientFactory);
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-    }
+  @Override
+  public void close() throws IOException {
+    try {
+      AutoCloseables.close(stream, client, clientFactory);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
 }

From 13fefc2895d0603ef4e549d06afce18b4404b497 Mon Sep 17 00:00:00 2001
From: prmoore77
Date: Wed, 5 Jul 2023 10:56:03 -0400
Subject: [PATCH 2/2] Minor fix

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 514e281..52c77c5 100644
--- a/README.md
+++ b/README.md
@@ -26,7 +26,7 @@ It currently lacks:
 You can choose to build the JAR locally, or use one of the archived JAR artifacts built from a [Github Actions workflow run](https://github.com/rymurr/flight-spark-source/actions/workflows/maven-build.yml).
 
 1. Take the built JAR file named: `flight-spark-source-1.0-SNAPSHOT-shaded.jar` - and copy it to the spark master node. For the sake of this example, we will use the `/tmp` directory
-2. Ensure you have a Flight server running and accessible to your Spark cluster. For example of a Python Flight RPC server - see [this link](https://arrow.apache.org/cookbook/py/flight.html#streaming-parquet-storage-service).
+2. Ensure you have a Flight server running and accessible to your Spark cluster. For an example of a Python Flight RPC server - see [this link](https://arrow.apache.org/cookbook/py/flight.html#streaming-parquet-storage-service).
    NOTE: you will have to add a `get_schema` end-point to that example server for it to work - with signature:
 ```def get_schema(self, context, descriptor) -> pyarrow.flight.SchemaResult```
    See this [link](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightClient.html#pyarrow.flight.FlightClient.get_schema) for more details.
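
For readers following the README changes above, a minimal sketch of the `get_schema` end-point the NOTE asks you to add to the Arrow cookbook's example server might look like the code below. This is illustrative only and not part of either patch; the `FlightServer` class name, the `grpc://0.0.0.0:8815` location, the one-Parquet-file-per-dataset layout under `self._repo`, and the assumption of a path-type flight descriptor are all carried over from that cookbook example rather than from these patches.

```python
import pathlib

import pyarrow.flight
import pyarrow.parquet


class FlightServer(pyarrow.flight.FlightServerBase):
    """Trimmed-down stand-in for the cookbook's streaming Parquet storage service."""

    def __init__(self, location="grpc://0.0.0.0:8815",
                 repo=pathlib.Path("./datasets"), **kwargs):
        super().__init__(location, **kwargs)
        self._repo = repo

    def get_schema(self, context, descriptor):
        # The Spark connector probes this end-point first to build the DataFrame
        # schema before it requests any record batches via do_get.
        # Assumes a path-type descriptor naming a Parquet file under self._repo.
        dataset = descriptor.path[0].decode("utf-8")
        schema = pyarrow.parquet.read_schema(self._repo / dataset)
        return pyarrow.flight.SchemaResult(schema)
```

With a server exposing such an end-point, the `spark.read.format('cdap.org.apache.arrow.flight.spark')` snippet in the README should be able to resolve a schema before it streams any data.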