diff --git a/java/CONTRIBUTING.rst b/java/CONTRIBUTING.rst
index 55f53455..9f3712cf 100644
--- a/java/CONTRIBUTING.rst
+++ b/java/CONTRIBUTING.rst
@@ -30,22 +30,13 @@ install needed packages via pip, to build the Java cookbook.
 The dependency packages managed via pip by build scripts are found at
 `requirements.txt `_.
 
-Java Shell
-^^^^^^^^^^^^^^^^^^^^^^^^^
-For Java cookbook we are running these with Java Shell tool
-
-`JShell `_
-
-.. code-block:: bash
-
-   > java --version
-   java 11.0.14 2022-01-18 LTS
-   Java(TM) SE Runtime Environment 18.9 (build 11.0.14+8-LTS-263)
-
-.. code-block:: bash
+Java
+^^^^
 
-   > jshell --version
-   jshell 11.0.14
+The Java cookbook requires:
+
+- Java JDK (11+)
+- Maven
 
 Build Process
 -------------------------
@@ -81,6 +72,21 @@ additional ``.rst`` files in the ``source`` directory. You just need
 to remember to add them to the ``index.rst`` file in the ``toctree``
 for them to become visible.
 
+When run, Java code snippets are wrapped in a simple ``main`` class:
+
+.. code-block:: java
+
+   // Any imports get put here
+
+   public class Example {
+       public static void main(String[] args) {
+           // Your code gets inserted here
+       }
+   }
+
+If your code is more complicated, you can explicitly define ``public class Example``
+yourself; in that case the wrapping above won't happen and the code will be run as-is.
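+
+For example, a snippet that needs its own helper methods can carry the full
+class itself (an illustrative sketch; any snippet that already contains
+``public class Example`` is run unmodified):
+
+.. code-block:: java
+
+   import org.apache.arrow.memory.BufferAllocator;
+   import org.apache.arrow.memory.RootAllocator;
+
+   public class Example {
+       void printAllocatedMemory(BufferAllocator allocator) {
+           System.out.println(allocator.getAllocatedMemory());
+       }
+
+       public static void main(String[] args) {
+           Example ex = new Example();
+           try (BufferAllocator allocator = new RootAllocator()) {
+               ex.printAllocatedMemory(allocator);
+           }
+       }
+   }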
""" name = "javadoctest" @@ -45,46 +56,49 @@ class JavaDocTestBuilder(DocTestBuilder): def compile( self, code: str, name: str, type: str, flags: Any, dont_inherit: bool ) -> Any: - # go to project that contains all your arrow maven dependencies - path_arrow_project = pathlib.Path(__file__).parent.parent / "source" / "demo" - # create list of all arrow jar dependencies - subprocess.check_call( - [ - "mvn", - "-q", - "dependency:build-classpath", - "-DincludeTypes=jar", - "-Dmdep.outputFile=.cp.tmp", - f"-Darrow.version={self.env.config.version}", - ], - cwd=path_arrow_project, - text=True, - ) - if not (path_arrow_project / ".cp.tmp").exists(): - raise RuntimeError( - __("invalid process to create jshell dependencies library") + source_dir = pathlib.Path(__file__).parent.parent / "source" / "demo" + + with tempfile.TemporaryDirectory() as project_dir: + shutil.copytree(source_dir, project_dir, dirs_exist_ok=True) + + template_file_path = ( + pathlib.Path(project_dir) + / "src" + / "main" + / "java" + / "org" + / "example" + / "Example.java" ) - # get list of all arrow jar dependencies - with open(path_arrow_project / ".cp.tmp") as f: - stdout_dependency = f.read() - if not stdout_dependency: - raise RuntimeError( - __("invalid process to list jshell dependencies library") + with open(template_file_path, "r") as infile: + template = infile.read() + + filled_template = self.fill_template(template, code) + + with open(template_file_path, "w") as outfile: + outfile.write(filled_template) + + # Support JPMS (since Arrow 16) + modified_env = os.environ.copy() + modified_env["_JAVA_OPTIONS"] = "--add-opens=java.base/java.nio=ALL-UNNAMED" + + test_proc = subprocess.Popen( + [ + "mvn", + "-f", + project_dir, + "compile", + "exec:java", + "-Dexec.mainClass=org.example.Example", + ], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + text=True, + env=modified_env, ) - # execute java testing code thru jshell and read output - # JDK11 support '-' This allows the pipe to work as expected without requiring a shell - # Migrating to /dev/stdin to also support JDK9+ - proc_jshell_process = subprocess.Popen( - ["jshell", "-R--add-opens=java.base/java.nio=ALL-UNNAMED", "--class-path", stdout_dependency, "-s", "/dev/stdin"], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - text=True, - ) - out_java_arrow, err_java_arrow = proc_jshell_process.communicate(code) - if err_java_arrow: - raise RuntimeError(__("invalid process to run jshell")) + out_java_arrow, err_java_arrow = test_proc.communicate() # continue with python logic code to do java output validation output = f"print('''{self.clean_output(out_java_arrow)}''')" @@ -93,12 +107,53 @@ def compile( return compile(output, name, self.type, flags, dont_inherit) def clean_output(self, output: str): - if output[-3:] == '-> ': - output = output[:-3] - if output[-1:] == '\n': - output = output[:-1] - output = (4*' ').join(output.split('\t')) - return output + lines = output.split("\n") + + # Remove log lines from output + lines = [l for l in lines if not l.startswith("[INFO]")] + lines = [l for l in lines if not l.startswith("[WARNING]")] + lines = [l for l in lines if not l.startswith("Download")] + lines = [l for l in lines if not l.startswith("Progress")] + + result = "\n".join(lines) + + # Sometimes the testoutput content is smushed onto the same line as + # following log line, probably just when the testcode code doesn't print + # its own final newline. 
     """
 
     name = "javadoctest"
 
@@ -45,46 +56,49 @@ class JavaDocTestBuilder(DocTestBuilder):
     def compile(
         self, code: str, name: str, type: str, flags: Any, dont_inherit: bool
     ) -> Any:
-        # go to project that contains all your arrow maven dependencies
-        path_arrow_project = pathlib.Path(__file__).parent.parent / "source" / "demo"
-        # create list of all arrow jar dependencies
-        subprocess.check_call(
-            [
-                "mvn",
-                "-q",
-                "dependency:build-classpath",
-                "-DincludeTypes=jar",
-                "-Dmdep.outputFile=.cp.tmp",
-                f"-Darrow.version={self.env.config.version}",
-            ],
-            cwd=path_arrow_project,
-            text=True,
-        )
-        if not (path_arrow_project / ".cp.tmp").exists():
-            raise RuntimeError(
-                __("invalid process to create jshell dependencies library")
+        source_dir = pathlib.Path(__file__).parent.parent / "source" / "demo"
+
+        with tempfile.TemporaryDirectory() as project_dir:
+            shutil.copytree(source_dir, project_dir, dirs_exist_ok=True)
+
+            template_file_path = (
+                pathlib.Path(project_dir)
+                / "src"
+                / "main"
+                / "java"
+                / "org"
+                / "example"
+                / "Example.java"
             )
-        # get list of all arrow jar dependencies
-        with open(path_arrow_project / ".cp.tmp") as f:
-            stdout_dependency = f.read()
-        if not stdout_dependency:
-            raise RuntimeError(
-                __("invalid process to list jshell dependencies library")
+            with open(template_file_path, "r") as infile:
+                template = infile.read()
+
+            filled_template = self.fill_template(template, code)
+
+            with open(template_file_path, "w") as outfile:
+                outfile.write(filled_template)
+
+            # Support JPMS (since Arrow 16)
+            modified_env = os.environ.copy()
+            modified_env["_JAVA_OPTIONS"] = "--add-opens=java.base/java.nio=ALL-UNNAMED"
+
+            test_proc = subprocess.Popen(
+                [
+                    "mvn",
+                    "-f",
+                    project_dir,
+                    "compile",
+                    "exec:java",
+                    "-Dexec.mainClass=org.example.Example",
+                ],
+                stdin=subprocess.PIPE,
+                stdout=subprocess.PIPE,
+                text=True,
+                env=modified_env,
             )
-        # execute java testing code thru jshell and read output
-        # JDK11 support '-' This allows the pipe to work as expected without requiring a shell
-        # Migrating to /dev/stdin to also support JDK9+
-        proc_jshell_process = subprocess.Popen(
-            ["jshell", "-R--add-opens=java.base/java.nio=ALL-UNNAMED", "--class-path", stdout_dependency, "-s", "/dev/stdin"],
-            stdin=subprocess.PIPE,
-            stdout=subprocess.PIPE,
-            text=True,
-        )
-        out_java_arrow, err_java_arrow = proc_jshell_process.communicate(code)
-        if err_java_arrow:
-            raise RuntimeError(__("invalid process to run jshell"))
+            out_java_arrow, err_java_arrow = test_proc.communicate()
 
         # continue with python logic code to do java output validation
         output = f"print('''{self.clean_output(out_java_arrow)}''')"
@@ -93,12 +107,53 @@ def compile(
         return compile(output, name, self.type, flags, dont_inherit)
 
     def clean_output(self, output: str):
-        if output[-3:] == '-> ':
-            output = output[:-3]
-        if output[-1:] == '\n':
-            output = output[:-1]
-        output = (4*' ').join(output.split('\t'))
-        return output
+        lines = output.split("\n")
+
+        # Remove Maven log and download-progress lines from the output
+        lines = [l for l in lines if not l.startswith("[INFO]")]
+        lines = [l for l in lines if not l.startswith("[WARNING]")]
+        lines = [l for l in lines if not l.startswith("Download")]
+        lines = [l for l in lines if not l.startswith("Progress")]
+
+        result = "\n".join(lines)
+
+        # Sometimes the testoutput content is smushed onto the same line as a
+        # following log line, probably when the testcode snippet doesn't print
+        # a final newline of its own. This is the only such case found so far,
+        # so a plain string replace is enough (no need for the re module).
+        result = result.replace(
+            "[INFO] ------------------------------------------------------------------------",
+            "",
+        )
+
+        # Convert all tabs to 4 spaces; Sphinx seems to eat tabs even if we
+        # explicitly put them in the testoutput block, so we modify the
+        # output instead
+        result = (4 * " ").join(result.split("\t"))
+
+        return result.strip()
+
+    def fill_template(self, template, code):
+        # Detect the special case where the cookbook code is already wrapped
+        # in a class and use it as-is without wrapping it up
+        if code.find("public class Example") >= 0:
+            return template + code
+
+        # Split the input code into imports and everything else
+        lines = code.split("\n")
+        code_imports = [l for l in lines if l.startswith("import")]
+        code_rest = [l for l in lines if not l.startswith("import")]
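+
+        # Reassemble the file: the template (license header plus package
+        # declaration), then the snippet's imports, then the remaining lines
+        # wrapped in a generated Example class with a main() entry point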
+        pieces = [
+            template,
+            "\n".join(code_imports),
+            "\n\npublic class Example {\n    public static void main(String[] args) {\n",
+            "\n".join(code_rest),
+            "    }\n}",
+        ]
+
+        return "\n".join(pieces)
+
 
 def setup(app) -> Dict[str, Any]:
     app.add_directive("testcode", JavaTestcodeDirective)
diff --git a/java/source/dataset.rst b/java/source/dataset.rst
index f7ee556a..d8597483 100644
--- a/java/source/dataset.rst
+++ b/java/source/dataset.rst
@@ -344,6 +344,7 @@ In case we need to project only certain columns we could configure ScanOptions with
     import org.apache.arrow.memory.RootAllocator;
     import org.apache.arrow.vector.VectorSchemaRoot;
     import org.apache.arrow.vector.ipc.ArrowReader;
+    import java.util.Optional;
 
     String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/parquetfiles/data1.parquet";
     String[] projection = new String[] {"name"};
diff --git a/java/source/demo/pom.xml b/java/source/demo/pom.xml
index 1a2c42d4..02a5d8cf 100644
--- a/java/source/demo/pom.xml
+++ b/java/source/demo/pom.xml
@@ -24,6 +24,13 @@
         <version>1.6.1</version>
+
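+      <!-- Used by ext/javadoctest.py: the filled-in template is run with "mvn compile exec:java" -->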
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>3.0.0</version>
+      </plugin>
+
@@ -32,8 +39,8 @@
-    <maven.compiler.source>8</maven.compiler.source>
-    <maven.compiler.target>8</maven.compiler.target>
+    <maven.compiler.source>11</maven.compiler.source>
+    <maven.compiler.target>11</maven.compiler.target>
     <arrow.version>15.0.2</arrow.version>
@@ -107,5 +114,10 @@
       <artifactId>core</artifactId>
       <version>0.26.0</version>
     </dependency>
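+    <!-- Calcite provides the SQL parser used by the Substrait examples -->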
+    <dependency>
+      <groupId>org.apache.calcite</groupId>
+      <artifactId>calcite-core</artifactId>
+      <version>1.37.0</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/java/source/demo/src/main/java/org/example/Example.java b/java/source/demo/src/main/java/org/example/Example.java
new file mode 100644
index 00000000..7b0693dc
--- /dev/null
+++ b/java/source/demo/src/main/java/org/example/Example.java
@@ -0,0 +1,17 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.example;
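+
+// ext/javadoctest.py appends each cookbook snippet after this line; snippets
+// that don't declare a class of their own are first wrapped in a generated
+// "public class Example".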
diff --git a/java/source/flight.rst b/java/source/flight.rst
index 6c5c57be..875f19ed 100644
--- a/java/source/flight.rst
+++ b/java/source/flight.rst
@@ -70,6 +70,7 @@ Flight Client and Server
     import java.util.Collections;
     import java.util.Iterator;
     import java.util.List;
+    import java.util.concurrent.ConcurrentMap;
     import java.util.concurrent.ConcurrentHashMap;
 
     class Dataset implements AutoCloseable {
diff --git a/java/source/schema.rst b/java/source/schema.rst
index f5c33f38..4c3bd5a9 100644
--- a/java/source/schema.rst
+++ b/java/source/schema.rst
@@ -61,6 +61,8 @@ Fields are used to denote the particular columns of tabular data.
     import org.apache.arrow.vector.types.pojo.ArrowType;
     import org.apache.arrow.vector.types.pojo.Field;
     import org.apache.arrow.vector.types.pojo.FieldType;
+    import java.util.ArrayList;
+    import java.util.List;
 
     FieldType intType = new FieldType(true, new ArrowType.Int(32, true), null);
     FieldType listType = new FieldType(true, new ArrowType.List(), null);
@@ -118,6 +120,8 @@ In case we need to add metadata to our Field we could use:
     import org.apache.arrow.vector.types.pojo.ArrowType;
     import org.apache.arrow.vector.types.pojo.Field;
    import org.apache.arrow.vector.types.pojo.FieldType;
+    import java.util.HashMap;
+    import java.util.Map;
 
     Map<String, String> metadata = new HashMap<>();
     metadata.put("A", "Id card");
diff --git a/java/source/substrait.rst b/java/source/substrait.rst
index 8fd9d4f4..1feab88a 100644
--- a/java/source/substrait.rst
+++ b/java/source/substrait.rst
@@ -61,48 +61,55 @@ Here is an example of a Java program that queries a Parquet file:
     import java.nio.ByteBuffer;
     import java.util.HashMap;
     import java.util.Map;
+    import java.util.Collections;
+
+    public class Example {
+        Plan queryTableNation() throws SqlParseException {
+            String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
+            String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
+                "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+            SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+            Plan plan = sqlToSubstrait.execute(sql, Collections.singletonList(nation));
+            return plan;
+        }
 
-    Plan queryTableNation() throws SqlParseException {
-        String sql = "SELECT * FROM NATION WHERE N_NATIONKEY = 17";
-        String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, N_NAME CHAR(25), " +
-            "N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
-        SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
-        Plan plan = sqlToSubstrait.execute(sql, Collections.singletonList(nation));
-        return plan;
-    }
-
-    void queryDatasetThruSubstraitPlanDefinition() {
-        String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
-        ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
-        try (
-            BufferAllocator allocator = new RootAllocator();
-            DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
-                FileFormat.PARQUET, uri);
-            Dataset dataset = datasetFactory.finish();
-            Scanner scanner = dataset.newScan(options);
-            ArrowReader reader = scanner.scanBatches()
-        ) {
-            Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
-            mapTableToArrowReader.put("NATION", reader);
-            // get binary plan
-            Plan plan = queryTableNation();
-            ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.toByteArray().length);
-            substraitPlan.put(plan.toByteArray());
-            // run query
-            try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
-                substraitPlan,
-                mapTableToArrowReader
-            )) {
-                while (arrowReader.loadNextBatch()) {
-                    System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+        void queryDatasetThruSubstraitPlanDefinition() {
+            String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
+            ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+            try (
+                BufferAllocator allocator = new RootAllocator();
+                DatasetFactory datasetFactory = new FileSystemDatasetFactory(allocator, NativeMemoryPool.getDefault(),
+                    FileFormat.PARQUET, uri);
+                Dataset dataset = datasetFactory.finish();
+                Scanner scanner = dataset.newScan(options);
+                ArrowReader reader = scanner.scanBatches()
+            ) {
+                Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+                mapTableToArrowReader.put("NATION", reader);
+                // get binary plan
+                Plan plan = queryTableNation();
+                ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.toByteArray().length);
+                substraitPlan.put(plan.toByteArray());
+                // run query
+                try (ArrowReader arrowReader = new AceroSubstraitConsumer(allocator).runQuery(
+                    substraitPlan,
+                    mapTableToArrowReader
+                )) {
+                    while (arrowReader.loadNextBatch()) {
+                        System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+                    }
                 }
+            } catch (Exception e) {
+                e.printStackTrace();
             }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
+        }
 
-    queryDatasetThruSubstraitPlanDefinition();
+        public static void main(String[] args) {
+            Example ex = new Example();
+
+            ex.queryDatasetThruSubstraitPlanDefinition();
+        }
+    }
 
 .. testoutput::
@@ -134,66 +141,72 @@ For example, we can join the nation and customer tables from the TPC-H benchmark
     import java.util.HashMap;
     import java.util.Map;
 
-    Plan queryTableNationJoinCustomer() throws SqlParseException {
-        String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION n JOIN CUSTOMER c " +
-            "ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " +
-            "GROUP BY n.n_name";
-        String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, " +
-            "N_NAME CHAR(25), N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
-        String customer = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT NULL, " +
-            "C_NAME VARCHAR(25), C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT NOT NULL, " +
-            "C_PHONE CHAR(15), C_ACCTBAL DECIMAL, C_MKTSEGMENT CHAR(10), " +
-            "C_COMMENT VARCHAR(117) )";
-        SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
-        Plan plan = sqlToSubstrait.execute(sql,
-            Arrays.asList(nation, customer));
-        return plan;
-    }
+    public class Example {
+        Plan queryTableNationJoinCustomer() throws SqlParseException {
+            String sql = "SELECT n.n_name, COUNT(*) AS NUMBER_CUSTOMER FROM NATION n JOIN CUSTOMER c " +
+                "ON n.n_nationkey = c.c_nationkey WHERE n.n_nationkey = 17 " +
+                "GROUP BY n.n_name";
+            String nation = "CREATE TABLE NATION (N_NATIONKEY BIGINT NOT NULL, " +
+                "N_NAME CHAR(25), N_REGIONKEY BIGINT NOT NULL, N_COMMENT VARCHAR(152))";
+            String customer = "CREATE TABLE CUSTOMER (C_CUSTKEY BIGINT NOT NULL, " +
+                "C_NAME VARCHAR(25), C_ADDRESS VARCHAR(40), C_NATIONKEY BIGINT NOT NULL, " +
+                "C_PHONE CHAR(15), C_ACCTBAL DECIMAL, C_MKTSEGMENT CHAR(10), " +
+                "C_COMMENT VARCHAR(117) )";
+            SqlToSubstrait sqlToSubstrait = new SqlToSubstrait();
+            Plan plan = sqlToSubstrait.execute(sql,
+                Arrays.asList(nation, customer));
+            return plan;
+        }
 
-    void queryTwoDatasetsThruSubstraitPlanDefinition() {
-        String uriNation = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
-        String uriCustomer = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/customer.parquet";
-        ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
-        try (
-            BufferAllocator allocator = new RootAllocator();
-            DatasetFactory datasetFactory = new FileSystemDatasetFactory(
-                allocator, NativeMemoryPool.getDefault(),
-                FileFormat.PARQUET, uriNation);
-            Dataset dataset = datasetFactory.finish();
-            Scanner scanner = dataset.newScan(options);
-            ArrowReader readerNation = scanner.scanBatches();
-            DatasetFactory datasetFactoryCustomer = new FileSystemDatasetFactory(
-                allocator, NativeMemoryPool.getDefault(),
-                FileFormat.PARQUET, uriCustomer);
-            Dataset datasetCustomer = datasetFactoryCustomer.finish();
-            Scanner scannerCustomer = datasetCustomer.newScan(options);
-            ArrowReader readerCustomer = scannerCustomer.scanBatches()
-        ) {
-            // map table to reader
-            Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
-            mapTableToArrowReader.put("NATION", readerNation);
-            mapTableToArrowReader.put("CUSTOMER", readerCustomer);
-            // get binary plan
-            Plan plan = queryTableNationJoinCustomer();
-            ByteBuffer substraitPlan = ByteBuffer.allocateDirect(
-                plan.toByteArray().length);
-            substraitPlan.put(plan.toByteArray());
-            // run query
-            try (ArrowReader arrowReader = new AceroSubstraitConsumer(
-                allocator).runQuery(
-                    substraitPlan,
-                    mapTableToArrowReader
-            )) {
-                while (arrowReader.loadNextBatch()) {
-                    System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+        void queryTwoDatasetsThruSubstraitPlanDefinition() {
+            String uriNation = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
+            String uriCustomer = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/customer.parquet";
+            ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+            try (
+                BufferAllocator allocator = new RootAllocator();
+                DatasetFactory datasetFactory = new FileSystemDatasetFactory(
+                    allocator, NativeMemoryPool.getDefault(),
+                    FileFormat.PARQUET, uriNation);
+                Dataset dataset = datasetFactory.finish();
+                Scanner scanner = dataset.newScan(options);
+                ArrowReader readerNation = scanner.scanBatches();
+                DatasetFactory datasetFactoryCustomer = new FileSystemDatasetFactory(
+                    allocator, NativeMemoryPool.getDefault(),
+                    FileFormat.PARQUET, uriCustomer);
+                Dataset datasetCustomer = datasetFactoryCustomer.finish();
+                Scanner scannerCustomer = datasetCustomer.newScan(options);
+                ArrowReader readerCustomer = scannerCustomer.scanBatches()
+            ) {
+                // map table to reader
+                Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+                mapTableToArrowReader.put("NATION", readerNation);
+                mapTableToArrowReader.put("CUSTOMER", readerCustomer);
+                // get binary plan
+                Plan plan = queryTableNationJoinCustomer();
+                ByteBuffer substraitPlan = ByteBuffer.allocateDirect(
+                    plan.toByteArray().length);
+                substraitPlan.put(plan.toByteArray());
+                // run query
+                try (ArrowReader arrowReader = new AceroSubstraitConsumer(
+                    allocator).runQuery(
+                        substraitPlan,
+                        mapTableToArrowReader
+                )) {
+                    while (arrowReader.loadNextBatch()) {
+                        System.out.print(arrowReader.getVectorSchemaRoot().contentToTSVString());
+                    }
                 }
+            } catch (Exception e) {
+                e.printStackTrace();
             }
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
+        }
 
-    queryTwoDatasetsThruSubstraitPlanDefinition();
+        public static void main(String[] args) {
+            Example ex = new Example();
+
+            ex.queryTwoDatasetsThruSubstraitPlanDefinition();
+        }
+    }
 
 .. testoutput::
@@ -223,6 +236,7 @@ Here is an example of a Java program that filters a Parquet file:
     import io.substrait.proto.ExtendedExpression;
     import java.nio.ByteBuffer;
     import java.util.Optional;
+    import java.util.Collections;
     import org.apache.arrow.dataset.file.FileFormat;
     import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
     import org.apache.arrow.dataset.jni.NativeMemoryPool;
@@ -235,43 +249,53 @@ Here is an example of a Java program that filters a Parquet file:
     import org.apache.arrow.vector.ipc.ArrowReader;
     import org.apache.calcite.sql.parser.SqlParseException;
 
-    ByteBuffer getFilterExpression() throws SqlParseException {
-        String sqlExpression = "N_NATIONKEY > 10 AND N_NATIONKEY < 15";
-        String nation =
-            "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), "
-                + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
-        SqlExpressionToSubstrait expressionToSubstrait = new SqlExpressionToSubstrait();
-        ExtendedExpression expression =
-            expressionToSubstrait.convert(sqlExpression, Collections.singletonList(nation));
-        byte[] expressionToByte = expression.toByteArray();
-        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(expressionToByte.length);
-        byteBuffer.put(expressionToByte);
-        return byteBuffer;
-    }
+    public class Example {
+        ByteBuffer getFilterExpression() throws SqlParseException {
+            String sqlExpression = "N_NATIONKEY > 10 AND N_NATIONKEY < 15";
+            String nation =
+                "CREATE TABLE NATION (N_NATIONKEY INT NOT NULL, N_NAME CHAR(25), "
+                    + "N_REGIONKEY INT NOT NULL, N_COMMENT VARCHAR)";
+            SqlExpressionToSubstrait expressionToSubstrait = new SqlExpressionToSubstrait();
+            ExtendedExpression expression =
+                expressionToSubstrait.convert(sqlExpression, Collections.singletonList(nation));
+            byte[] expressionToByte = expression.toByteArray();
+            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(expressionToByte.length);
+            byteBuffer.put(expressionToByte);
+            return byteBuffer;
+        }
 
-    void filterDataset() throws SqlParseException {
-        String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
-        ScanOptions options =
-            new ScanOptions.Builder(/*batchSize*/ 32768)
-                .columns(Optional.empty())
-                .substraitFilter(getFilterExpression())
-                .build();
-        try (BufferAllocator allocator = new RootAllocator();
-            DatasetFactory datasetFactory =
-                new FileSystemDatasetFactory(
-                    allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
-            Dataset dataset = datasetFactory.finish();
-            Scanner scanner = dataset.newScan(options);
-            ArrowReader reader = scanner.scanBatches()) {
-            while (reader.loadNextBatch()) {
-                System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
+        void filterDataset() throws SqlParseException {
+            String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet";
+            ScanOptions options =
+                new ScanOptions.Builder(/*batchSize*/ 32768)
+                    .columns(Optional.empty())
+                    .substraitFilter(getFilterExpression())
+                    .build();
+            try (BufferAllocator allocator = new RootAllocator();
+                DatasetFactory datasetFactory =
+                    new FileSystemDatasetFactory(
+                        allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
+                Dataset dataset = datasetFactory.finish();
+                Scanner scanner = dataset.newScan(options);
+                ArrowReader reader = scanner.scanBatches()) {
+                while (reader.loadNextBatch()) {
+                    System.out.print(reader.getVectorSchemaRoot().contentToTSVString());
+                }
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
 
-    filterDataset();
+        public static void main(String[] args) {
+            Example ex = new Example();
+
+            try {
+                ex.filterDataset();
+            } catch (SqlParseException e) {
+                System.out.println(e.getMessage());
+            }
+        }
+    }
 
 .. testoutput::
"/thirdpartydeps/tpch/nation.parquet"; - ScanOptions options = - new ScanOptions.Builder(/*batchSize*/ 32768) - .columns(Optional.empty()) - .substraitFilter(getFilterExpression()) - .substraitProjection(getProjectExpression()) - .build(); - try (BufferAllocator allocator = new RootAllocator(); - DatasetFactory datasetFactory = - new FileSystemDatasetFactory( - allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri); - Dataset dataset = datasetFactory.finish(); - Scanner scanner = dataset.newScan(options); - ArrowReader reader = scanner.scanBatches()) { - while (reader.loadNextBatch()) { - System.out.print(reader.getVectorSchemaRoot().contentToTSVString()); + void filterAndProjectDataset() throws SqlParseException { + String uri = "file:" + System.getProperty("user.dir") + "/thirdpartydeps/tpch/nation.parquet"; + ScanOptions options = + new ScanOptions.Builder(/*batchSize*/ 32768) + .columns(Optional.empty()) + .substraitFilter(getFilterExpression()) + .substraitProjection(getProjectExpression()) + .build(); + try (BufferAllocator allocator = new RootAllocator(); + DatasetFactory datasetFactory = + new FileSystemDatasetFactory( + allocator, NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri); + Dataset dataset = datasetFactory.finish(); + Scanner scanner = dataset.newScan(options); + ArrowReader reader = scanner.scanBatches()) { + while (reader.loadNextBatch()) { + System.out.print(reader.getVectorSchemaRoot().contentToTSVString()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - filterAndProjectDataset(); + public static void main (String[] args) { + Example ex = new Example(); + + try { + ex.filterAndProjectDataset(); + } catch (SqlParseException e) { + System.out.println(e.getMessage()); + } + } + } .. testoutput::