Merge pull request #20 from prmoore77/master
Minor tweaks to README.md and some minor fixes recommended by linter
rymurr authored Jul 5, 2023
2 parents 6367ae1 + 13fefc2 commit bb2ed00
Showing 3 changed files with 53 additions and 49 deletions.
21 changes: 13 additions & 8 deletions README.md
@@ -1,7 +1,8 @@
Spark source for Flight RPC enabled endpoints
=========================================

[![Build Status](https://travis-ci.org/rymurr/flight-spark-source.svg?branch=master)](https://travis-ci.org/rymurr/flight-spark-source)
[<img src="https://img.shields.io/badge/GitHub-rymurr%2Fflight--spark--source-blue.svg?logo=Github">](https://github.com/rymurr/flight-spark-source)
[![Build Status](https://github.com/rymurr/flight-spark-source/actions/workflows/maven-build.yml/badge.svg)](https://github.com/rymurr/flight-spark-source/actions/workflows/maven-build.yml)

This uses the new [Source V2 Interface](https://databricks.com/session/apache-spark-data-source-v2) to connect to
[Apache Arrow Flight](https://www.dremio.com/understanding-apache-arrow-flight/) endpoints. It is a prototype of what is
@@ -21,12 +22,15 @@ It currently lacks:
* leverage of the transactional capabilities of the Spark Source V2 interface
* published benchmark tests

## How to use
You can choose to build the JAR locally, or use one of the archived JARs from a Github Actions workflow run.
## Usage
You can choose to build the JAR locally, or use one of the archived JAR artifacts built from a [Github Actions workflow run](https://github.com/rymurr/flight-spark-source/actions/workflows/maven-build.yml).

1. Take the JAR file named: `flight-spark-source-1.0-SNAPSHOT-shaded.jar` - and copy it to the spark master node. For the sake of this example, we will use the `/tmp` directory
2. Ensure you have a Flight server running and accessible to your Spark cluster. For example of a Python Flight RPC server - see [this link](https://arrow.apache.org/cookbook/py/flight.html#streaming-parquet-storage-service).
2. On the Spark master - start an interactive Python (or PySpark) session and run something like:
1. Take the built JAR file, named `flight-spark-source-1.0-SNAPSHOT-shaded.jar`, and copy it to the Spark master node. For the sake of this example, we will use the `/tmp` directory.
2. Ensure you have a Flight server running and accessible to your Spark cluster. For an example of a Python Flight RPC server - see [this link](https://arrow.apache.org/cookbook/py/flight.html#streaming-parquet-storage-service).
NOTE: you will have to add a `get_schema` end-point to that example server for it to work, with the signature:
```def get_schema(self, context, descriptor) -> pyarrow.flight.SchemaResult```
See this [link](https://arrow.apache.org/docs/python/generated/pyarrow.flight.FlightClient.html#pyarrow.flight.FlightClient.get_schema) for more details; a minimal sketch of such an end-point appears after the example below.
3. On the Spark master - start an interactive Python (or PySpark) session and run something like:
```
import os
from pyspark.sql import SparkSession
@@ -36,7 +40,8 @@ spark = (SparkSession
.builder
.appName("flight client")
.config("spark.jars", "/tmp/flight-spark-source-1.0-SNAPSHOT-shaded.jar")
.getOrCreate())```
.getOrCreate()
)```

# Read from a Flight RPC server using an arbitrary string containing either a command or path
# Note - this will call the Flight RPC Server's "get_schema" end-point (which must be present to use the connector)
@@ -48,7 +53,7 @@ df = (spark.read.format('cdap.org.apache.arrow.flight.spark')
# -------------------------------------------------------------------
# Uncomment the following 2 lines to use authentication if your Flight RPC server supports Basic Token auth
# .option('username', 'flight_user')
# .option('password', os.environ['FLIGHT_PASSWORD'] # Using an env var containing the password here for better security
# .option('password', os.environ['FLIGHT_PASSWORD']) # Using an env var containing the password here for better security
# -------------------------------------------------------------------
# Uncomment the following 2 lines to use MTLS client certificate verification if your Flight RPC server supports it (MTLS client certs MUST be version 3 or above!!!)
# .option('clientCertificate', mtls_cert_chain) # In this example, mtls_cert_chain is a str with contents of a PEM-encoded client cert (signed by the servers verification CA)
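For reference, here is a minimal sketch of such a `get_schema` end-point. This is illustration only, not code from this repository: the `ExampleFlightServer` class name and the fixed two-column schema are hypothetical, and a real server would derive the schema from the dataset identified by `descriptor.command` or `descriptor.path`.

```
# Hypothetical sketch of the extra get_schema end-point described in step 2.
import pyarrow
import pyarrow.flight as flight

class ExampleFlightServer(flight.FlightServerBase):
    def get_schema(self, context, descriptor):
        # A real implementation would look up the dataset named by
        # descriptor.command (or descriptor.path) and return its schema;
        # here we return a fixed schema purely for illustration.
        schema = pyarrow.schema([
            ("id", pyarrow.int64()),
            ("value", pyarrow.float64()),
        ])
        return flight.SchemaResult(schema)
```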
4 changes: 2 additions & 2 deletions pom.xml
@@ -82,7 +82,7 @@
<version>3.0</version>
<configuration>
<inlineHeader>
Copyright (C) ${project.inceptionYear} ${owner}
Copyright (C) ${project.inceptionYear}

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -555,7 +555,7 @@
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<version>1.2.9</version>
<scope>test</scope>
</dependency>
<dependency>
77 changes: 38 additions & 39 deletions FlightColumnarPartitionReader.java
@@ -27,48 +27,47 @@
import org.apache.spark.sql.vectorized.ColumnVector;

public class FlightColumnarPartitionReader implements PartitionReader<ColumnarBatch> {
  private final FlightClientFactory clientFactory;;
  private final FlightClient client;
  private final CredentialCallOption callOption;
  private final FlightStream stream;

  private final FlightClientFactory clientFactory;
  private final FlightClient client;
  private final FlightStream stream;

  public FlightColumnarPartitionReader(FlightClientOptions clientOptions, FlightPartition partition) {
    // TODO - Should we handle multiple locations?
    clientFactory = new FlightClientFactory(partition.getEndpoint().get().getLocations().get(0), clientOptions);
    client = clientFactory.apply();
    callOption = clientFactory.getCallOption();
    stream = client.getStream(partition.getEndpoint().get().getTicket(), callOption);
  }

  public FlightColumnarPartitionReader(FlightClientOptions clientOptions, FlightPartition partition) {
    // TODO - Should we handle multiple locations?
    clientFactory = new FlightClientFactory(partition.getEndpoint().get().getLocations().get(0), clientOptions);
    client = clientFactory.apply();
    CredentialCallOption callOption = clientFactory.getCallOption();
    stream = client.getStream(partition.getEndpoint().get().getTicket(), callOption);
  }

  // This is written this way because the Spark interface iterates in a different way.
  // E.g., .next() -> .get() vs. .hasNext() -> .next()
  @Override
  public boolean next() throws IOException {
    try {
      return stream.next();
    } catch (RuntimeException e) {
      throw new IOException(e);
    }
  }

  @Override
  public ColumnarBatch get() {
    ColumnarBatch batch = new ColumnarBatch(
        stream.getRoot().getFieldVectors()
            .stream()
            .map(FlightArrowColumnVector::new)
            .toArray(ColumnVector[]::new)
    );
    batch.setNumRows(stream.getRoot().getRowCount());
    return batch;
  }

  @Override
  public void close() throws IOException {
    try {
      AutoCloseables.close(stream, client, clientFactory);
    } catch (Exception e) {
      throw new IOException(e);
    }
  }
}
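The comment in `next()` above refers to the mismatch between Spark's `PartitionReader` contract (advance with `next()`, then read with `get()`) and the `hasNext()`/`next()` style of `java.util.Iterator` that `FlightStream` resembles. A toy Python sketch of the two contracts, purely for illustration and not connector code:

```
# Toy illustration of the two iteration contracts.
class SparkStyleReader:
    """Mimics Spark's PartitionReader: next() advances, get() reads."""
    def __init__(self, batches):
        self._batches = batches
        self._pos = -1

    def next(self):
        self._pos += 1
        return self._pos < len(self._batches)

    def get(self):
        return self._batches[self._pos]

reader = SparkStyleReader(["batch-1", "batch-2"])
while reader.next():       # advance first ...
    print(reader.get())    # ... then read the current batch

# Iterator-style (hasNext()/next()), as a Python for-loop expresses it:
for batch in ["batch-1", "batch-2"]:
    print(batch)
```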
