[SPARK-25528][SQL] data source v2 API refactor (batch read) #23086

Status: Closed · 9 commits · changes shown from 6 commits
@@ -20,7 +20,7 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.clients.producer.ProducerRecord

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.streaming.Trigger

@@ -208,7 +208,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.executedPlan.collectFirst {
case scan: DataSourceV2ScanExec
case scan: DataSourceV2StreamingScanExec
if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
}.exists { config =>
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2StreamingScanExec
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.streaming.Trigger
@@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
eventually(timeout(streamingTimeout)) {
assert(
query.lastExecution.executedPlan.collectFirst {
case scan: DataSourceV2ScanExec
case scan: DataSourceV2StreamingScanExec
if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
}.exists(_.knownPartitions.size == newCount),
48 changes: 16 additions & 32 deletions project/MimaExcludes.scala
@@ -197,37 +197,6 @@ object MimaExcludes {
// [SPARK-23781][CORE] Merge token renewer functionality into HadoopDelegationTokenManager
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.deploy.SparkHadoopUtil.nextCredentialRenewalTime"),

// Data Source V2 API changes
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.ContinuousReadSupport"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.ReadSupport"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.WriteSupport"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.StreamWriteSupport"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.MicroBatchReadSupport"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.DataSourceReader"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder.build"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.sources.v2.reader.InputPartition.createPartitionReader"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics.estimateStatistics"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ReadSupport.fullSchema"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ReadSupport.planInputPartitions"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning.outputPartitioning"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning.outputPartitioning"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ReadSupport.fullSchema"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ReadSupport.planInputPartitions"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters"),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder.build"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.ContinuousInputPartition"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.InputPartitionReader"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.DataSourceWriter"),
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.sources.v2.writer.DataWriterFactory.createWriter"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter"),

// [SPARK-26133][ML] Remove deprecated OneHotEncoder and rename OneHotEncoderEstimator to OneHotEncoder
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ml.feature.OneHotEncoderEstimator"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.ml.feature.OneHotEncoder"),
@@ -243,7 +212,22 @@ object MimaExcludes {
// [SPARK-26141] Enable custom metrics implementation in shuffle write
// Following are Java private classes
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.shuffle.sort.UnsafeShuffleWriter.this"),
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.TimeTrackingOutputStream.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.storage.TimeTrackingOutputStream.this"),

// Data Source V2 API changes
(problem: Problem) => problem match {
case MissingClassProblem(cls) =>
!cls.fullName.startsWith("org.apache.spark.sql.sources.v2")
case MissingTypesProblem(newCls, _) =>
!newCls.fullName.startsWith("org.apache.spark.sql.sources.v2")
case InheritedNewAbstractMethodProblem(cls, _) =>
!cls.fullName.startsWith("org.apache.spark.sql.sources.v2")
case DirectMissingMethodProblem(meth) =>
!meth.owner.fullName.startsWith("org.apache.spark.sql.sources.v2")
case ReversedMissingMethodProblem(meth) =>
!meth.owner.fullName.startsWith("org.apache.spark.sql.sources.v2")
case _ => true
}
)

// Exclude rules for 2.4.x
sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.Scan;
import org.apache.spark.sql.sources.v2.reader.ScanBuilder;

/**
* An empty mix-in interface for {@link Table}, to indicate that this table supports batch scans.
* <p>
* If a {@link Table} implements this interface, its {@link Table#newScanBuilder(DataSourceOptions)}
* must return a {@link ScanBuilder} that builds a {@link Scan} with {@link Scan#toBatch()}
* implemented.
* </p>
*/
@Evolving
public interface SupportsBatchRead extends Table { }
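To make this contract concrete, here is a minimal sketch of a table that honors SupportsBatchRead. It assumes ScanBuilder declares a build() method returning the configured Scan and that Scan declares readSchema(), neither of which is shown in this diff, and all Example* names are hypothetical:

```java
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.SupportsBatchRead;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.reader.Batch;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;
import org.apache.spark.sql.sources.v2.reader.Scan;
import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
import org.apache.spark.sql.types.StructType;

// Hypothetical table that declares batch-scan support.
class ExampleTable implements Table, SupportsBatchRead {

  @Override
  public String name() {
    return "example_table";
  }

  @Override
  public StructType schema() {
    return new StructType().add("id", "bigint").add("data", "string");
  }

  @Override
  public ScanBuilder newScanBuilder(DataSourceOptions options) {
    // The builder could record pushed filters or a pruned schema and carry
    // them into the Scan; this sketch builds a plain scan of the full schema.
    return new ExampleScanBuilder(schema());
  }
}

// Assumes ScanBuilder declares build() returning the configured Scan.
class ExampleScanBuilder implements ScanBuilder {
  private final StructType schema;

  ExampleScanBuilder(StructType schema) {
    this.schema = schema;
  }

  @Override
  public Scan build() {
    return new ExampleScan(schema);
  }
}

// Because the table is SupportsBatchRead, the built Scan must implement toBatch().
class ExampleScan implements Scan {
  private final StructType schema;

  ExampleScan(StructType schema) {
    this.schema = schema;
  }

  @Override
  public StructType readSchema() {  // assumed Scan method
    return schema;
  }

  @Override
  public Batch toBatch() {
    // An empty table, for brevity: zero input partitions, so the reader factory
    // is never asked for a reader. Assumes PartitionReaderFactory keeps its
    // existing single abstract method createReader(InputPartition).
    return new Batch() {
      @Override
      public InputPartition[] planInputPartitions() {
        return new InputPartition[0];
      }

      @Override
      public PartitionReaderFactory createReaderFactory() {
        return partition -> {
          throw new UnsupportedOperationException("this sketch has no partitions");
        };
      }
    };
  }
}
```

Per the Javadoc above, the intended batch read path is then newScanBuilder(options) → build() → toBatch(), with build() being the assumed builder method.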
65 changes: 65 additions & 0 deletions sql/core/src/main/java/org/apache/spark/sql/sources/v2/Table.java
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;
Contributor:

#21306 (TableCatalog support) adds this class as org.apache.spark.sql.catalog.v2.Table in the spark-catalyst module. I think it needs to be in the catalyst module and should probably be in the o.a.s.sql.catalog.v2 package as well.

The important one is moving this to the catalyst module. The analyzer is in catalyst and all of the v2 logical plans and analysis rules will be in catalyst as well, because we are standardizing behavior. The standard validation rules should be in catalyst, not in a source-specific or hive-specific package in the sql-core or hive modules.

Because the logical plans and validation rules are in the catalyst package, the TableCatalog API needs to be there as well. For example, when a catalog table identifier is resolved for a read query, one of the results is a TableCatalog instance for the catalog portion of the identifier. That catalog is used to load the v2 table, which is then wrapped in a v2 relation for further analysis. Similarly, the write path should also validate that the catalog exists during analysis by loading it, and would then pass the catalog in a v2 logical plan for CreateTable or CreateTableAsSelect.

I also think that it makes sense to use the org.apache.spark.sql.catalog.v2 package for Table because Table is more closely tied to the TableCatalog API than to the data source API. The link to DSv2 is that Table carries newScanBuilder, but the rest of the methods exposed by Table are for catalog functions, like inspecting a table's partitioning or table properties.

Moving this class would make adding TableCatalog less intrusive.

Contributor:

Moving this to the Catalyst package would set a precedent for user-overridable behavior to live in the catalyst project. I'm not aware of anything in the Catalyst package being considered as public API right now. Are we allowed to start such a convention at this juncture?

Contributor:

Everything in catalyst is considered private (although it has public visibility for debugging), and it's best to keep it that way.

Contributor Author (@cloud-fan, Nov 27, 2018):

Why does this Table API need to be in catalyst? It's not even a plan. We can define a table LogicalPlan interface in catalyst and implement it in the SQL module with this Table API.

Contributor:

I can understand wanting to keep everything in Catalyst private. That's fine with me, but I think that Catalyst does need to be able to interact with tables and catalogs that are supplied by users.

For example: Our tables support schema evolution. Specifically, reading files that were written before a column was added. When we add a column, Spark shouldn't start failing in analysis for an AppendData operation in a scheduled job (as it would today). We need to be able to signal to the validation rule that the table supports reading files that are missing columns, so that Spark can do the right validation and allow writes that used to work to continue.

How would that information -- support for reading missing columns -- be communicated to the analyzer?

Also, what about my example above: how will the analyzer load tables using a user-supplied catalog if catalyst can't use any user-supplied implementations?

We could move all of the v2 analysis rules, like ResolveRelations, into the core module, but it seems to me that this requirement is no longer providing value if we have to do that. I think that catalyst is the right place for common plans and analysis rules to live because it is the library of common SQL components.

Wherever the rules and plans end up, they will need access to the TableCatalog API.

Contributor Author:

It's unclear to me what would be the best choice:

  1. move data source API to catalyst module
  2. move data source related rules to SQL core module
  3. define private catalog related APIs in catalyst module and implement them in SQL core

Can we delay the discussion until we have a PR to add catalog support after the refactor?

Contributor:

> Can we delay the discussion until we have a PR to add catalog support after the refactor?

Yes, that works.

But, can we move Table to the org.apache.spark.sql.catalog.v2 package where TableCatalog is defined in the other PR? I think Table should be defined with the catalog API and moving that later would require import changes to any file that references Table.

Contributor Author:

For other reviewers: in the DS v2 community sync, we decided to move data source v2 into a new module, sql-api, and make catalyst depend on it. This will be done in a follow-up.

Contributor:

I just went to make this change, but it requires moving any SQL class from catalyst referenced by the API into the API module as well... Let's discuss the options more on the dev list thread.


import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.Scan;
import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
import org.apache.spark.sql.types.StructType;

/**
* An interface representing a logical structured data set of a data source. For example, the
* implementation can be a directory on the file system, a Kafka topic, or a table in the
* catalog.
* <p>
* This interface can mix in the following interfaces to support different operations:
* </p>
* <ul>
* <li>{@link SupportsBatchRead}: this table can be read in batch queries.</li>
* </ul>
*/
@Evolving
public interface Table {
Contributor:

It would be helpful for a Table to also expose a name or identifier of some kind. The TableIdentifier passed into DataSourceV2Relation is only used in name to identify the relation's table. If the name (or location for path-based tables) were supplied by the table instead, it would remove the need to pass it in the relation.


/**
* A name to identify this table.
* <p>
* By default this returns the class name of the implementation. Please override it to provide a
* meaningful name, like the database and table name from the catalog, or the location of the files for
* this table.
* </p>
*/
default String name() {
Contributor Author:

Do you think it's better to just ask implementations to override toString? cc @rdblue

Contributor:

I don't think this should have a default. Implementations should definitely implement this.

I think there is a difference between toString and name. An implementation may choose to display name when showing a table's string representation, but may choose to include extra information to show more about the table state, like Iceberg's snapshot ID.

return this.getClass().toString();
}
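As a small illustration of the distinction being discussed (not tied to the real Table interface), here is a hypothetical table with a stable name() and a more verbose toString(); the snapshot field is made up, in the spirit of the Iceberg example:

```java
// Hypothetical: name() stays a stable identifier, toString() may add state.
class ExampleSnapshotTable {
  private final String database = "db";
  private final String table = "events";
  private final long snapshotId = 8127354917L;  // made-up table state

  // Stable identifier: database and table name from the catalog.
  public String name() {
    return database + "." + table;
  }

  // The string form may include extra state that name() should not carry.
  @Override
  public String toString() {
    return name() + " [snapshot=" + snapshotId + "]";
  }
}
```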

/**
* Returns the schema of this table.
*/
StructType schema();

/**
* Returns a {@link ScanBuilder} which can be used to build a {@link Scan} later. Spark will call
* this method for each data scanning query.
* <p>
* The builder can take some query-specific information to do operator pushdown, and keep this
* information in the created {@link Scan}.
* </p>
*/
ScanBuilder newScanBuilder(DataSourceOptions options);
Contributor:

DataSourceOptions isn't simply a map for two main reasons, as far as I can tell: first, it forces options to be case insensitive, and second, it exposes helper methods to identify tables, like tableName, databaseName, and paths. In the new abstraction, the second use of DataSourceOptions is no longer needed. The table is already instantiated by the time that this is called.

We should reconsider DataSourceOptions. The tableName methods aren't needed, and we also no longer need to forward properties from the session config because the way tables are configured has changed (catalogs handle that). I think we should remove this class and instead use the more direct implementation, CaseInsensitiveStringMap from #21306. The behavior of that class is obvious from its name, and it would be shared between the v2 APIs, both catalog and data source.

Contributor:

Makes sense to me: DataSourceOptions was carrying along identifiers that really belong to a table identifier and that should be interpreted at the catalog level, not the data read level. In other words, the implementation of this Table should already know what locations to look up (e.g. "files comprising dataset D"); now it's a matter of how (e.g. pushdown, filter predicates).

Contributor Author:

I agree. Since CaseInsensitiveStringMap is not in the code base yet, shall we do it in a follow-up?

Contributor:

Either in a follow-up or you can add the class in this PR. Either way works for me.

}
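Since CaseInsensitiveStringMap is not in the code base yet, the following is only an illustration of the case-insensitive lookup behavior being discussed, not that class's actual implementation; the class and method names below are placeholders:

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Minimal sketch of case-insensitive option lookup, the behavior that
// DataSourceOptions already enforces and that CaseInsensitiveStringMap
// (proposed in #21306) would provide.
class CaseInsensitiveOptions {
  private final Map<String, String> delegate = new HashMap<>();

  CaseInsensitiveOptions(Map<String, String> options) {
    // Keys are normalized once, so "Path", "path" and "PATH" all collide.
    options.forEach((k, v) -> delegate.put(k.toLowerCase(Locale.ROOT), v));
  }

  public String get(String key) {
    return delegate.get(key.toLowerCase(Locale.ROOT));
  }

  public boolean containsKey(String key) {
    return delegate.containsKey(key.toLowerCase(Locale.ROOT));
  }
}
```

With this behavior, get("PATH") and get("path") return the same value, while the table-identification helpers of DataSourceOptions are simply dropped.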
sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.types.StructType;

/**
* The base interface for v2 data sources which don't have a real catalog. Implementations must
* have a public, 0-arg constructor.
* <p>
* The major responsibility of this interface is to return a {@link Table} for read/write.
* </p>
*/
@Evolving
// TODO: do not extend `DataSourceV2`, after we finish the API refactor completely.
public interface TableProvider extends DataSourceV2 {

/**
* Return a {@link Table} instance to do read/write with user-specified options.
*
* @param options the user-specified options that can identify a table, e.g. file path, Kafka
* topic name, etc. It's an immutable case-insensitive string-to-string map.
*/
Table getTable(DataSourceOptions options);

/**
* Return a {@link Table} instance to do read/write with user-specified schema and options.
* <p>
* By default this method throws {@link UnsupportedOperationException}; implementations should
Contributor:

Javadoc would normally also add @throws with this information. I agree it should be here as well.

Contributor Author:

What I learned is that we should only declare checked exceptions. See http://www.javapractices.com/topic/TopicAction.do?Id=171

Contributor:

Strange, that page links to one with the opposite advice: http://www.javapractices.com/topic/TopicAction.do?Id=44

I think that @throws is a good idea whenever you want to document an exception type as part of the method contract. Since it is expected that this method isn't always implemented and may throw this exception, I think you were right to document it. And documenting exceptions is best done with @throws to highlight them in Javadoc.

The page you linked to makes the argument that unchecked exceptions aren't part of the method contract and cannot be relied on. But documenting this shows that it is part of the contract or expected behavior, so I think docs are appropriate.

Contributor Author:

Added the @throws clause.

* override this method to handle user-specified schema.
* </p>
* @param options the user-specified options that can identify a table, e.g. file path, Kafka
* topic name, etc. It's an immutable case-insensitive string-to-string map.
* @param schema the user-specified schema.
* @throws UnsupportedOperationException
*/
default Table getTable(DataSourceOptions options, StructType schema) {
Contributor:

I know that this is from prior DataSourceV2 semantics, but what's the difference between providing the schema here and the column pruning aspect of ScanBuilder?

Contributor:

Basically, I'm just saying we should push down this requested schema into the ScanBuilder.

Contributor Author:

It's a different thing. Imagine you are reading a Parquet file: you know exactly what its physical schema is, and you don't want Spark to waste a job inferring the schema. Then you can specify the schema when reading.

Next, Spark will analyze the query and figure out what the required schema is. This step is automatic and driven by Spark.

Contributor:

I agree with @cloud-fan. These are slightly different uses.

Here, it is supplying a schema for how to interpret data files. Say you have CSV files with columns id, ts, and data and no headers. This tells the CSV reader what the columns are and how to convert the data to useful types (bigint, timestamp, and string). Column projection will later request those columns, maybe just id and data. If you only passed the projection schema, then the ts values would be returned for the data column.

String name;
if (this instanceof DataSourceRegister) {
name = ((DataSourceRegister) this).shortName();
} else {
name = this.getClass().getName();
}
throw new UnsupportedOperationException(
name + " source does not support user-specified schema");
}
}
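To illustrate the distinction drawn in this thread at the user-facing API: the user-specified schema describes how to interpret the data and lets Spark skip schema inference, while column pruning is derived from the query and pushed through the ScanBuilder. A usage sketch with hypothetical paths and column names; for a v2 source, the .schema(...) call carries the schema intended to reach getTable(options, schema):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;

public class SchemaVsProjection {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("schema-vs-projection")
        .getOrCreate();

    // User-specified schema: tells the source how to interpret headerless CSV
    // files and avoids a schema-inference job.
    StructType schema = new StructType()
        .add("id", "bigint")
        .add("ts", "timestamp")
        .add("data", "string");

    Dataset<Row> df = spark.read()
        .format("csv")
        .schema(schema)
        .load("/tmp/events");  // hypothetical path

    // Column pruning: derived from the query and pushed down via the scan
    // builder; the source still knows the full physical layout from the
    // schema supplied above, so "ts" is skipped rather than misread.
    Dataset<Row> projected = df.select("id", "data");
    projected.explain();

    spark.stop();
  }
}
```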
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/Batch.java
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.annotation.Evolving;

/**
* A physical representation of a data source scan for batch queries. This interface is used to
* provide physical information, like how many partitions the scanned data has, and how to read
* records from the partitions.
*/
@Evolving
public interface Batch {
Contributor:

BatchScan, perhaps?

Contributor Author:

I don't have a strong preference. I feel it's a little clearer to distinguish between scan and batch.


/**
* Returns a list of {@link InputPartition input partitions}. Each {@link InputPartition}
* represents a data split that can be processed by one Spark task. The number of input
* partitions returned here is the same as the number of RDD partitions this scan outputs.
* <p>
* If the {@link Scan} supports filter pushdown, this Batch is likely configured with a filter
* and is responsible for creating splits for that filter, which is not a full scan.
* </p>
* <p>
* This method will be called only once during a data source scan, to launch one Spark job.
* </p>
*/
InputPartition[] planInputPartitions();

/**
* Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
*/
PartitionReaderFactory createReaderFactory();
}
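For contrast with the empty sketch earlier, here is a compact hypothetical Batch that splits a numeric range into a fixed number of partitions. It assumes InputPartition is a serializable marker carrying the split state, and that PartitionReaderFactory and PartitionReader keep their existing shapes (createReader(InputPartition); next()/get()/close()); all Range* names are made up:

```java
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.sources.v2.reader.Batch;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.PartitionReader;
import org.apache.spark.sql.sources.v2.reader.PartitionReaderFactory;

// Hypothetical split: one contiguous sub-range per Spark task.
class RangePartition implements InputPartition {
  final long start;
  final long end;  // exclusive

  RangePartition(long start, long end) {
    this.start = start;
    this.end = end;
  }
}

class RangeBatch implements Batch {
  private final long start;
  private final long end;
  private final int numSplits;

  RangeBatch(long start, long end, int numSplits) {
    this.start = start;
    this.end = end;
    this.numSplits = numSplits;
  }

  @Override
  public InputPartition[] planInputPartitions() {
    // Called once per scan; the array length becomes the RDD partition count.
    InputPartition[] partitions = new InputPartition[numSplits];
    long step = (end - start) / numSplits;
    for (int i = 0; i < numSplits; i++) {
      long lo = start + i * step;
      long hi = (i == numSplits - 1) ? end : lo + step;
      partitions[i] = new RangePartition(lo, hi);
    }
    return partitions;
  }

  @Override
  public PartitionReaderFactory createReaderFactory() {
    // The factory is sent to executors; one reader is created per partition.
    return partition -> new RangeReader((RangePartition) partition);
  }
}

// Assumes PartitionReader<InternalRow> with next()/get()/close().
class RangeReader implements PartitionReader<InternalRow> {
  private final long end;
  private long current;

  RangeReader(RangePartition p) {
    this.current = p.start - 1;
    this.end = p.end;
  }

  @Override
  public boolean next() {
    return ++current < end;
  }

  @Override
  public InternalRow get() {
    // Single bigint column for this sketch.
    return new GenericInternalRow(new Object[]{current});
  }

  @Override
  public void close() { }
}
```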
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportPartitioning.java
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;

/**
* A mix-in interface for {@link BatchReadSupport}. Data sources can implement this interface to
* report data partitioning and try to avoid a shuffle on the Spark side.
*
* Note that, when a {@link ReadSupport} implementation creates exactly one {@link InputPartition},
* Spark may avoid adding a shuffle even if the reader does not implement this interface.
*/
@Evolving
// TODO: remove it, after we finish the API refactor completely.
public interface OldSupportsReportPartitioning extends ReadSupport {

/**
* Returns the output data partitioning that this reader guarantees.
*/
Partitioning outputPartitioning(ScanConfig config);
}
sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/OldSupportsReportStatistics.java
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.annotation.Evolving;

/**
* A mix-in interface for {@link BatchReadSupport}. Data sources can implement this interface to
* report statistics to Spark.
*
* As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the
* data source. Implementations that return more accurate statistics based on pushed operators will
* not improve query performance until the planner can push operators before getting stats.
*/
@Evolving
// TODO: remove it, after we finish the API refactor completely.
public interface OldSupportsReportStatistics extends ReadSupport {

/**
* Returns the estimated statistics of this data source scan.
*/
Statistics estimateStatistics(ScanConfig config);
}