diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..8ba2661
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,27 @@
+#
+# Copyright (C) 2019 The flight-spark-source Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+root = true
+
+[*]
+end_of_line = lf
+insert_final_newline = true
+indent_size = 2
+indent_style = space
+
+[*.js]
+trim_trailing_whitespace = true
diff --git a/.github/workflows/maven-build.yml b/.github/workflows/maven-build.yml
new file mode 100644
index 0000000..359c433
--- /dev/null
+++ b/.github/workflows/maven-build.yml
@@ -0,0 +1,23 @@
+# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-maven
+
+name: Java CI with Maven
+
+on:
+  push:
+    branches: [ "master" ]
+  pull_request:
+    branches: [ "master" ]
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    steps:
+    - uses: actions/checkout@v3
+    - name: Set up JDK 11
+      uses: actions/setup-java@v3
+      with:
+        java-version: '11'
+        distribution: 'temurin'
+        cache: maven
+    - name: Build with Maven
+      run: mvn -B -V verify -Dmaven.javadoc.skip=true --file pom.xml
diff --git a/.gitignore b/.gitignore
index 25f17f2..2f15439 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@
 .checkstyle
 .classpath
 .idea/
+.vscode/
 .project
 .mvn/wrapper/maven-wrapper.jar
 .profiler
diff --git a/.mvn/extensions.xml b/.mvn/extensions.xml
new file mode 100644
index 0000000..05ed801
--- /dev/null
+++ b/.mvn/extensions.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Copyright (C) 2019 The flight-spark-source Authors
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<extensions>
+  <extension>
+    <groupId>fr.jcgay.maven</groupId>
+    <artifactId>maven-profiler</artifactId>
+    <version>2.6</version>
+  </extension>
+  <extension>
+    <groupId>fr.jcgay.maven</groupId>
+    <artifactId>maven-notifier</artifactId>
+    <version>1.10.1</version>
+  </extension>
+</extensions>
diff --git a/.mvn/wrapper/MavenWrapperDownloader.java b/.mvn/wrapper/MavenWrapperDownloader.java
new file mode 100644
index 0000000..eb53041
--- /dev/null
+++ b/.mvn/wrapper/MavenWrapperDownloader.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import java.net.*;
+import java.io.*;
+import java.nio.channels.*;
+import java.util.Properties;
+
+public class MavenWrapperDownloader {
+
+    private static final String WRAPPER_VERSION = "0.5.6";
+    /**
+     * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided.
+ */ + private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/" + + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar"; + + /** + * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to + * use instead of the default one. + */ + private static final String MAVEN_WRAPPER_PROPERTIES_PATH = + ".mvn/wrapper/maven-wrapper.properties"; + + /** + * Path where the maven-wrapper.jar will be saved to. + */ + private static final String MAVEN_WRAPPER_JAR_PATH = + ".mvn/wrapper/maven-wrapper.jar"; + + /** + * Name of the property which should be used to override the default download url for the wrapper. + */ + private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl"; + + public static void main(String args[]) { + System.out.println("- Downloader started"); + File baseDirectory = new File(args[0]); + System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath()); + + // If the maven-wrapper.properties exists, read it and check if it contains a custom + // wrapperUrl parameter. + File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH); + String url = DEFAULT_DOWNLOAD_URL; + if(mavenWrapperPropertyFile.exists()) { + FileInputStream mavenWrapperPropertyFileInputStream = null; + try { + mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile); + Properties mavenWrapperProperties = new Properties(); + mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream); + url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url); + } catch (IOException e) { + System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'"); + } finally { + try { + if(mavenWrapperPropertyFileInputStream != null) { + mavenWrapperPropertyFileInputStream.close(); + } + } catch (IOException e) { + // Ignore ... 
+ } + } + } + System.out.println("- Downloading from: " + url); + + File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH); + if(!outputFile.getParentFile().exists()) { + if(!outputFile.getParentFile().mkdirs()) { + System.out.println( + "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'"); + } + } + System.out.println("- Downloading to: " + outputFile.getAbsolutePath()); + try { + downloadFileFromURL(url, outputFile); + System.out.println("Done"); + System.exit(0); + } catch (Throwable e) { + System.out.println("- Error downloading"); + e.printStackTrace(); + System.exit(1); + } + } + + private static void downloadFileFromURL(String urlString, File destination) throws Exception { + if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) { + String username = System.getenv("MVNW_USERNAME"); + char[] password = System.getenv("MVNW_PASSWORD").toCharArray(); + Authenticator.setDefault(new Authenticator() { + @Override + protected PasswordAuthentication getPasswordAuthentication() { + return new PasswordAuthentication(username, password); + } + }); + } + URL website = new URL(urlString); + ReadableByteChannel rbc; + rbc = Channels.newChannel(website.openStream()); + FileOutputStream fos = new FileOutputStream(destination); + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + fos.close(); + rbc.close(); + } + +} diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties new file mode 100644 index 0000000..963a19b --- /dev/null +++ b/.mvn/wrapper/maven-wrapper.properties @@ -0,0 +1,19 @@ +# +# Copyright (C) 2019 The flight-spark-source Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip +wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar + diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..fad37da --- /dev/null +++ b/.travis.yml @@ -0,0 +1,8 @@ +dist: focal +language: java +jdk: openjdk11 +cache: + directories: + - $HOME/.m2 +install: mvn install -DskipTests=true -Dmaven.javadoc.skip=true -B -V +script: mvn test -B diff --git a/AUTHORS b/AUTHORS new file mode 100644 index 0000000..c8a28d3 --- /dev/null +++ b/AUTHORS @@ -0,0 +1,9 @@ +# This is the list of flight-spark-source's significant contributors. +# +# This does not necessarily list everyone who has contributed code, +# especially since many employees of one corporation may be contributing. +# To see the full list of contributors, see the revision history in +# source control. +Ryan Murray +Kyle Brooks +Doron Chen \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100755 index 0000000..0cfd14b --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. 
Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2017 - Dremio Corporation + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
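As a usage sketch for the connector this change introduces (see the README and `DefaultSource` below): the source registers under the short name `flight`, reads its endpoint from a `uri` option (defaulting to `grpc://localhost:47470`), and forwards the load path to the endpoint as the query. The host and query below are placeholders, and this assumes the shaded jar is on the Spark classpath so the provider is discoverable:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class FlightSourceExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("flight-source-example")
        .master("local[*]")
        .getOrCreate();

    // "flight" is the short name registered by DefaultSource; the load path
    // is passed through to the Flight endpoint as the command ("path" option).
    Dataset<Row> df = spark.read()
        .format("flight")
        .option("uri", "grpc://localhost:47470") // default used by DefaultSource
        .option("username", "anonymous")         // optional basic-auth credentials
        .option("password", "")
        .load("SELECT 1");                       // placeholder query

    df.printSchema();
    df.show();
    spark.stop();
  }
}
```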
diff --git a/NOTICE b/NOTICE
new file mode 100755
index 0000000..d2b0ae2
--- /dev/null
+++ b/NOTICE
@@ -0,0 +1,6 @@
+Dremio
+Copyright 2015-2017 Dremio Corporation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..7910ffd
--- /dev/null
+++ b/README.md
@@ -0,0 +1,22 @@
+Spark source for Flight-enabled endpoints
+=========================================
+
+[![Build Status](https://travis-ci.org/rymurr/flight-spark-source.svg?branch=master)](https://travis-ci.org/rymurr/flight-spark-source)
+
+This uses the new [Source V2 Interface](https://databricks.com/session/apache-spark-data-source-v2) to connect to
+[Apache Arrow Flight](https://www.dremio.com/understanding-apache-arrow-flight/) endpoints. It is a prototype of what is
+possible with Arrow Flight. The prototype has achieved a 50x speedup compared to a serial JDBC driver and scales with the
+number of Flight endpoints/Spark executors run in parallel.
+
+It currently supports:
+
+* Columnar batch reading
+* Reading from many Flight endpoints in parallel as Spark partitions
+* Filter and projection pushdown
+
+It currently lacks:
+
+* Support for all Spark/Arrow data types and filters
+* A write interface that uses `DoPut` to write Spark dataframes back to an Arrow Flight endpoint
+* Use of the transactional capabilities of the Spark Source V2 interface
+* A published benchmark test
diff --git a/mvnw b/mvnw
new file mode 100755
index 0000000..e128fd7
--- /dev/null
+++ b/mvnw
@@ -0,0 +1,307 @@
+#!/bin/sh
+#
+# Copyright (C) 2019 The flight-spark-source Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# ----------------------------------------------------------------------------
+# Maven Start Up Batch script
+#
+# Required ENV vars:
+# ------------------
+#   JAVA_HOME - location of a JDK home dir
+#
+# Optional ENV vars
+# -----------------
+#   M2_HOME - location of maven2's installed home dir
+#   MAVEN_OPTS - parameters passed to the Java VM when running Maven
+#     e.g. to debug Maven itself, use
+#       set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000
+#   MAVEN_SKIP_RC - flag to disable loading of mavenrc files
+# ----------------------------------------------------------------------------
+
+if [ -z "$MAVEN_SKIP_RC" ] ; then
+
+  if [ -f /etc/mavenrc ] ; then
+    . /etc/mavenrc
+  fi
+
+  if [ -f "$HOME/.mavenrc" ] ; then
+    . "$HOME/.mavenrc"
+  fi
+
+fi
+
+# OS specific support.  $var _must_ be set to either true or false.
+cygwin=false; +darwin=false; +mingw=false +case "`uname`" in + CYGWIN*) cygwin=true ;; + MINGW*) mingw=true;; + Darwin*) darwin=true + # Use /usr/libexec/java_home if available, otherwise fall back to /Library/Java/Home + # See https://developer.apple.com/library/mac/qa/qa1170/_index.html + if [ -z "$JAVA_HOME" ]; then + if [ -x "/usr/libexec/java_home" ]; then + export JAVA_HOME="`/usr/libexec/java_home`" + else + export JAVA_HOME="/Library/Java/Home" + fi + fi + ;; +esac + +if [ -z "$JAVA_HOME" ] ; then + if [ -r /etc/gentoo-release ] ; then + JAVA_HOME=`java-config --jre-home` + fi +fi + +if [ -z "$M2_HOME" ] ; then + ## resolve links - $0 may be a link to maven's home + PRG="$0" + + # need this for relative symlinks + while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG="`dirname "$PRG"`/$link" + fi + done + + saveddir=`pwd` + + M2_HOME=`dirname "$PRG"`/.. + + # make it fully qualified + M2_HOME=`cd "$M2_HOME" && pwd` + + cd "$saveddir" + # echo Using m2 at $M2_HOME +fi + +# For Cygwin, ensure paths are in UNIX format before anything is touched +if $cygwin ; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --unix "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --unix "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --unix "$CLASSPATH"` +fi + +# For Mingw, ensure paths are in UNIX format before anything is touched +if $mingw ; then + [ -n "$M2_HOME" ] && + M2_HOME="`(cd "$M2_HOME"; pwd)`" + [ -n "$JAVA_HOME" ] && + JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" +fi + +if [ -z "$JAVA_HOME" ]; then + javaExecutable="`which javac`" + if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then + # readlink(1) is not available as standard on Solaris 10. + readLink=`which readlink` + if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then + if $darwin ; then + javaHome="`dirname \"$javaExecutable\"`" + javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" + else + javaExecutable="`readlink -f \"$javaExecutable\"`" + fi + javaHome="`dirname \"$javaExecutable\"`" + javaHome=`expr "$javaHome" : '\(.*\)/bin'` + JAVA_HOME="$javaHome" + export JAVA_HOME + fi + fi +fi + +if [ -z "$JAVACMD" ] ; then + if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + else + JAVACMD="`which java`" + fi +fi + +if [ ! -x "$JAVACMD" ] ; then + echo "Error: JAVA_HOME is not defined correctly." >&2 + echo " We cannot execute $JAVACMD" >&2 + exit 1 +fi + +if [ -z "$JAVA_HOME" ] ; then + echo "Warning: JAVA_HOME environment variable is not set." 
+fi + +CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher + +# traverses directory structure from process work directory to filesystem root +# first directory with .mvn subdirectory is considered project base directory +find_maven_basedir() { + + if [ -z "$1" ] + then + echo "Path not specified to find_maven_basedir" + return 1 + fi + + basedir="$1" + wdir="$1" + while [ "$wdir" != '/' ] ; do + if [ -d "$wdir"/.mvn ] ; then + basedir=$wdir + break + fi + # workaround for JBEAP-8937 (on Solaris 10/Sparc) + if [ -d "${wdir}" ]; then + wdir=`cd "$wdir/.."; pwd` + fi + # end of workaround + done + echo "${basedir}" +} + +# concatenates all lines of a file +concat_lines() { + if [ -f "$1" ]; then + echo "$(tr -s '\n' ' ' < "$1")" + fi +} + +BASE_DIR=`find_maven_basedir "$(pwd)"` +if [ -z "$BASE_DIR" ]; then + exit 1; +fi + +########################################################################################## +# Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +# This allows using the maven wrapper in projects that prohibit checking in binary data. +########################################################################################## +if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found .mvn/wrapper/maven-wrapper.jar" + fi +else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." + fi + if [ -n "$MVNW_REPOURL" ]; then + jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + else + jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + fi + while IFS="=" read key value; do + case "$key" in (wrapperUrl) jarUrl="$value"; break ;; + esac + done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" + if [ "$MVNW_VERBOSE" = true ]; then + echo "Downloading from: $jarUrl" + fi + wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" + if $cygwin; then + wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` + fi + + if command -v wget > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found wget ... using wget" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + wget "$jarUrl" -O "$wrapperJarPath" + else + wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" + fi + elif command -v curl > /dev/null; then + if [ "$MVNW_VERBOSE" = true ]; then + echo "Found curl ... using curl" + fi + if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then + curl -o "$wrapperJarPath" "$jarUrl" -f + else + curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f + fi + + else + if [ "$MVNW_VERBOSE" = true ]; then + echo "Falling back to using Java to download" + fi + javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" + # For Cygwin, switch paths to Windows format before running javac + if $cygwin; then + javaClass=`cygpath --path --windows "$javaClass"` + fi + if [ -e "$javaClass" ]; then + if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Compiling MavenWrapperDownloader.java ..." + fi + # Compiling the Java class + ("$JAVA_HOME/bin/javac" "$javaClass") + fi + if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then + # Running the downloader + if [ "$MVNW_VERBOSE" = true ]; then + echo " - Running MavenWrapperDownloader.java ..." 
+ fi + ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") + fi + fi + fi +fi +########################################################################################## +# End of extension +########################################################################################## + +export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} +if [ "$MVNW_VERBOSE" = true ]; then + echo $MAVEN_PROJECTBASEDIR +fi +MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" + +# For Cygwin, switch paths to Windows format before running java +if $cygwin; then + [ -n "$M2_HOME" ] && + M2_HOME=`cygpath --path --windows "$M2_HOME"` + [ -n "$JAVA_HOME" ] && + JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` + [ -n "$CLASSPATH" ] && + CLASSPATH=`cygpath --path --windows "$CLASSPATH"` + [ -n "$MAVEN_PROJECTBASEDIR" ] && + MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` +fi + +# Provide a "standardized" way to retrieve the CLI args that will +# work with both Windows and non-Windows executions. +MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" +export MAVEN_CMD_LINE_ARGS + +WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +exec "$JAVACMD" \ + $MAVEN_OPTS \ + -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ + "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ + ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" diff --git a/mvnw.cmd b/mvnw.cmd new file mode 100644 index 0000000..7312986 --- /dev/null +++ b/mvnw.cmd @@ -0,0 +1,179 @@ +@REM +@REM Copyright (C) 2019 The flight-spark-source Authors +@REM +@REM Licensed under the Apache License, Version 2.0 (the "License"); +@REM you may not use this file except in compliance with the License. +@REM You may obtain a copy of the License at +@REM +@REM http://www.apache.org/licenses/LICENSE-2.0 +@REM +@REM Unless required by applicable law or agreed to in writing, software +@REM distributed under the License is distributed on an "AS IS" BASIS, +@REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@REM See the License for the specific language governing permissions and +@REM limitations under the License. +@REM + +@REM ---------------------------------------------------------------------------- +@REM Maven Start Up Batch script +@REM +@REM Required ENV vars: +@REM JAVA_HOME - location of a JDK home dir +@REM +@REM Optional ENV vars +@REM M2_HOME - location of maven2's installed home dir +@REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands +@REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending +@REM MAVEN_OPTS - parameters passed to the Java VM when running Maven +@REM e.g. 
to debug Maven itself, use +@REM set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 +@REM MAVEN_SKIP_RC - flag to disable loading of mavenrc files +@REM ---------------------------------------------------------------------------- + +@REM Begin all REM lines with '@' in case MAVEN_BATCH_ECHO is 'on' +@echo off +@REM set title of command window +title %0 +@REM enable echoing by setting MAVEN_BATCH_ECHO to 'on' +@if "%MAVEN_BATCH_ECHO%" == "on" echo %MAVEN_BATCH_ECHO% + +@REM set %HOME% to equivalent of $HOME +if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") + +@REM Execute a user defined script before this one +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre +@REM check for pre script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" +if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" +:skipRcPre + +@setlocal + +set ERROR_CODE=0 + +@REM To isolate internal variables from possible post scripts, we use another setlocal +@setlocal + +@REM ==== START VALIDATION ==== +if not "%JAVA_HOME%" == "" goto OkJHome + +echo. +echo Error: JAVA_HOME not found in your environment. >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +:OkJHome +if exist "%JAVA_HOME%\bin\java.exe" goto init + +echo. +echo Error: JAVA_HOME is set to an invalid directory. >&2 +echo JAVA_HOME = "%JAVA_HOME%" >&2 +echo Please set the JAVA_HOME variable in your environment to match the >&2 +echo location of your Java installation. >&2 +echo. +goto error + +@REM ==== END VALIDATION ==== + +:init + +@REM Find the project base dir, i.e. the directory that contains the folder ".mvn". +@REM Fallback to current working directory if not found. + +set MAVEN_PROJECTBASEDIR=%MAVEN_BASEDIR% +IF NOT "%MAVEN_PROJECTBASEDIR%"=="" goto endDetectBaseDir + +set EXEC_DIR=%CD% +set WDIR=%EXEC_DIR% +:findBaseDir +IF EXIST "%WDIR%"\.mvn goto baseDirFound +cd .. +IF "%WDIR%"=="%CD%" goto baseDirNotFound +set WDIR=%CD% +goto findBaseDir + +:baseDirFound +set MAVEN_PROJECTBASEDIR=%WDIR% +cd "%EXEC_DIR%" +goto endDetectBaseDir + +:baseDirNotFound +set MAVEN_PROJECTBASEDIR=%EXEC_DIR% +cd "%EXEC_DIR%" + +:endDetectBaseDir + +IF NOT EXIST "%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config" goto endReadAdditionalConfig + +@setlocal EnableExtensions EnableDelayedExpansion +for /F "usebackq delims=" %%a in ("%MAVEN_PROJECTBASEDIR%\.mvn\jvm.config") do set JVM_CONFIG_MAVEN_PROPS=!JVM_CONFIG_MAVEN_PROPS! %%a +@endlocal & set JVM_CONFIG_MAVEN_PROPS=%JVM_CONFIG_MAVEN_PROPS% + +:endReadAdditionalConfig + +SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" +set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" +set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain + +set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + +FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( + IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B +) + +@REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central +@REM This allows using the maven wrapper in projects that prohibit checking in binary data. 
+if exist %WRAPPER_JAR% ( + if "%MVNW_VERBOSE%" == "true" ( + echo Found %WRAPPER_JAR% + ) +) else ( + if not "%MVNW_REPOURL%" == "" ( + SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" + ) + if "%MVNW_VERBOSE%" == "true" ( + echo Couldn't find %WRAPPER_JAR%, downloading it ... + echo Downloading from: %DOWNLOAD_URL% + ) + + powershell -Command "&{"^ + "$webclient = new-object System.Net.WebClient;"^ + "if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ + "$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ + "}"^ + "[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^ + "}" + if "%MVNW_VERBOSE%" == "true" ( + echo Finished downloading %WRAPPER_JAR% + ) +) +@REM End of extension + +@REM Provide a "standardized" way to retrieve the CLI args that will +@REM work with both Windows and non-Windows executions. +set MAVEN_CMD_LINE_ARGS=%* + +%MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* +if ERRORLEVEL 1 goto error +goto end + +:error +set ERROR_CODE=1 + +:end +@endlocal & set ERROR_CODE=%ERROR_CODE% + +if not "%MAVEN_SKIP_RC%" == "" goto skipRcPost +@REM check for post script, once with legacy .bat ending and once with .cmd ending +if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" +if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" +:skipRcPost + +@REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' +if "%MAVEN_BATCH_PAUSE%" == "on" pause + +if "%MAVEN_TERMINATE_CMD%" == "on" exit %ERROR_CODE% + +exit /B %ERROR_CODE% diff --git a/pom.xml b/pom.xml index 0a8f136..7ac4627 100644 --- a/pom.xml +++ b/pom.xml @@ -1,471 +1,559 @@ + - 4.0.0 + 4.0.0 + + org.apache.arrow.flight.spark + flight-spark-source + 1.0-SNAPSHOT - com.dremio - flight-spark-source - 1.0-SNAPSHOT + + + 2.12.14 + 7.0.0 + 3.2.1 + 1.7.25 + UTF-8 + UTF-8 + + + + + kr.motd.maven + os-maven-plugin + 1.7.0 + + - - 3.1.10-201904162146020182-adf690d - 0.14.0-SNAPSHOT - 2.3.3 - 1.7.25 - - - - - src/main/resources - true - - + + + src/main/resources + true + + - - - org.apache.maven.plugins - maven-checkstyle-plugin - - src/main/checkstyle/checkstyle-config.xml - src/main/checkstyle/checkstyle-suppressions.xml - - - - com.mycila - license-maven-plugin - 3.0 - - - Copyright (C) ${project.inceptionYear} ${owner} + + + org.apache.maven.plugins + maven-checkstyle-plugin + 3.1.0 + + ${project.basedir}/src/main/checkstyle/checkstyle-config.xml + ${project.basedir}/src/main/checkstyle/checkstyle-suppressions.xml + + + + com.mycila + license-maven-plugin + 3.0 + + +Copyright (C) ${project.inceptionYear} ${owner} - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 +http://www.apache.org/licenses/LICENSE-2.0 - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - - Ryan Murray - 2019 - - - 2019 - - true - false +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + + The ${project.artifactId} Authors + 2019 + + + 2019 + + true + false - - src/** - * - **/.mvn/** - - - - **/*~ - **/#*# - **/.#* - **/%*% - **/._* - **/.repository/** - **/CVS - **/CVS/** - **/.cvsignore - **/RCS - **/RCS/** - **/SCCS - **/SCCS/** - **/vssver.scc - **/.svn - **/.svn/** - **/.arch-ids - **/.arch-ids/** - **/.bzr - **/.bzr/** - **/.MySCMServerInfo - **/.DS_Store - **/.metadata - **/.metadata/** - **/.hg - **/.hg/** - **/.hgignore - **/.git - **/.git/** - **/.gitignore - **/.gitmodules - **/BitKeeper - **/BitKeeper/** - **/ChangeSet - **/ChangeSet/** - **/_darcs - **/_darcs/** - **/.darcsrepo - **/.darcsrepo/** - **/-darcs-backup* - **/.darcs-temp-mail - - **/test-output/** - **/release.properties - **/dependency-reduced-pom.xml - **/release-pom.xml - **/pom.xml.releaseBackup - **/cobertura.ser - **/.clover/** - **/.classpath - **/.project - **/.settings/** - **/*.iml - **/*.ipr - **/*.iws - .idea/** - **/nb-configuration.xml - **/MANIFEST.MF - **/*.jpg - **/*.png - **/*.gif - **/*.ico - **/*.bmp - **/*.tiff - **/*.tif - **/*.cr2 - **/*.xcf - **/*.class - **/*.exe - **/*.dll - **/*.so - **/*.md5 - **/*.sha1 - **/*.jar - **/*.zip - **/*.rar - **/*.tar - **/*.tar.gz - **/*.tar.bz2 - **/*.gz - **/*.xls - **/META-INF/services/** - **/*.md - **/*.xls - **/*.doc - **/*.odt - **/*.ods - **/*.pdf - **/.travis.yml - **/*.swf - **/*.json - - **/*.eot - **/*.ttf - **/*.woff - **/*.xlsx - **/*.docx - **/*.ppt - **/*.pptx - **/*.patch - + + src/** + * + **/.mvn/** + + + + **/*~ + **/#*# + **/.#* + **/%*% + **/._* + **/.repository/** + **/CVS + **/CVS/** + **/.cvsignore + **/RCS + **/RCS/** + **/SCCS + **/SCCS/** + **/vssver.scc + **/.svn + **/.svn/** + **/.arch-ids + **/.arch-ids/** + **/.bzr + **/.bzr/** + **/.MySCMServerInfo + **/.DS_Store + **/.metadata + **/.metadata/** + **/.hg + **/.hg/** + **/.hgignore + **/.git + **/.git/** + **/.gitignore + **/.gitmodules + **/BitKeeper + **/BitKeeper/** + **/ChangeSet + **/ChangeSet/** + **/_darcs + **/_darcs/** + **/.darcsrepo + **/.darcsrepo/** + **/-darcs-backup* + **/.darcs-temp-mail + + **/test-output/** + **/release.properties + **/dependency-reduced-pom.xml + **/release-pom.xml + **/pom.xml.releaseBackup + **/cobertura.ser + **/.clover/** + **/.classpath + **/.project + **/.settings/** + **/*.iml + **/*.ipr + **/*.iws + .idea/** + **/nb-configuration.xml + **/MANIFEST.MF + **/*.jpg + **/*.png + **/*.gif + **/*.ico + **/*.bmp + **/*.tiff + **/*.tif + **/*.cr2 + **/*.xcf + **/*.class + **/*.exe + **/*.dll + **/*.so + **/*.md5 + **/*.sha1 + **/*.jar + **/*.zip + **/*.rar + **/*.tar + **/*.tar.gz + **/*.tar.bz2 + **/*.gz + **/*.xls + **/META-INF/services/** + **/*.md + **/*.xls + **/*.doc + **/*.odt + **/*.ods + **/*.pdf + **/.travis.yml + 
**/*.swf + **/*.json + + **/*.eot + **/*.ttf + **/*.woff + **/*.xlsx + **/*.docx + **/*.ppt + **/*.pptx + **/*.patch + - - **/*.log - **/*.txt - **/*.csv - **/*.tsv - **/*.parquet - **/*.jks - **/*.nonformat - **/*.gzip - **/*.k - **/*.q - **/*.dat + + **/*.log + **/*.txt + **/*.csv + **/*.tsv + **/*.parquet + **/*.jks + **/*.nonformat + **/*.gzip + **/*.k + **/*.q + **/*.dat - - **/Jenkinsfile - **/LICENSE - **/NOTICE - **/postinstall - **/.babelrc - **/.checkstyle - **/.eslintcache - **/.eslintignore - **/.eslintrc - **/git.properties - **/pom.xml.versionsBackup - **/q - **/c.java + + **/Jenkinsfile + **/LICENSE + **/NOTICE + **/AUTHORS + **/postinstall + **/.babelrc + **/.checkstyle + **/.eslintcache + **/.eslintignore + **/.eslintrc + **/git.properties + **/pom.xml.versionsBackup + **/q + **/c.java - - **/node_modules/** - **/.idea/** - **/db/** - - - SLASHSTAR_STYLE - DOUBLEDASHES_STYLE - DOUBLESLASH_STYLE - DOUBLESLASH_STYLE - DOUBLESLASH_STYLE - SLASHSTAR_STYLE - SLASHSTAR_STYLE - SLASHSTAR_STYLE - SLASHSTAR_STYLE - SCRIPT_STYLE - SCRIPT_STYLE - SCRIPT_STYLE - DOUBLEDASHES_STYLE - SCRIPT_STYLE - SLASHSTAR_STYLE - SCRIPT_STYLE - SCRIPT_STYLE - SCRIPT_STYLE - XML_STYLE - SCRIPT_STYLE - - - - - default-cli - - format - - - - verify-license-headers - verify - - check - - - - - - maven-enforcer-plugin - - - avoid_bad_dependencies - verify - - enforce - - - - - - commons-logging - javax.servlet:servlet-api - org.mortbay.jetty:servlet-api - org.mortbay.jetty:servlet-api-2.5 - log4j:log4j - - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - 8 - 8 - - - - - - - - - - - - - - - - - - - - - - - - - org.apache.spark - spark-core_2.11 - ${spark.version} - - - org.slf4j - slf4j-log4j12 - - - commons-logging - commons-logging - - - log4j - log4j - - - javax.servlet - servlet-api - - - org.codehaus.jackson - jackson-mapper-asl - - - org.codehaus.jackson - jackson-core-asl - - - com.fasterxml.jackson.core - jackson-databind - - - - - org.apache.spark - spark-sql_2.11 - ${spark.version} - - - org.slf4j - slf4j-log4j12 - - - commons-logging - commons-logging - - - log4j - log4j - - - javax.servlet - servlet-api - - - org.codehaus.jackson - jackson-mapper-asl - - - org.codehaus.jackson - jackson-core-asl - - - com.fasterxml.jackson.core - jackson-databind - - - org.apache.arrow - arrow-vector - - - - - org.apache.arrow - arrow-flight - ${arrow.version} - shaded - - - com.dremio.sabot - dremio-sabot-flight - ${dremio.version} - test - - - com.dremio.sabot - dremio-sabot-kernel - ${dremio.version} - - - org.slf4j - slf4j-log4j12 - - - commons-logging - commons-logging - - - log4j - log4j - - - javax.servlet - servlet-api - - - - - - - - - - - - - org.slf4j - jul-to-slf4j - ${dep.slf4j.version} - test - + + **/node_modules/** + **/.idea/** + **/db/** + **/*.ipynb + + + SLASHSTAR_STYLE + DOUBLEDASHES_STYLE + DOUBLESLASH_STYLE + DOUBLESLASH_STYLE + DOUBLESLASH_STYLE + SLASHSTAR_STYLE + SLASHSTAR_STYLE + SLASHSTAR_STYLE + SLASHSTAR_STYLE + SCRIPT_STYLE + SCRIPT_STYLE + SCRIPT_STYLE + DOUBLEDASHES_STYLE + SCRIPT_STYLE + SLASHSTAR_STYLE + SCRIPT_STYLE + SCRIPT_STYLE + SCRIPT_STYLE + XML_STYLE + SCRIPT_STYLE + + + + + default-cli + + format + + + + verify-license-headers + verify + + check + + + + + + maven-enforcer-plugin + 1.4.1 + + + avoid_bad_dependencies + verify + + enforce + + + + + + commons-logging + javax.servlet:servlet-api + org.mortbay.jetty:servlet-api + org.mortbay.jetty:servlet-api-2.5 + log4j:log4j + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 
11 + 11 + + + + net.alchim31.maven + scala-maven-plugin + 4.6.1 + + false + + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.3.0 + + + package + + shade + + + + + org.apache.arrow:flight-core + org.apache.arrow:flight-grpc + org.apache.arrow:arrow-vector + org.apache.arrow:arrow-format + org.apache.arrow:arrow-memory-core + org.apache.arrow:arrow-memory-netty + com.google.flatbuffers:flatbuffers-java + io.grpc:* + io.netty:* + io.opencensus:* + com.google.code.gson:gson + com.google.code.findbugs:jsr305 + com.google.code.errorprone:error_prone_annotations + com.google.api.grpc:proto-google-common-protos + com.google.protobuf:protobuf-java + com.google.guava:guava + com.google.guava:failureaccess + io.perfmark:perfmark-api + + + io.netty:netty-transport-native-unix-common + io.netty:netty-transport-native-epoll + + + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + com.google.protobuf + cdap.com.google.protobuf + + + org.apache.arrow + cdap.org.apache.arrow + + + io.grpc + cdap.io.grpc + + + io.netty + cdap.io.netty + + + com.google + cdap.com.google + + + + META-INF.native.libnetty_ + META-INF.native.libcdap_netty_ + + + META-INF.native.netty_ + META-INF.native.cdap_netty_ + + + + + + true + shaded + + + + + + + + + org.apache.spark + spark-core_2.12 + ${spark.version} + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + log4j + log4j + + + org.apache.arrow + arrow-format + + + org.apache.arrow + arrow-vector + + + + + org.apache.spark + spark-sql_2.12 + ${spark.version} + + + org.slf4j + slf4j-log4j12 + + + commons-logging + commons-logging + + + log4j + log4j + + + javax.servlet + servlet-api + + + org.apache.arrow + arrow-format + + + org.apache.arrow + arrow-vector + + + + + + com.fasterxml.jackson.core + jackson-core + 2.12.6 + + + com.fasterxml.jackson.core + jackson-databind + 2.12.6.1 + + + org.apache.arrow + flight-core + ${arrow.version} + + + org.apache.arrow + flight-grpc + ${arrow.version} + + + org.scala-lang + scala-library + ${scala.version} + - - org.slf4j - jcl-over-slf4j - ${dep.slf4j.version} - test - + + org.slf4j + jul-to-slf4j + ${dep.slf4j.version} + test + + + org.apache.arrow + flight-core + ${arrow.version} + tests + test + - - org.slf4j - log4j-over-slf4j - ${dep.slf4j.version} - test - - - ch.qos.logback - logback-classic - 1.2.3 - test - - - de.huxhorn.lilith - de.huxhorn.lilith.logback.appender.multiplex-classic - 8.2.0 - test - - - junit - junit - 4.11 - test - - - com.dremio.sabot - dremio-sabot-kernel - ${dremio.version} - tests - test - - - com.dremio - dremio-common - ${dremio.version} - tests - test - + + org.slf4j + jcl-over-slf4j + ${dep.slf4j.version} + test + - + + org.slf4j + log4j-over-slf4j + ${dep.slf4j.version} + test + + + ch.qos.logback + logback-classic + 1.2.3 + test + + + de.huxhorn.lilith + de.huxhorn.lilith.logback.appender.multiplex-classic + 8.2.0 + test + + + junit + junit + 4.13.1 + test + + - \ No newline at end of file + diff --git a/src/main/java/com/dremio/spark/DefaultSource.java b/src/main/java/com/dremio/spark/DefaultSource.java deleted file mode 100644 index 62770b2..0000000 --- a/src/main/java/com/dremio/spark/DefaultSource.java +++ /dev/null @@ -1,14 +0,0 @@ -package com.dremio.spark; - -import org.apache.arrow.memory.RootAllocator; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import 
org.apache.spark.sql.sources.v2.DataSourceV2; -import org.apache.spark.sql.sources.v2.ReadSupport; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; - -public class DefaultSource implements DataSourceV2, ReadSupport { - private final RootAllocator rootAllocator = new RootAllocator(); - public DataSourceReader createReader(DataSourceOptions dataSourceOptions) { - return new DremioDataSourceReader(dataSourceOptions, rootAllocator.newChildAllocator(dataSourceOptions.toString(), 0, rootAllocator.getLimit())); - } -} diff --git a/src/main/java/com/dremio/spark/DremioDataSourceReader.java b/src/main/java/com/dremio/spark/DremioDataSourceReader.java deleted file mode 100644 index 16b1c75..0000000 --- a/src/main/java/com/dremio/spark/DremioDataSourceReader.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.dremio.spark; - -import org.apache.arrow.flight.FlightClient; -import org.apache.arrow.flight.FlightDescriptor; -import org.apache.arrow.flight.FlightInfo; -import org.apache.arrow.flight.FlightStream; -import org.apache.arrow.flight.Location; -import org.apache.arrow.memory.BufferAllocator; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.sources.v2.DataSourceOptions; -import org.apache.spark.sql.sources.v2.reader.DataReaderFactory; -import org.apache.spark.sql.sources.v2.reader.DataSourceReader; -import org.apache.spark.sql.types.StructType; - -import java.util.List; - -public class DremioDataSourceReader implements DataSourceReader { - private DataSourceOptions dataSourceOptions; - - public DremioDataSourceReader(DataSourceOptions dataSourceOptions, BufferAllocator allocator) { - this.dataSourceOptions = dataSourceOptions; - FlightClient c = new FlightClient(allocator, new Location(dataSourceOptions.get("host").orElse("localhost"), dataSourceOptions.getInt("port", 43430))); - c.authenticateBasic(dataSourceOptions.get("username").orElse("anonymous"), dataSourceOptions.get("password").orElse("")); - FlightInfo info = c.getInfo(FlightDescriptor.path("sys", "options")); - -// FlightStream s = c.getStream(info.getEndpoints().get(0).getTicket()); - } - - public StructType readSchema() { - return null; - } - - public List> createDataReaderFactories() { - return null; - } -} diff --git a/src/main/java/com/dremio/spark/FlightSparkContext.java b/src/main/java/com/dremio/spark/FlightSparkContext.java deleted file mode 100644 index c51e094..0000000 --- a/src/main/java/com/dremio/spark/FlightSparkContext.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.dremio.spark; - -import org.apache.spark.SparkConf; -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.sql.DataFrameReader; -import org.apache.spark.sql.SQLContext; - -public class FlightSparkContext { - - private final SQLContext sqlContext; - private SparkConf conf; - private final DataFrameReader reader; - - private FlightSparkContext(SparkContext sc, SparkConf conf) { - sqlContext = SQLContext.getOrCreate(sc); - this.conf = conf; - reader = sqlContext.read().format("com.dremio.spark"); - } - - public static FlightSparkContext flightContext(JavaSparkContext sc) { - return new FlightSparkContext(sc.sc(), sc.getConf()); - } - - public void read(String s) { - reader.option("port", Integer.parseInt(conf.get("spark.flight.endpoint.port"))) - .option("host", conf.get("spark.flight.endpoint.host")) - .option("username", conf.get("spark.flight.username")) - .option("password", conf.get("spark.flight.password")) - .load(s); - } -} diff --git 
a/src/main/java/org/apache/arrow/flight/spark/DefaultSource.java b/src/main/java/org/apache/arrow/flight/spark/DefaultSource.java
new file mode 100644
index 0000000..4e3ace9
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/DefaultSource.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.spark;
+
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.flight.Location;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.sources.DataSourceRegister;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+
+public class DefaultSource implements TableProvider, DataSourceRegister {
+  private SparkSession spark;
+
+  private SparkSession getSparkSession() {
+    if (spark == null) {
+      spark = SparkSession.getActiveSession().get();
+    }
+    return spark;
+  }
+
+  private FlightTable makeTable(CaseInsensitiveStringMap options) {
+    String uri = options.getOrDefault("uri", "grpc://localhost:47470");
+    Location location;
+    try {
+      location = new Location(uri);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+
+    String sql = options.getOrDefault("path", "");
+    String username = options.getOrDefault("username", "");
+    String password = options.getOrDefault("password", "");
+    String trustedCertificates = options.getOrDefault("trustedCertificates", "");
+    String clientCertificate = options.getOrDefault("clientCertificate", "");
+    String clientKey = options.getOrDefault("clientKey", "");
+    String token = options.getOrDefault("token", "");
+    List<FlightClientMiddlewareFactory> middleware = new ArrayList<>();
+    if (!token.isEmpty()) {
+      middleware.add(new TokenClientMiddlewareFactory(token));
+    }
+
+
+    Broadcast<FlightClientOptions> clientOptions = JavaSparkContext.fromSparkContext(getSparkSession().sparkContext()).broadcast(
+      new FlightClientOptions(username, password, trustedCertificates, clientCertificate, clientKey, middleware)
+    );
+
+    return new FlightTable(
+      String.format("%s Location %s Command %s", shortName(), location.getUri().toString(), sql),
+      location,
+      sql,
+      clientOptions
+    );
+  }
+
+  @Override
+  public StructType inferSchema(CaseInsensitiveStringMap options) {
+    return makeTable(options).schema();
+  }
+
+  @Override
+  public String shortName() {
+    return "flight";
+  }
+
+  @Override
+  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> options) {
+    return makeTable(new CaseInsensitiveStringMap(options));
+  }
+}
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightArrowColumnVector.java
b/src/main/java/org/apache/arrow/flight/spark/FlightArrowColumnVector.java new file mode 100644 index 0000000..0d2af58 --- /dev/null +++ b/src/main/java/org/apache/arrow/flight/spark/FlightArrowColumnVector.java @@ -0,0 +1,575 @@ +/* + * Copyright (C) 2019 The flight-spark-source Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.spark; + +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DateMilliVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.SmallIntVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.TinyIntVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.holders.NullableVarCharHolder; +import org.apache.arrow.memory.ArrowBuf; +import org.apache.spark.sql.execution.arrow.FlightArrowUtils; +import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarArray; +import org.apache.spark.sql.vectorized.ColumnarMap; +import org.apache.spark.unsafe.types.UTF8String; + + +/** + * A column vector backed by Apache Arrow. Currently calendar interval type and map type are not + * supported. 
This is a copy of ArrowColumnVector with added support for DateMilli and TimestampMilli + */ +public final class FlightArrowColumnVector extends ColumnVector { + + private final ArrowVectorAccessor accessor; + private FlightArrowColumnVector[] childColumns; + + @Override + public boolean hasNull() { + return accessor.getNullCount() > 0; + } + + @Override + public int numNulls() { + return accessor.getNullCount(); + } + + @Override + public void close() { + if (childColumns != null) { + for (int i = 0; i < childColumns.length; i++) { + childColumns[i].close(); + childColumns[i] = null; + } + childColumns = null; + } + accessor.close(); + } + + @Override + public boolean isNullAt(int rowId) { + return accessor.isNullAt(rowId); + } + + @Override + public boolean getBoolean(int rowId) { + return accessor.getBoolean(rowId); + } + + @Override + public byte getByte(int rowId) { + return accessor.getByte(rowId); + } + + @Override + public short getShort(int rowId) { + return accessor.getShort(rowId); + } + + @Override + public int getInt(int rowId) { + return accessor.getInt(rowId); + } + + @Override + public long getLong(int rowId) { + return accessor.getLong(rowId); + } + + @Override + public float getFloat(int rowId) { + return accessor.getFloat(rowId); + } + + @Override + public double getDouble(int rowId) { + return accessor.getDouble(rowId); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + if (isNullAt(rowId)) { + return null; + } + return accessor.getDecimal(rowId, precision, scale); + } + + @Override + public UTF8String getUTF8String(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor.getUTF8String(rowId); + } + + @Override + public byte[] getBinary(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor.getBinary(rowId); + } + + @Override + public ColumnarArray getArray(int rowId) { + if (isNullAt(rowId)) { + return null; + } + return accessor.getArray(rowId); + } + + @Override + public ColumnarMap getMap(int rowId) { + throw new UnsupportedOperationException(); + } + + @Override + public FlightArrowColumnVector getChild(int ordinal) { + return childColumns[ordinal]; + } + + public FlightArrowColumnVector(ValueVector vector) { + super(FlightArrowUtils.fromArrowField(vector.getField())); + + if (vector instanceof BitVector) { + accessor = new BooleanAccessor((BitVector) vector); + } else if (vector instanceof TinyIntVector) { + accessor = new ByteAccessor((TinyIntVector) vector); + } else if (vector instanceof SmallIntVector) { + accessor = new ShortAccessor((SmallIntVector) vector); + } else if (vector instanceof IntVector) { + accessor = new IntAccessor((IntVector) vector); + } else if (vector instanceof BigIntVector) { + accessor = new LongAccessor((BigIntVector) vector); + } else if (vector instanceof Float4Vector) { + accessor = new FloatAccessor((Float4Vector) vector); + } else if (vector instanceof Float8Vector) { + accessor = new DoubleAccessor((Float8Vector) vector); + } else if (vector instanceof DecimalVector) { + accessor = new DecimalAccessor((DecimalVector) vector); + } else if (vector instanceof VarCharVector) { + accessor = new StringAccessor((VarCharVector) vector); + } else if (vector instanceof VarBinaryVector) { + accessor = new BinaryAccessor((VarBinaryVector) vector); + } else if (vector instanceof DateDayVector) { + accessor = new DateAccessor((DateDayVector) vector); + } else if (vector instanceof DateMilliVector) { + accessor = new DateMilliAccessor((DateMilliVector) vector); + } else 
if (vector instanceof TimeStampMicroVector) { + accessor = new TimestampMicroAccessor((TimeStampMicroVector) vector); + } else if (vector instanceof TimeStampMicroTZVector) { + accessor = new TimestampMicroTZAccessor((TimeStampMicroTZVector) vector); + } else if (vector instanceof TimeStampMilliVector) { + accessor = new TimestampMilliAccessor((TimeStampMilliVector) vector); + } else if (vector instanceof ListVector) { + ListVector listVector = (ListVector) vector; + accessor = new ArrayAccessor(listVector); + } else if (vector instanceof StructVector) { + StructVector structVector = (StructVector) vector; + accessor = new StructAccessor(structVector); + + childColumns = new FlightArrowColumnVector[structVector.size()]; + for (int i = 0; i < childColumns.length; ++i) { + childColumns[i] = new FlightArrowColumnVector(structVector.getVectorById(i)); + } + } else { + System.out.println(vector); + throw new UnsupportedOperationException(); + } + } + + private abstract static class ArrowVectorAccessor { + + private final ValueVector vector; + + ArrowVectorAccessor(ValueVector vector) { + this.vector = vector; + } + + // TODO: should be final after removing ArrayAccessor workaround + boolean isNullAt(int rowId) { + return vector.isNull(rowId); + } + + final int getNullCount() { + return vector.getNullCount(); + } + + final void close() { + vector.close(); + } + + boolean getBoolean(int rowId) { + throw new UnsupportedOperationException(); + } + + byte getByte(int rowId) { + throw new UnsupportedOperationException(); + } + + short getShort(int rowId) { + throw new UnsupportedOperationException(); + } + + int getInt(int rowId) { + throw new UnsupportedOperationException(); + } + + long getLong(int rowId) { + throw new UnsupportedOperationException(); + } + + float getFloat(int rowId) { + throw new UnsupportedOperationException(); + } + + double getDouble(int rowId) { + throw new UnsupportedOperationException(); + } + + Decimal getDecimal(int rowId, int precision, int scale) { + throw new UnsupportedOperationException(); + } + + UTF8String getUTF8String(int rowId) { + throw new UnsupportedOperationException(); + } + + byte[] getBinary(int rowId) { + throw new UnsupportedOperationException(); + } + + ColumnarArray getArray(int rowId) { + throw new UnsupportedOperationException(); + } + } + + private static class BooleanAccessor extends ArrowVectorAccessor { + + private final BitVector accessor; + + BooleanAccessor(BitVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final boolean getBoolean(int rowId) { + return accessor.get(rowId) == 1; + } + } + + private static class ByteAccessor extends ArrowVectorAccessor { + + private final TinyIntVector accessor; + + ByteAccessor(TinyIntVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final byte getByte(int rowId) { + return accessor.get(rowId); + } + } + + private static class ShortAccessor extends ArrowVectorAccessor { + + private final SmallIntVector accessor; + + ShortAccessor(SmallIntVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final short getShort(int rowId) { + return accessor.get(rowId); + } + } + + private static class IntAccessor extends ArrowVectorAccessor { + + private final IntVector accessor; + + IntAccessor(IntVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final int getInt(int rowId) { + return accessor.get(rowId); + } + } + + private static class LongAccessor extends ArrowVectorAccessor { + + private final 
BigIntVector accessor; + + LongAccessor(BigIntVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final long getLong(int rowId) { + return accessor.get(rowId); + } + } + + private static class FloatAccessor extends ArrowVectorAccessor { + + private final Float4Vector accessor; + + FloatAccessor(Float4Vector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final float getFloat(int rowId) { + return accessor.get(rowId); + } + } + + private static class DoubleAccessor extends ArrowVectorAccessor { + + private final Float8Vector accessor; + + DoubleAccessor(Float8Vector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final double getDouble(int rowId) { + return accessor.get(rowId); + } + } + + private static class DecimalAccessor extends ArrowVectorAccessor { + + private final DecimalVector accessor; + + DecimalAccessor(DecimalVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final Decimal getDecimal(int rowId, int precision, int scale) { + if (isNullAt(rowId)) { + return null; + } + return Decimal.apply(accessor.getObject(rowId), precision, scale); + } + } + + private static class StringAccessor extends ArrowVectorAccessor { + + private final VarCharVector accessor; + private final NullableVarCharHolder stringResult = new NullableVarCharHolder(); + + StringAccessor(VarCharVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final UTF8String getUTF8String(int rowId) { + accessor.get(rowId, stringResult); + if (stringResult.isSet == 0) { + return null; + } else { + return UTF8String.fromAddress(null, + stringResult.buffer.memoryAddress() + stringResult.start, + stringResult.end - stringResult.start); + } + } + } + + private static class BinaryAccessor extends ArrowVectorAccessor { + + private final VarBinaryVector accessor; + + BinaryAccessor(VarBinaryVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final byte[] getBinary(int rowId) { + return accessor.getObject(rowId); + } + } + + private static class DateAccessor extends ArrowVectorAccessor { + + private final DateDayVector accessor; + + DateAccessor(DateDayVector vector) { + super(vector); + this.accessor = vector; + } + + @Override + final int getInt(int rowId) { + return accessor.get(rowId); + } + } + + private static class DateMilliAccessor extends ArrowVectorAccessor { + + private final DateMilliVector accessor; + private final double val = 1.0 / (24. * 60. * 60. 
* 1000.);
+
+    DateMilliAccessor(DateMilliVector vector) {
+      super(vector);
+      this.accessor = vector;
+    }
+
+    @Override
+    final int getInt(int rowId) {
+      return (int) (accessor.get(rowId) * val);
+    }
+  }
+
+  private static class TimestampMicroAccessor extends ArrowVectorAccessor {
+
+    private final TimeStampVector accessor;
+
+    TimestampMicroAccessor(TimeStampMicroVector vector) {
+      super(vector);
+      this.accessor = vector;
+    }
+
+    @Override
+    final long getLong(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class TimestampMicroTZAccessor extends ArrowVectorAccessor {
+
+    private final TimeStampVector accessor;
+
+    TimestampMicroTZAccessor(TimeStampMicroTZVector vector) {
+      super(vector);
+      this.accessor = vector;
+    }
+
+    @Override
+    final long getLong(int rowId) {
+      return accessor.get(rowId);
+    }
+  }
+
+  private static class TimestampMilliAccessor extends ArrowVectorAccessor {
+
+    private final TimeStampVector accessor;
+
+    TimestampMilliAccessor(TimeStampMilliVector vector) {
+      super(vector);
+      this.accessor = vector;
+    }
+
+    @Override
+    final long getLong(int rowId) {
+      return accessor.get(rowId) * 1000;
+    }
+  }
+
+  private static class ArrayAccessor extends ArrowVectorAccessor {
+
+    private final ListVector accessor;
+    private final FlightArrowColumnVector arrayData;
+
+    ArrayAccessor(ListVector vector) {
+      super(vector);
+      this.accessor = vector;
+      this.arrayData = new FlightArrowColumnVector(vector.getDataVector());
+    }
+
+    @Override
+    final boolean isNullAt(int rowId) {
+      // TODO: Workaround if vector has all non-null values, see ARROW-1948
+      if (accessor.getValueCount() > 0 && accessor.getValidityBuffer().capacity() == 0) {
+        return false;
+      } else {
+        return super.isNullAt(rowId);
+      }
+    }
+
+    @Override
+    final ColumnarArray getArray(int rowId) {
+      ArrowBuf offsets = accessor.getOffsetBuffer();
+      int index = rowId * ListVector.OFFSET_WIDTH;
+      int start = offsets.getInt(index);
+      int end = offsets.getInt(index + ListVector.OFFSET_WIDTH);
+      return new ColumnarArray(arrayData, start, end - start);
+    }
+  }
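A note on the DateMilliAccessor above: DateMilliVector stores milliseconds since the Unix epoch, while Spark's DateType wants whole days, so getInt() multiplies by val, the reciprocal of the 86,400,000 ms in a day, and truncates. A worked example of that arithmetic:

    // 1,600,000,000,000 ms * (1.0 / 86_400_000) = 18518.51... days
    // (int) truncation -> 18518 days since 1970-01-01, i.e. 2020-09-13.
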
+  /**
+   * Any call to "get" method will throw UnsupportedOperationException.
+   *
+   * Access struct values in an ArrowColumnVector doesn't use this accessor. Instead, it uses
+   * getStruct() method defined in the parent class. Any call to "get" method in this class is a
+   * bug in the code.
+   */
+  private static class StructAccessor extends ArrowVectorAccessor {
+
+    StructAccessor(StructVector vector) {
+      super(vector);
+    }
+  }
+}
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightClientFactory.java b/src/main/java/org/apache/arrow/flight/spark/FlightClientFactory.java
new file mode 100644
index 0000000..671b6ab
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightClientFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.flight.spark;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+
+public class FlightClientFactory implements AutoCloseable {
+  private final BufferAllocator allocator = new RootAllocator();
+  private final Location defaultLocation;
+  private final FlightClientOptions clientOptions;
+
+  public FlightClientFactory(Location defaultLocation, FlightClientOptions clientOptions) {
+    this.defaultLocation = defaultLocation;
+    this.clientOptions = clientOptions;
+  }
+
+  public FlightClient apply() {
+    FlightClient.Builder builder = FlightClient.builder(allocator, defaultLocation);
+
+    if (!clientOptions.getTrustedCertificates().isEmpty()) {
+      builder.trustedCertificates(new ByteArrayInputStream(clientOptions.getTrustedCertificates().getBytes()));
+    }
+
+    if (!clientOptions.getClientCertificate().isEmpty()) {
+      InputStream clientCert = new ByteArrayInputStream(clientOptions.getClientCertificate().getBytes());
+      InputStream clientKey = new ByteArrayInputStream(clientOptions.getClientKey().getBytes());
+      builder.clientCertificate(clientCert, clientKey);
+    }
+
+    // Add client middleware
+    clientOptions.getMiddleware().stream().forEach(middleware -> builder.intercept(middleware));
+
+    FlightClient client = builder.build();
+    if (!clientOptions.getUsername().isEmpty()) {
+      client.authenticateBasic(clientOptions.getUsername(), clientOptions.getPassword());
+    }
+
+    return client;
+  }
+
+  @Override
+  public void close() {
+    allocator.close();
+  }
+}
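FlightClientFactory above turns the serializable FlightClientOptions into a live FlightClient, wiring in TLS and basic auth. A sketch of standing up a TLS client, assuming a PEM-encoded CA bundle string; the host and port are illustrative:

    // Hypothetical sketch; caPem holds PEM text, the endpoint is illustrative.
    static FlightClient tlsClient(String caPem) throws Exception {
      FlightClientOptions options = new FlightClientOptions(
          "", "",              // no basic auth
          caPem,               // feeds builder.trustedCertificates(...)
          "", "",              // no client cert/key (no mutual TLS)
          new ArrayList<>());  // no middleware
      FlightClientFactory factory =
          new FlightClientFactory(new Location("grpc+tls://flight.example.com:443"), options);
      return factory.apply(); // real code should keep the factory and close it to free the allocator
    }
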
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightClientMiddlewareFactory.java b/src/main/java/org/apache/arrow/flight/spark/FlightClientMiddlewareFactory.java
new file mode 100644
index 0000000..538b289
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightClientMiddlewareFactory.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.spark;
+
+import java.io.Serializable;
+
+import org.apache.arrow.flight.FlightClientMiddleware;
+
+public interface FlightClientMiddlewareFactory extends FlightClientMiddleware.Factory, Serializable {
+
+}
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightClientOptions.java b/src/main/java/org/apache/arrow/flight/spark/FlightClientOptions.java
new file mode 100644
index 0000000..6bbae18
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightClientOptions.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.spark;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class FlightClientOptions implements Serializable {
+  private final String username;
+  private final String password;
+  private final String trustedCertificates;
+  private final String clientCertificate;
+  private final String clientKey;
+  private final List<FlightClientMiddlewareFactory> middleware;
+
+  public FlightClientOptions(String username, String password, String trustedCertificates, String clientCertificate, String clientKey, List<FlightClientMiddlewareFactory> middleware) {
+    this.username = username;
+    this.password = password;
+    this.trustedCertificates = trustedCertificates;
+    this.clientCertificate = clientCertificate;
+    this.clientKey = clientKey;
+    this.middleware = middleware;
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public String getTrustedCertificates() {
+    return trustedCertificates;
+  }
+
+  public String getClientCertificate() {
+    return clientCertificate;
+  }
+
+  public String getClientKey() {
+    return clientKey;
+  }
+
+  public List<FlightClientMiddlewareFactory> getMiddleware() {
+    return middleware;
+  }
+}
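FlightClientOptions is the serializable bag of connection settings that gets broadcast to executors. Wiring in the bearer-token middleware defined later in this diff looks roughly like this (the token value is illustrative):

    // Sketch: options whose clients attach "Authorization: Bearer <token>" to every call.
    List<FlightClientMiddlewareFactory> middleware = new ArrayList<>();
    middleware.add(new TokenClientMiddlewareFactory("my-bearer-token")); // illustrative token
    FlightClientOptions options = new FlightClientOptions("", "", "", "", "", middleware);
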
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightColumnarPartitionReader.java b/src/main/java/org/apache/arrow/flight/spark/FlightColumnarPartitionReader.java
new file mode 100644
index 0000000..d130673
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightColumnarPartitionReader.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.spark;
+
+import java.io.IOException;
+
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.spark.sql.vectorized.ColumnVector;
+
+public class FlightColumnarPartitionReader implements PartitionReader<ColumnarBatch> {
+  private final FlightClientFactory clientFactory;
+  private final FlightClient client;
+  private final FlightStream stream;
+
+  public FlightColumnarPartitionReader(FlightClientOptions clientOptions, FlightPartition partition) {
+    // TODO - Should we handle multiple locations?
+    clientFactory = new FlightClientFactory(partition.getEndpoint().get().getLocations().get(0), clientOptions);
+    client = clientFactory.apply();
+    stream = client.getStream(partition.getEndpoint().get().getTicket());
+  }
+
+  // This is written this way because the Spark interface iterates in a different way.
+  // E.g., .next() -> .get() vs. .hasNext() -> .next()
+  @Override
+  public boolean next() throws IOException {
+    try {
+      return stream.next();
+    } catch (RuntimeException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public ColumnarBatch get() {
+    ColumnarBatch batch = new ColumnarBatch(
+      stream.getRoot().getFieldVectors()
+        .stream()
+        .map(FlightArrowColumnVector::new)
+        .toArray(ColumnVector[]::new)
+    );
+    batch.setNumRows(stream.getRoot().getRowCount());
+    return batch;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      AutoCloseables.close(stream, client, clientFactory);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+}
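As the comment in the reader notes, PartitionReader uses a next()/get() protocol rather than hasNext()/next(); Spark drives it roughly like this (a simplified sketch, not the actual Spark internals):

    // Simplified sketch of how Spark consumes a PartitionReader<ColumnarBatch>.
    static long countRows(FlightColumnarPartitionReader reader) throws IOException {
      long rows = 0;
      while (reader.next()) {           // advance; false once the Flight stream is drained
        rows += reader.get().numRows(); // get() returns the batch fetched by next()
      }
      return rows;
    }
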
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightEndpointWrapper.java b/src/main/java/org/apache/arrow/flight/spark/FlightEndpointWrapper.java
new file mode 100644
index 0000000..10aa8e7
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightEndpointWrapper.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.spark;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.flight.FlightEndpoint;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.Ticket;
+
+// This is needed for FlightEndpoint to be Serializable in Spark.
+// org.apache.arrow.flight.FlightEndpoint is a POJO of Serializable types.
+// However, if Spark is using built-in serialization instead of Kryo, then we must implement Serializable.
+public class FlightEndpointWrapper implements Serializable {
+  private FlightEndpoint inner;
+
+  public FlightEndpointWrapper(FlightEndpoint inner) {
+    this.inner = inner;
+  }
+
+  public FlightEndpoint get() {
+    return inner;
+  }
+
+  private void writeObject(ObjectOutputStream out) throws IOException {
+    ArrayList<URI> locations = inner.getLocations().stream().map(location -> location.getUri()).collect(Collectors.toCollection(ArrayList::new));
+    out.writeObject(locations);
+    out.write(inner.getTicket().getBytes());
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+    @SuppressWarnings("unchecked")
+    Location[] locations = ((ArrayList<URI>) in.readObject()).stream().map(l -> new Location(l)).toArray(Location[]::new);
+    byte[] ticket = in.readAllBytes();
+    this.inner = new FlightEndpoint(new Ticket(ticket), locations);
+  }
+}
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightPartition.java b/src/main/java/org/apache/arrow/flight/spark/FlightPartition.java
new file mode 100644
index 0000000..db837e8
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightPartition.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.spark;
+
+import org.apache.spark.sql.connector.read.InputPartition;
+
+public class FlightPartition implements InputPartition {
+  private final FlightEndpointWrapper endpoint;
+
+  public FlightPartition(FlightEndpointWrapper endpoint) {
+    this.endpoint = endpoint;
+  }
+
+  @Override
+  public String[] preferredLocations() {
+    return endpoint.get().getLocations().stream().map(location -> location.getUri().getHost()).toArray(String[]::new);
+  }
+
+  public FlightEndpointWrapper getEndpoint() {
+    return endpoint;
+  }
+}
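Since FlightEndpointWrapper exists purely so endpoints can cross the driver/executor boundary, here is a round-trip sketch of the custom writeObject/readObject above, mirroring the serialization tests at the end of this diff (the ticket bytes and `location` are illustrative):

    FlightEndpointWrapper original = new FlightEndpointWrapper(
        new FlightEndpoint(new Ticket("FooBar".getBytes()), location));
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
      out.writeObject(original);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
      FlightEndpointWrapper copy = (FlightEndpointWrapper) in.readObject();
      // copy.get() carries the same ticket and locations as the original endpoint
    }
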
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightPartitionReader.java b/src/main/java/org/apache/arrow/flight/spark/FlightPartitionReader.java
new file mode 100644
index 0000000..631d26d
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightPartitionReader.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.spark;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightStream;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.vectorized.ColumnVector;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class FlightPartitionReader implements PartitionReader<InternalRow> {
+  private final FlightClientFactory clientFactory;
+  private final FlightClient client;
+  private final FlightStream stream;
+  private Optional<Iterator<InternalRow>> batch = Optional.empty();
+  private InternalRow row;
+
+  public FlightPartitionReader(FlightClientOptions clientOptions, FlightPartition partition) {
+    // TODO - Should we handle multiple locations?
+    clientFactory = new FlightClientFactory(partition.getEndpoint().get().getLocations().get(0), clientOptions);
+    client = clientFactory.apply();
+    stream = client.getStream(partition.getEndpoint().get().getTicket());
+  }
+
+  private Iterator<InternalRow> getNextBatch() {
+    ColumnarBatch batch = new ColumnarBatch(
+      stream.getRoot().getFieldVectors()
+        .stream()
+        .map(FlightArrowColumnVector::new)
+        .toArray(ColumnVector[]::new)
+    );
+    batch.setNumRows(stream.getRoot().getRowCount());
+    return batch.rowIterator();
+  }
+
+  // This is written this way because the Spark interface iterates in a different way.
+  // E.g., .next() -> .get() vs. .hasNext() -> .next()
+  @Override
+  public boolean next() throws IOException {
+    try {
+      // Try the iterator first then get next batch
+      // Not quite rust match expressions...
+      return batch.map(currentBatch -> {
+        // Are there still rows in this batch?
+        if (currentBatch.hasNext()) {
+          row = currentBatch.next();
+          return true;
+        // No more rows, get the next batch
+        } else {
+          // Is there another batch?
+          if (stream.next()) {
+            // Yes, then fetch it.
+            Iterator<InternalRow> nextBatch = getNextBatch();
+            batch = Optional.of(nextBatch);
+            if (nextBatch.hasNext()) {
+              row = nextBatch.next();
+              return true;
+            // Odd, we got an empty batch
+            } else {
+              return false;
+            }
+          // This partition / stream is complete
+          } else {
+            return false;
+          }
+        }
+      // Fetch the first batch
+      }).orElseGet(() -> {
+        // Is the stream empty?
+        if (stream.next()) {
+          // No, then fetch the first batch
+          Iterator<InternalRow> firstBatch = getNextBatch();
+          batch = Optional.of(firstBatch);
+          if (firstBatch.hasNext()) {
+            row = firstBatch.next();
+            return true;
+          // Odd, we got an empty batch
+          } else {
+            return false;
+          }
+        // The stream was empty...
+        } else {
+          return false;
+        }
+      });
+    } catch (RuntimeException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public InternalRow get() {
+    return row;
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      AutoCloseables.close(stream, client, clientFactory);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+  }
+}
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightPartitionReaderFactory.java b/src/main/java/org/apache/arrow/flight/spark/FlightPartitionReaderFactory.java
new file mode 100644
index 0000000..addc952
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightPartitionReaderFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.spark;
+
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReader;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.vectorized.ColumnarBatch;
+
+public class FlightPartitionReaderFactory implements PartitionReaderFactory {
+  private final Broadcast<FlightClientOptions> clientOptions;
+
+  public FlightPartitionReaderFactory(Broadcast<FlightClientOptions> clientOptions) {
+    this.clientOptions = clientOptions;
+  }
+
+  @Override
+  public PartitionReader<InternalRow> createReader(InputPartition iPartition) {
+    // This feels wrong but this is what upstream spark sources do too.
+    FlightPartition partition = (FlightPartition) iPartition;
+    return new FlightPartitionReader(clientOptions.getValue(), partition);
+  }
+
+  @Override
+  public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition iPartition) {
+    // This feels wrong but this is what upstream spark sources do too.
+    FlightPartition partition = (FlightPartition) iPartition;
+    return new FlightColumnarPartitionReader(clientOptions.getValue(), partition);
+  }
+
+  @Override
+  public boolean supportColumnarReads(InputPartition partition) {
+    return true;
+  }
+
+}
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightScan.java b/src/main/java/org/apache/arrow/flight/spark/FlightScan.java
new file mode 100644
index 0000000..48bfe9f
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightScan.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.spark;
+
+import org.apache.spark.sql.connector.read.Scan;
+
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.Batch;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.PartitionReaderFactory;
+import org.apache.spark.sql.types.StructType;
+
+public class FlightScan implements Scan, Batch {
+  private final StructType schema;
+  private final FlightInfo info;
+  private final Broadcast<FlightClientOptions> clientOptions;
+
+  public FlightScan(StructType schema, FlightInfo info, Broadcast<FlightClientOptions> clientOptions) {
+    this.schema = schema;
+    this.info = info;
+    this.clientOptions = clientOptions;
+  }
+
+  @Override
+  public StructType readSchema() {
+    return schema;
+  }
+
+  @Override
+  public Batch toBatch() {
+    return this;
+  }
+
+  @Override
+  public InputPartition[] planInputPartitions() {
+    InputPartition[] batches = info.getEndpoints().stream().map(endpoint -> {
+      FlightEndpointWrapper endpointWrapper = new FlightEndpointWrapper(endpoint);
+      return new FlightPartition(endpointWrapper);
+    }).toArray(InputPartition[]::new);
+    return batches;
+  }
+
+  @Override
+  public PartitionReaderFactory createReaderFactory() {
+    return new FlightPartitionReaderFactory(clientOptions);
+  }
+
+}
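Worth noting: FlightScan maps endpoints to partitions one-to-one, so scan parallelism is entirely server-controlled. For instance:

    // If getInfo() returned three endpoints, then
    //   info.getEndpoints().size() == 3  ->  planInputPartitions().length == 3
    // and Spark schedules one reader per endpoint, preferring executors on the
    // hosts each FlightPartition reports via preferredLocations().
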
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightScanBuilder.java b/src/main/java/org/apache/arrow/flight/spark/FlightScanBuilder.java
new file mode 100644
index 0000000..6932db5
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightScanBuilder.java
@@ -0,0 +1,299 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.spark;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.arrow.flight.FlightClient;
+import org.apache.arrow.flight.FlightDescriptor;
+import org.apache.arrow.flight.FlightInfo;
+import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.SchemaResult;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.sources.*;
+import org.apache.spark.sql.types.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.JavaConversions;
+
+import com.google.common.collect.Lists;
+import com.google.common.base.Joiner;
+
+public class FlightScanBuilder implements ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownFilters {
+  private static final Logger LOGGER = LoggerFactory.getLogger(FlightScanBuilder.class);
+  private static final Joiner WHERE_JOINER = Joiner.on(" and ");
+  private static final Joiner PROJ_JOINER = Joiner.on(", ");
+  private SchemaResult flightSchema;
+  private StructType schema;
+  private final Location location;
+  private final Broadcast<FlightClientOptions> clientOptions;
+  private FlightDescriptor descriptor;
+  private String sql;
+  private Filter[] pushed;
+
+  public FlightScanBuilder(Location location, Broadcast<FlightClientOptions> clientOptions, String sql) {
+    this.location = location;
+    this.clientOptions = clientOptions;
+    this.sql = sql;
+    descriptor = getDescriptor(sql);
+  }
+
+  private class Client implements AutoCloseable {
+    private final FlightClientFactory clientFactory;
+    private final FlightClient client;
+
+    public Client(Location location, FlightClientOptions clientOptions) {
+      this.clientFactory = new FlightClientFactory(location, clientOptions);
+      this.client = clientFactory.apply();
+    }
+
+    public FlightClient get() {
+      return client;
+    }
+
+    @Override
+    public void close() throws Exception {
+      AutoCloseables.close(client, clientFactory);
+    }
+  }
+
+  private void getFlightSchema(FlightDescriptor descriptor) {
+    try (Client client = new Client(location, clientOptions.getValue())) {
+      LOGGER.info("getSchema() descriptor: {}", descriptor);
+      flightSchema = client.get().getSchema(descriptor);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Scan build() {
+    try (Client client = new Client(location, clientOptions.getValue())) {
+      FlightDescriptor descriptor = FlightDescriptor.command(sql.getBytes());
+      LOGGER.info("getInfo() descriptor: {}", descriptor);
+      FlightInfo info = client.get().getInfo(descriptor);
+      return new FlightScan(readSchema(), info, clientOptions);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private boolean canBePushed(Filter filter) {
+    if (filter instanceof IsNotNull) {
+      return true;
+    } else if (filter instanceof EqualTo) {
+      return true;
+    }
+    if (filter instanceof GreaterThan) {
+      return true;
+    }
+    if (filter instanceof GreaterThanOrEqual) {
+      return true;
+    }
+    if (filter instanceof LessThan) {
+      return true;
+    }
+    if (filter instanceof LessThanOrEqual) {
+      return true;
+    }
LOGGER.error("Cant push filter of type " + filter.toString()); + return false; + } + + private String valueToString(Object value) { + if (value instanceof String) { + return String.format("'%s'", value); + } + return value.toString(); + } + + private String generateWhereClause(List pushed) { + List filterStr = Lists.newArrayList(); + for (Filter filter : pushed) { + if (filter instanceof IsNotNull) { + filterStr.add(String.format("isnotnull(\"%s\")", ((IsNotNull) filter).attribute())); + } else if (filter instanceof EqualTo) { + filterStr.add(String.format("\"%s\" = %s", ((EqualTo) filter).attribute(), valueToString(((EqualTo) filter).value()))); + } else if (filter instanceof GreaterThan) { + filterStr.add(String.format("\"%s\" > %s", ((GreaterThan) filter).attribute(), valueToString(((GreaterThan) filter).value()))); + } else if (filter instanceof GreaterThanOrEqual) { + filterStr.add(String.format("\"%s\" <= %s", ((GreaterThanOrEqual) filter).attribute(), valueToString(((GreaterThanOrEqual) filter).value()))); + } else if (filter instanceof LessThan) { + filterStr.add(String.format("\"%s\" < %s", ((LessThan) filter).attribute(), valueToString(((LessThan) filter).value()))); + } else if (filter instanceof LessThanOrEqual) { + filterStr.add(String.format("\"%s\" <= %s", ((LessThanOrEqual) filter).attribute(), valueToString(((LessThanOrEqual) filter).value()))); + } + //todo fill out rest of Filter types + } + return WHERE_JOINER.join(filterStr); + } + + private FlightDescriptor getDescriptor(String sql) { + return FlightDescriptor.command(sql.getBytes()); + } + + private void mergeWhereDescriptors(String whereClause) { + sql = String.format("select * from (%s) as where_merge where %s", sql, whereClause); + descriptor = getDescriptor(sql); + } + + @Override + public Filter[] pushFilters(Filter[] filters) { + List notPushed = Lists.newArrayList(); + List pushed = Lists.newArrayList(); + for (Filter filter : filters) { + boolean isPushed = canBePushed(filter); + if (isPushed) { + pushed.add(filter); + } else { + notPushed.add(filter); + } + } + this.pushed = pushed.toArray(new Filter[0]); + if (!pushed.isEmpty()) { + String whereClause = generateWhereClause(pushed); + mergeWhereDescriptors(whereClause); + getFlightSchema(descriptor); + } + return notPushed.toArray(new Filter[0]); + } + + @Override + public Filter[] pushedFilters() { + return pushed; + } + + private DataType sparkFromArrow(FieldType fieldType) { + switch (fieldType.getType().getTypeID()) { + case Null: + return DataTypes.NullType; + case Struct: + throw new UnsupportedOperationException("have not implemented Struct type yet"); + case List: + throw new UnsupportedOperationException("have not implemented List type yet"); + case FixedSizeList: + throw new UnsupportedOperationException("have not implemented FixedSizeList type yet"); + case Union: + throw new UnsupportedOperationException("have not implemented Union type yet"); + case Int: + ArrowType.Int intType = (ArrowType.Int) fieldType.getType(); + int bitWidth = intType.getBitWidth(); + if (bitWidth == 8) { + return DataTypes.ByteType; + } else if (bitWidth == 16) { + return DataTypes.ShortType; + } else if (bitWidth == 32) { + return DataTypes.IntegerType; + } else if (bitWidth == 64) { + return DataTypes.LongType; + } + throw new UnsupportedOperationException("unknown int type with bitwidth " + bitWidth); + case FloatingPoint: + ArrowType.FloatingPoint floatType = (ArrowType.FloatingPoint) fieldType.getType(); + FloatingPointPrecision precision = floatType.getPrecision(); 
+        switch (precision) {
+          case HALF:
+          case SINGLE:
+            return DataTypes.FloatType;
+          case DOUBLE:
+            return DataTypes.DoubleType;
+        }
+        throw new UnsupportedOperationException("unknown floating point precision " + precision);
+      case Utf8:
+        return DataTypes.StringType;
+      case Binary:
+      case FixedSizeBinary:
+        return DataTypes.BinaryType;
+      case Bool:
+        return DataTypes.BooleanType;
+      case Decimal:
+        throw new UnsupportedOperationException("have not implemented Decimal type yet");
+      case Date:
+        return DataTypes.DateType;
+      case Time:
+        return DataTypes.TimestampType; // note: I don't know what this will do!
+      case Timestamp:
+        return DataTypes.TimestampType;
+      case Interval:
+        return DataTypes.CalendarIntervalType;
+      case NONE:
+        return DataTypes.NullType;
+      default:
+        throw new IllegalStateException("Unexpected value: " + fieldType);
+    }
+  }
+
+  private StructType readSchemaImpl() {
+    if (flightSchema == null) {
+      getFlightSchema(descriptor);
+    }
+    StructField[] fields = flightSchema.getSchema().getFields().stream()
+      .map(field -> new StructField(field.getName(),
+        sparkFromArrow(field.getFieldType()),
+        field.isNullable(),
+        Metadata.empty()))
+      .toArray(StructField[]::new);
+    return new StructType(fields);
+  }
+
+  public StructType readSchema() {
+    if (schema == null) {
+      schema = readSchemaImpl();
+    }
+    return schema;
+  }
+
+  private void mergeProjDescriptors(String projClause) {
+    sql = String.format("select %s from (%s) as proj_merge", projClause, sql);
+    descriptor = getDescriptor(sql);
+  }
+
+  @Override
+  public void pruneColumns(StructType requiredSchema) {
+    if (requiredSchema.toSeq().isEmpty()) {
+      return;
+    }
+    StructType schema = readSchema();
+    List<String> fields = Lists.newArrayList();
+    List<StructField> fieldsLeft = Lists.newArrayList();
+    Map<String, StructField> fieldNames = JavaConversions.seqAsJavaList(schema.toSeq()).stream()
+      .collect(Collectors.toMap(StructField::name, f -> f));
+    for (StructField field : JavaConversions.seqAsJavaList(requiredSchema.toSeq())) {
+      String name = field.name();
+      StructField f = fieldNames.remove(name);
+      if (f != null) {
+        fields.add(String.format("\"%s\"", name));
+        fieldsLeft.add(f);
+      }
+    }
+    if (!fieldNames.isEmpty()) {
+      this.schema = new StructType(fieldsLeft.toArray(new StructField[0]));
+      mergeProjDescriptors(PROJ_JOINER.join(fields));
+      getFlightSchema(descriptor);
+    }
+  }
+}
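The builder implements both pushdowns by rewriting the SQL it sends as the Flight command, nesting the previous query each time. Tracing an illustrative session through mergeWhereDescriptors and mergeProjDescriptors:

    // sql:  select * from test.table
    // pushFilters(EqualTo("symbol", "USDCAD")) rewrites it to:
    //   select * from (select * from test.table) as where_merge where "symbol" = 'USDCAD'
    // pruneColumns(bid, ask) then wraps it again:
    //   select "bid", "ask" from (select * from (select * from test.table) as where_merge
    //   where "symbol" = 'USDCAD') as proj_merge
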
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightSparkContext.java b/src/main/java/org/apache/arrow/flight/spark/FlightSparkContext.java
new file mode 100644
index 0000000..f57a9af
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightSparkContext.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.flight.spark;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.sql.DataFrameReader;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+public class FlightSparkContext {
+
+  private SparkConf conf;
+
+  private final DataFrameReader reader;
+
+  public FlightSparkContext(SparkSession spark) {
+    this.conf = spark.sparkContext().getConf();
+    reader = spark.read().format("org.apache.arrow.flight.spark");
+  }
+
+  public Dataset<Row> read(String s) {
+    return reader.option("port", Integer.parseInt(conf.get("spark.flight.endpoint.port")))
+      .option("uri", String.format(
+        "grpc://%s:%s",
+        conf.get("spark.flight.endpoint.host"),
+        conf.get("spark.flight.endpoint.port")))
+      .option("username", conf.get("spark.flight.auth.username"))
+      .option("password", conf.get("spark.flight.auth.password"))
+      .load(s);
+  }
+
+  public Dataset<Row> readSql(String s) {
+    return reader.option("port", Integer.parseInt(conf.get("spark.flight.endpoint.port")))
+      .option("uri", String.format(
+        "grpc://%s:%s",
+        conf.get("spark.flight.endpoint.host"),
+        conf.get("spark.flight.endpoint.port")))
+      .option("username", conf.get("spark.flight.auth.username"))
+      .option("password", conf.get("spark.flight.auth.password"))
+      .load(s);
+  }
+}
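FlightSparkContext pulls the endpoint and credentials from Spark conf rather than per-read options. A usage sketch with illustrative values (the spark.flight.* keys are the ones read above and set in the tests at the end of this diff):

    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .config("spark.flight.endpoint.host", "localhost")
        .config("spark.flight.endpoint.port", "47470")
        .config("spark.flight.auth.username", "user")
        .config("spark.flight.auth.password", "pass")
        .getOrCreate();
    Dataset<Row> df = new FlightSparkContext(spark).readSql("select * from test.table");
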
diff --git a/src/main/java/org/apache/arrow/flight/spark/FlightTable.java b/src/main/java/org/apache/arrow/flight/spark/FlightTable.java
new file mode 100644
index 0000000..ee954ae
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/FlightTable.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.flight.spark;
+
+import java.util.Set;
+
+import org.apache.arrow.flight.Location;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.connector.catalog.SupportsRead;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCapability;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+public class FlightTable implements Table, SupportsRead {
+  private static final Set<TableCapability> CAPABILITIES = Set.of(TableCapability.BATCH_READ);
+  private final String name;
+  private final Location location;
+  private final String sql;
+  private final Broadcast<FlightClientOptions> clientOptions;
+  private StructType schema;
+
+  public FlightTable(String name, Location location, String sql, Broadcast<FlightClientOptions> clientOptions) {
+    this.name = name;
+    this.location = location;
+    this.sql = sql;
+    this.clientOptions = clientOptions;
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  public StructType schema() {
+    if (schema == null) {
+      FlightScanBuilder scanBuilder = new FlightScanBuilder(location, clientOptions, sql);
+      schema = scanBuilder.readSchema();
+    }
+    return schema;
+  }
+
+  // TODO - We could probably implement partitioning() but it would require server side support
+
+  @Override
+  public Set<TableCapability> capabilities() {
+    // We only support reading for now
+    return CAPABILITIES;
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    return new FlightScanBuilder(location, clientOptions, sql);
+  }
+}
diff --git a/src/main/java/org/apache/arrow/flight/spark/TokenClientMiddleware.java b/src/main/java/org/apache/arrow/flight/spark/TokenClientMiddleware.java
new file mode 100644
index 0000000..8055c64
--- /dev/null
+++ b/src/main/java/org/apache/arrow/flight/spark/TokenClientMiddleware.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (C) 2019 The flight-spark-source Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.arrow.flight.spark; + +import org.apache.arrow.flight.CallHeaders; +import org.apache.arrow.flight.CallStatus; +import org.apache.arrow.flight.FlightClientMiddleware; + +public class TokenClientMiddleware implements FlightClientMiddleware { + private final String token; + + public TokenClientMiddleware(String token) { + this.token = token; + } + + @Override + public void onBeforeSendingHeaders(CallHeaders outgoingHeaders) { + outgoingHeaders.insert("authorization", String.format("Bearer %s", token)); + } + + @Override + public void onHeadersReceived(CallHeaders incomingHeaders) { + // Nothing needed here + } + + @Override + public void onCallCompleted(CallStatus status) { + // Nothing needed here + } + +} diff --git a/src/main/java/org/apache/arrow/flight/spark/TokenClientMiddlewareFactory.java b/src/main/java/org/apache/arrow/flight/spark/TokenClientMiddlewareFactory.java new file mode 100644 index 0000000..c424030 --- /dev/null +++ b/src/main/java/org/apache/arrow/flight/spark/TokenClientMiddlewareFactory.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2019 The flight-spark-source Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.spark; + +import org.apache.arrow.flight.CallInfo; +import org.apache.arrow.flight.FlightClientMiddleware; + +public class TokenClientMiddlewareFactory implements FlightClientMiddlewareFactory { + private final String token; + + public TokenClientMiddlewareFactory(String token) { + this.token = token; + } + + @Override + public FlightClientMiddleware onCallStarted(CallInfo info) { + return new TokenClientMiddleware(token); + } + +} diff --git a/src/main/scala/org/apache/spark/sql/execution/arrow/FlightArrowUtils.scala b/src/main/scala/org/apache/spark/sql/execution/arrow/FlightArrowUtils.scala new file mode 100644 index 0000000..f6c569c --- /dev/null +++ b/src/main/scala/org/apache/spark/sql/execution/arrow/FlightArrowUtils.scala @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2019 The flight-spark-source Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.arrow + +import org.apache.arrow.memory.RootAllocator +import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema} +import org.apache.arrow.vector.types.{DateUnit, FloatingPointPrecision, TimeUnit} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import scala.collection.JavaConverters._ + +/** + * FlightArrowUtils is a copy of ArrowUtils with extra support for DateMilli and TimestampMilli + */ +object FlightArrowUtils { + + val rootAllocator = new RootAllocator(Long.MaxValue) + + // todo: support more types. + + /** Maps data type from Spark to Arrow. NOTE: timeZoneId required for TimestampTypes */ + def toArrowType(dt: DataType, timeZoneId: String): ArrowType = dt match { + case BooleanType => ArrowType.Bool.INSTANCE + case ByteType => new ArrowType.Int(8, true) + case ShortType => new ArrowType.Int(8 * 2, true) + case IntegerType => new ArrowType.Int(8 * 4, true) + case LongType => new ArrowType.Int(8 * 8, true) + case FloatType => new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE) + case DoubleType => new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE) + case StringType => ArrowType.Utf8.INSTANCE + case BinaryType => ArrowType.Binary.INSTANCE + case DecimalType.Fixed(precision, scale) => new ArrowType.Decimal(precision, scale) + case DateType => new ArrowType.Date(DateUnit.DAY) + case TimestampType => + if (timeZoneId == null) { + throw new UnsupportedOperationException( + s"${TimestampType.catalogString} must supply timeZoneId parameter") + } else { + new ArrowType.Timestamp(TimeUnit.MICROSECOND, timeZoneId) + } + case _ => + throw new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}") + } + + def fromArrowType(dt: ArrowType): DataType = dt match { + case ArrowType.Bool.INSTANCE => BooleanType + case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 => ByteType + case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 2 => ShortType + case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 4 => IntegerType + case int: ArrowType.Int if int.getIsSigned && int.getBitWidth == 8 * 8 => LongType + case float: ArrowType.FloatingPoint + if float.getPrecision() == FloatingPointPrecision.SINGLE => FloatType + case float: ArrowType.FloatingPoint + if float.getPrecision() == FloatingPointPrecision.DOUBLE => DoubleType + case ArrowType.Utf8.INSTANCE => StringType + case ArrowType.Binary.INSTANCE => BinaryType + case d: ArrowType.Decimal => DecimalType(d.getPrecision, d.getScale) + case date: ArrowType.Date if date.getUnit == DateUnit.DAY || date.getUnit == DateUnit.MILLISECOND => DateType + case ts: ArrowType.Timestamp if ts.getUnit == TimeUnit.MICROSECOND || ts.getUnit == TimeUnit.MILLISECOND => TimestampType + case _ => throw new UnsupportedOperationException(s"Unsupported data type: $dt") + } + + /** Maps field from Spark to Arrow. 
NOTE: timeZoneId required for TimestampType */ + def toArrowField( + name: String, dt: DataType, nullable: Boolean, timeZoneId: String): Field = { + dt match { + case ArrayType(elementType, containsNull) => + val fieldType = new FieldType(nullable, ArrowType.List.INSTANCE, null) + new Field(name, fieldType, + Seq(toArrowField("element", elementType, containsNull, timeZoneId)).asJava) + case StructType(fields) => + val fieldType = new FieldType(nullable, ArrowType.Struct.INSTANCE, null) + new Field(name, fieldType, + fields.map { field => + toArrowField(field.name, field.dataType, field.nullable, timeZoneId) + }.toSeq.asJava) + case dataType => + val fieldType = new FieldType(nullable, toArrowType(dataType, timeZoneId), null) + new Field(name, fieldType, Seq.empty[Field].asJava) + } + } + + def fromArrowField(field: Field): DataType = { + field.getType match { + case ArrowType.List.INSTANCE => + val elementField = field.getChildren().get(0) + val elementType = fromArrowField(elementField) + ArrayType(elementType, containsNull = elementField.isNullable) + case ArrowType.Struct.INSTANCE => + val fields = field.getChildren().asScala.map { child => + val dt = fromArrowField(child) + StructField(child.getName, dt, child.isNullable) + } + StructType(fields) + case arrowType => fromArrowType(arrowType) + } + } + + /** Maps schema from Spark to Arrow. NOTE: timeZoneId required for TimestampType in StructType */ + def toArrowSchema(schema: StructType, timeZoneId: String): Schema = { + new Schema(schema.map { field => + toArrowField(field.name, field.dataType, field.nullable, timeZoneId) + }.asJava) + } + + def fromArrowSchema(schema: Schema): StructType = { + StructType(schema.getFields.asScala.map { field => + val dt = fromArrowField(field) + StructField(field.getName, dt, field.isNullable) + }) + } + + /** Return Map with conf settings to be used in ArrowPythonRunner */ + def getPythonRunnerConfMap(conf: SQLConf): Map[String, String] = { + val timeZoneConf = Seq(SQLConf.SESSION_LOCAL_TIMEZONE.key -> + conf.sessionLocalTimeZone) + val pandasColsByName = Seq(SQLConf.PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME.key -> + conf.pandasGroupedMapAssignColumnsByName.toString) + Map(timeZoneConf ++ pandasColsByName: _*) + } +} diff --git a/src/test/java/com/dremio/spark/TestConnector.java b/src/test/java/com/dremio/spark/TestConnector.java deleted file mode 100644 index 5889986..0000000 --- a/src/test/java/com/dremio/spark/TestConnector.java +++ /dev/null @@ -1,48 +0,0 @@ -package com.dremio.spark; - -import com.dremio.BaseTestQuery; -import com.dremio.exec.ExecTest; -import com.dremio.service.InitializerRegistry; -import org.apache.spark.SparkConf; -import org.apache.spark.sql.SQLContext; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Ignore; -import org.junit.Test; - -import org.apache.spark.api.java.JavaSparkContext; - -public class TestConnector extends BaseTestQuery { - private static SparkConf conf; - private static JavaSparkContext sc; - private static FlightSparkContext csc; - private static InitializerRegistry registry; - - @BeforeClass - public static void setUp() throws Exception { - registry = new InitializerRegistry(ExecTest.CLASSPATH_SCAN_RESULT, getBindingProvider()); - registry.start(); - conf = new SparkConf() - .setAppName("flightTest") - .setMaster("local[*]") -// .set("spark.driver.allowMultipleContexts","true") - .set("spark.flight.endpoint.host", "localhost") - .set("spark.flight.endpoint.port", "47470") - .set("spark.flight.username", "dremio") - 
.set("spark.flight.password", "dremio123") - ; - sc = new JavaSparkContext(conf); - csc = FlightSparkContext.flightContext(sc); - } - - @AfterClass - public static void tearDown() throws Exception { - registry.close(); - sc.close(); - } - - @Test - public void testConnect() { - csc.read("sys.options"); - } -} diff --git a/src/test/java/org/apache/arrow/flight/spark/TestConnector.java b/src/test/java/org/apache/arrow/flight/spark/TestConnector.java new file mode 100644 index 0000000..4ac7c37 --- /dev/null +++ b/src/test/java/org/apache/arrow/flight/spark/TestConnector.java @@ -0,0 +1,307 @@ +/* + * Copyright (C) 2019 The flight-spark-source Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.flight.spark; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.function.Consumer; + +import org.apache.arrow.flight.Action; +import org.apache.arrow.flight.FlightDescriptor; +import org.apache.arrow.flight.FlightEndpoint; +import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightServer; +import org.apache.arrow.flight.FlightTestUtil; +import org.apache.arrow.flight.Location; +import org.apache.arrow.flight.NoOpFlightProducer; +import org.apache.arrow.flight.Result; +import org.apache.arrow.flight.Ticket; +import org.apache.arrow.flight.auth.ServerAuthHandler; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.Types; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.Text; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.Test.None; + +import com.google.common.collect.ImmutableList; + +public class TestConnector { + private static final BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + private static Location location; + private static FlightServer server; + private static SparkSession spark; + private static FlightSparkContext csc; + + @BeforeClass + public static void setUp() throws Exception { + server = FlightTestUtil.getStartedServer(location -> FlightServer.builder(allocator, location, new TestProducer()).authHandler( + new ServerAuthHandler() { + @Override + public Optional isValid(byte[] token) { + return Optional.of("xxx"); + } + + @Override + public boolean authenticate(ServerAuthSender 
+          incoming.next();
+          outgoing.send(new byte[0]);
+          return true;
+        }
+      }).build()
+    );
+    location = server.getLocation();
+    spark = SparkSession.builder()
+      .appName("flightTest")
+      .master("local[*]")
+      .config("spark.driver.host", "127.0.0.1")
+      .config("spark.driver.allowMultipleContexts", "true")
+      .config("spark.flight.endpoint.host", location.getUri().getHost())
+      .config("spark.flight.endpoint.port", Integer.toString(location.getUri().getPort()))
+      .config("spark.flight.auth.username", "xxx")
+      .config("spark.flight.auth.password", "yyy")
+      .getOrCreate();
+    csc = new FlightSparkContext(spark);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    AutoCloseables.close(server, allocator, spark);
+  }
+
+  private class DummyObjectOutputStream extends ObjectOutputStream {
+    public DummyObjectOutputStream() throws IOException {
+      super(new ByteArrayOutputStream());
+    }
+  }
+
+  @Test(expected = None.class)
+  public void testFlightPartitionReaderFactorySerialization() throws IOException {
+    List<FlightClientMiddlewareFactory> middleware = new ArrayList<>();
+    FlightClientOptions clientOptions = new FlightClientOptions("xxx", "yyy", "FooBar", "FooBar", "FooBar", middleware);
+    FlightPartitionReaderFactory readerFactory = new FlightPartitionReaderFactory(JavaSparkContext.fromSparkContext(spark.sparkContext()).broadcast(clientOptions));
+
+    try (ObjectOutputStream oos = new DummyObjectOutputStream()) {
+      oos.writeObject(readerFactory);
+    }
+  }
+
+  @Test(expected = None.class)
+  public void testFlightPartitionSerialization() throws IOException {
+    Ticket ticket = new Ticket("FooBar".getBytes());
+    FlightEndpoint endpoint = new FlightEndpoint(ticket, location);
+    FlightPartition partition = new FlightPartition(new FlightEndpointWrapper(endpoint));
+    try (ObjectOutputStream oos = new DummyObjectOutputStream()) {
+      oos.writeObject(partition);
+    }
+  }
+
+  @Test
+  public void testConnect() {
+    csc.read("test.table");
+  }
+
+  @Test
+  public void testRead() {
+    long count = csc.read("test.table").count();
+    Assert.assertEquals(20, count);
+  }
+
+  @Test
+  public void testSql() {
+    long count = csc.readSql("select * from test.table").count();
+    Assert.assertEquals(20, count);
+  }
+
+  @Test
+  public void testFilter() {
+    Dataset<Row> df = csc.readSql("select * from test.table");
+    long count = df.filter(df.col("symbol").equalTo("USDCAD")).count();
+    long countOriginal = csc.readSql("select * from test.table").count();
+    Assert.assertTrue(count < countOriginal);
+  }
+
+  private static class SizeConsumer implements Consumer<Row> {
+    private int length = 0;
+    private int width = 0;
+
+    @Override
+    public void accept(Row row) {
+      length += 1;
+      width = row.length();
+    }
+  }
+
+  @Test
+  public void testProject() {
+    Dataset<Row> df = csc.readSql("select * from test.table");
+    SizeConsumer c = new SizeConsumer();
+    df.select("bid", "ask", "symbol").toLocalIterator().forEachRemaining(c);
+    long count = c.width; // number of columns in the projected rows
+    long countOriginal = csc.readSql("select * from test.table").columns().length;
+    Assert.assertTrue(count < countOriginal);
+  }
+
+  private static class TestProducer extends NoOpFlightProducer {
+    private boolean parallel = false;
+
+    @Override
+    public void doAction(CallContext context, Action action, StreamListener<Result> listener) {
+      parallel = true;
+      listener.onNext(new Result("ok".getBytes()));
+      listener.onCompleted();
+    }
+
+    @Override
+    public FlightInfo getFlightInfo(CallContext context, FlightDescriptor descriptor) {
+      Schema schema;
+      List<FlightEndpoint> endpoints;
+      if (parallel) {
+        endpoints =
+          ImmutableList.of(new FlightEndpoint(new Ticket(descriptor.getCommand()), location),
+            new FlightEndpoint(new Ticket(descriptor.getCommand()), location));
+      } else {
+        endpoints = ImmutableList.of(new FlightEndpoint(new Ticket(descriptor.getCommand()), location));
+      }
+      if (new String(descriptor.getCommand()).equals("select \"bid\", \"ask\", \"symbol\" from (select * from test.table))")) {
+        schema = new Schema(ImmutableList.of(
+          Field.nullable("bid", Types.MinorType.FLOAT8.getType()),
+          Field.nullable("ask", Types.MinorType.FLOAT8.getType()),
+          Field.nullable("symbol", Types.MinorType.VARCHAR.getType()))
+        );
+
+      } else {
+        schema = new Schema(ImmutableList.of(
+          Field.nullable("bid", Types.MinorType.FLOAT8.getType()),
+          Field.nullable("ask", Types.MinorType.FLOAT8.getType()),
+          Field.nullable("symbol", Types.MinorType.VARCHAR.getType()),
+          Field.nullable("bidsize", Types.MinorType.BIGINT.getType()),
+          Field.nullable("asksize", Types.MinorType.BIGINT.getType()))
+        );
+      }
+      return new FlightInfo(schema, descriptor, endpoints, 1000000, 10);
+    }
+
+    @Override
+    public void getStream(CallContext context, Ticket ticket, ServerStreamListener listener) {
+      final int size = (new String(ticket.getBytes()).contains("USDCAD")) ? 5 : 10;
+
+      if (new String(ticket.getBytes()).equals("select \"bid\", \"ask\", \"symbol\" from (select * from test.table))")) {
+        Float8Vector b = new Float8Vector("bid", allocator);
+        Float8Vector a = new Float8Vector("ask", allocator);
+        VarCharVector s = new VarCharVector("symbol", allocator);
+
+        VectorSchemaRoot root = VectorSchemaRoot.of(b, a, s);
+        listener.start(root);
+
+        // batch 1
+        root.allocateNew();
+        for (int i = 0; i < size; i++) {
+          b.set(i, (double) i);
+          a.set(i, (double) i);
+          s.set(i, (i % 2 == 0) ? new Text("USDCAD") : new Text("EURUSD"));
+        }
+        b.setValueCount(size);
+        a.setValueCount(size);
+        s.setValueCount(size);
+        root.setRowCount(size);
+        listener.putNext();
+
+        // batch 2
+
+        root.allocateNew();
+        for (int i = 0; i < size; i++) {
+          b.set(i, (double) i);
+          a.set(i, (double) i);
+          s.set(i, (i % 2 == 0) ? new Text("USDCAD") : new Text("EURUSD"));
+        }
+        b.setValueCount(size);
+        a.setValueCount(size);
+        s.setValueCount(size);
+        root.setRowCount(size);
+        listener.putNext();
+        root.clear();
+        listener.completed();
+      } else {
+        BigIntVector bs = new BigIntVector("bidsize", allocator);
+        BigIntVector as = new BigIntVector("asksize", allocator);
+        Float8Vector b = new Float8Vector("bid", allocator);
+        Float8Vector a = new Float8Vector("ask", allocator);
+        VarCharVector s = new VarCharVector("symbol", allocator);
+
+        VectorSchemaRoot root = VectorSchemaRoot.of(b, a, s, bs, as);
+        listener.start(root);
+
+        // batch 1
+        root.allocateNew();
+        for (int i = 0; i < size; i++) {
+          bs.set(i, (long) i);
+          as.set(i, (long) i);
+          b.set(i, (double) i);
+          a.set(i, (double) i);
+          s.set(i, (i % 2 == 0) ? new Text("USDCAD") : new Text("EURUSD"));
+        }
+        bs.setValueCount(size);
+        as.setValueCount(size);
+        b.setValueCount(size);
+        a.setValueCount(size);
+        s.setValueCount(size);
+        root.setRowCount(size);
+        listener.putNext();
+
+        // batch 2
+
+        root.allocateNew();
+        for (int i = 0; i < size; i++) {
+          bs.set(i, (long) i);
+          as.set(i, (long) i);
+          b.set(i, (double) i);
+          a.set(i, (double) i);
+          s.set(i, (i % 2 == 0) ? new Text("USDCAD") : new Text("EURUSD"));
new Text("USDCAD") : new Text("EURUSD")); + } + bs.setValueCount(size); + as.setValueCount(size); + b.setValueCount(size); + a.setValueCount(size); + s.setValueCount(size); + root.setRowCount(size); + listener.putNext(); + root.clear(); + listener.completed(); + } + } + + + } +} diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 4a54f7d..62e5964 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -1,13 +1,13 @@