Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-25528][SQL] data source V2 read side API refactoring #22547

Closed
wants to merge 1 commit into from

Conversation

cloud-fan
Copy link
Contributor

@cloud-fan cloud-fan commented Sep 25, 2018

What changes were proposed in this pull request?

Refactor the read side API according to the abstraction proposed in the dev list

batch: catalog -> table -> scan
streaming: catalog -> table -> stream -> scan

More concretely, this PR

  1. add a new interface called Format that can return Table
  2. rename ReadSupportProvider to Table, represents a logical data set, with a schema.
  3. add a new interface InputStream to represent a streaming source in a streaming query. It can create Scans.
  4. rename ReadSupport to Scan. Each Scan triggers one Spark job. (like an RDD)

A major change of this PR is about how we execute streaming queries.
Previously:

  1. create a query with StreamingRelationV2
  2. at the beginning of streaming execution, convert StreamingRelationV2 to StreamingExecutionRelation.
  3. for each micro-batch/continuous duration, convert StreamingExecutionRelation to StreamingDataSourceV2Relation. Then apply query optimization and planning, and run.

Currently:

  1. create a query with StreamingRelationV2
  2. at the beginning of streaming execution, convert StreamingRelationV2 to MicroBatch/ContinuousExecutionRelation.
  3. do a fake query optimization/planning, get the operator pushdown result and convert MicroBatch/ContinuousExecutionRelation to StreamingDataSourceV2Relation.
  4. for each micro-batch/continuous duration, set offsets for StreamingDataSourceV2Relation, do query optimization and planning, and run. Note that, the operator pushdown in the query optimization of this step will be exactly the same as the step 3.

followup:

  1. enable operator pushdown for streaming. prototype
  2. refactor the API at write side.
  3. adopt the idea of logical/phyiscal scan. This can unify the abstraction of batch and streaming, but not sure if it's clearer than the current one. We should investigate it later.

How was this patch tested?

existing tests.

}.toArray
}
override def createContinuousScan(start: Offset): ContinuousScan = {
val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(start)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

override def planInputPartitions(): Array[InputPartition] = {
val startPartitionOffsets = start.partitionToOffsets
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkpointLocation: String,
config: ScanConfig,
options: DataSourceOptions): MicroBatchInputStream = {
val parameters = options.asMap().asScala.toMap
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkpointLocation: String,
config: ScanConfig,
options: DataSourceOptions): ContinuousInputStream = {
val parameters = options.asMap().asScala.toMap
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
}.exists { config =>
query.lastExecution.logical.collectFirst {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now the known partitions is tracked by the KafkaContinuousInputStream in logical plan: https://github.com/apache/spark/pull/22547/files#diff-5fa6c9fc023183f4a855f778944d23ebR62

@SparkQA

This comment has been minimized.

@cloud-fan cloud-fan force-pushed the new-idea branch 2 times, most recently from f08e02a to a349686 Compare September 25, 2018 17:33
@SparkQA

This comment has been minimized.

@SparkQA

This comment has been minimized.

@cloud-fan
Copy link
Contributor Author

cc @rxin @jose-torres @rdblue

@SparkQA
Copy link

SparkQA commented Sep 25, 2018

Test build #96570 has finished for PR 22547 at commit 59d0abb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 10, 2018

Test build #97206 has finished for PR 22547 at commit a35d98c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rdblue
Copy link
Contributor

rdblue commented Oct 17, 2018

@cloud-fan, sorry to look at this so late, I was out on vacation for a little while. Is this about ready for review?

@cloud-fan
Copy link
Contributor Author

Hi @rdblue welcome back! I just rebased it so it's ready for review :)

@SparkQA

This comment has been minimized.

*
* @param config a {@link ScanConfig} which may contains operator pushdown information.
* @param options the user-specified options, which is same as the one used to create the
* {@link ScanConfigBuilder} that built the given {@link ScanConfig}.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another choice is to let ScanConfig carry the options. But ScanConfig is an interface and doing this will put more work at user side, so I decided to pass the options again here. Feedbacks are welcome!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that options should be passed twice.

@@ -90,6 +140,8 @@ class ContinuousExecution(
do {
runContinuous(sparkSessionForStream)
} while (state.updateAndGet(stateUpdate) == ACTIVE)

stopSources()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with the new abstraction, we should only stop sources when the stream query ends, instead of each reconfiguration.

@@ -60,6 +59,14 @@ class RateStreamMicroBatchReadSupport(options: DataSourceOptions, checkpointLoca
s" is $maxSeconds, but 'rampUpTimeSeconds' is $rampUpTimeSeconds.")
}

private val numPartitions = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -319,29 +307,18 @@ class RateSourceSuite extends StreamTest {
"rate source does not support user-specified schema"))
}

test("continuous in registry") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need this test now. With the new Format abstraction, the lookup logic is unified between microbatch and continuous

val v1Source = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
val v2Source = spark.readStream.format(classOf[FakeFormat].getName).load().select("a")

Seq(v1Source, v2Source).foreach { df =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

improve this test to make sure v2 also works.

@@ -381,7 +390,7 @@ class StreamSuite extends StreamTest {

test("insert an extraStrategy") {
try {
spark.experimental.extraStrategies = TestStrategy :: Nil
spark.experimental.extraStrategies = CustomStrategy :: Nil
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we need to do a temporary planning for streaming queries, we can't allow custom strategy to remove streaming leaf nodes.

@cloud-fan
Copy link
Contributor Author

A major part of this PR is to update existing streaming sources, which is just moving code around. There are 3 things we need to pay attention to during review:

  1. the naming and documentation of the new interfaces.
  2. the new streaming query planning workflow. See the PR description for details.
  3. the updated tests, make sure there is nothing wrong.

@SparkQA
Copy link

SparkQA commented Oct 18, 2018

Test build #97538 has finished for PR 22547 at commit f2ea923.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Oct 18, 2018

Test build #97540 has finished for PR 22547 at commit 9f63721.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -46,17 +45,22 @@ import org.apache.spark.sql.types.StructType
* scenarios, where some offsets after the specified initial ones can't be
* properly read.
*/
class KafkaContinuousReadSupport(
class KafkaContinuousInputStream(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to break this change into multiple PRs for batch, microbatch, and continuous? It's really large and it would be nice if we could get the changes in incrementally.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for this. A lot of the changes right now are for moving around the streaming code especially, which makes it harder to isolate just the proposed API for review.

An alternative is to make this PR separate commits that, while the commits themselves may not compile because of mismatching signatures - but all the commits taken together would compile, and each commit can be reviewed individually for assessing the API and then the implementation.

For example I'd propose 3 PRs:

  • Batch reading, with a commit for the interface changes and a separate commit for the implementation changes
  • Micro Batch Streaming read, with a commit for the interface changes and a separate commit for the implementation changes
  • Continuous streaming read, similar to above

Thoughts?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer that the commits themselves compile, but since this is separating the modes I think it could be done incrementally.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I really consider this to be a blocker on getting this merged and approved. It's difficult to have confidence in a review over such a large change. Thoughts @cloud-fan @rdblue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea I'll separate this PR into 3 smaller ones, after we have agreed on the high-level design at https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing

@rdblue
Copy link
Contributor

rdblue commented Oct 19, 2018

@cloud-fan, is there a design doc that outlines these changes and the new API structure?

Also, I don't agree that we should investigate logical/physical scan later. If this goes in, we've effectively decided not to have a unified abstraction between batch and streaming.

.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
.toMap
object KafkaTable extends Table
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is KafkaTable an object, not a class? This doesn't seem to fit an abstraction.

/**
* Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
*/
PartitionReaderFactory createReaderFactory();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are BatchScan and PartitionReaderFactory different interfaces?

* @param options the user-specified options, which is same as the one used to create the
* {@link ScanConfigBuilder} that built the given {@link ScanConfig}.
*/
BatchScan createBatchScan(ScanConfig config, DataSourceOptions options);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a benefit to having both ScanConfig and BatchScan objects? Why not have ScanConfigBuilder return a BatchScan directly by calling buildBatch?

*/
PartitionReaderFactory createReaderFactory(ScanConfig config);
default ScanConfigBuilder newScanConfigBuilder(DataSourceOptions options) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be clear that these are scan-specific options. Maybe add some documentation with an example of something that would be passed to configure a scan, like a target split size for combining.

*/
interface StreamingReadSupport extends ReadSupport {
@InterfaceStability.Evolving
public interface InputStream extends BaseStreamingSource {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InputStream conflicts with a well-known JVM class, java.io.InputStream. I think this should be renamed to be more specific to a streaming table scan.

userSpecifiedSchema: Option[StructType] = None): DataSourceV2Relation = {
val readSupport = source.createReadSupport(options, userSpecifiedSchema)
val output = readSupport.fullSchema().toAttributes
userSpecifiedSchema: Option[StructType] = None): Option[DataSourceV2Relation] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't return an option. A relation is not a read-side structure, it is also used in write-side logical plans as the target of a write. Validation rules like PreprocessTableInsertion validate the write dataframe against the relation's schema. That's why the relation has a newWriteSupport method.

Creating a relation from a Table should always work, even if the table isn't readable or isn't writable. Analysis can be done later to validate whether the plan that contains a relation can actually use the table.

@rdblue
Copy link
Contributor

rdblue commented Oct 19, 2018

After looking at the changes, I want to reiterate that request for a design doc. I think that code is a great way to prototype a design, but that we need to step back and make sure that the design makes sense when you view it from a high level.

I have two main motivations for that point. First, there are some classes that I don't see a justification for, like having a separate ScanConfig, BatchScan, and PartitionReaderFactory. Are all of those separate classes necessary? Can a ScanConfigBuilder return a BatchScan? Can BatchScan expose a createBatchReader(InputPartition) method?

My second motivation for saying we need a clear design doc is that I think that the current way to interact with v2 doesn't fit well with catalogs. This is based around Format, which is based on the v1 method of loading read and write implementations. But that isn't the primary way that v2 will be used be used. It happens to be the only way to call into the v2 API from Spark today, but the primary use of v2 is to integrate sources that are actually modeled as tables in some catalog.

For example, Format exposes getTable that returns a Table implementation from DataSourceOptions. Those options have tableName and databaseName methods. But tables that are identified by name shouldn't be loaded by a Format, they should be loaded by a catalog. It also uses the options for both table options and read options because there isn't a way to pass both. But most tables will be created with table options by a catalog and will accept read-specific options passed to the DataFrameReader.

I think we would approach a usable API much sooner if this work was planned based on a shared understanding of how catalogs and tables will interact in the future. Not having a catalog API right now is affecting the way tables work in this PR, and that's a concern for me.

* The major responsibility of this interface is to return a {@link Table} for read/write.
*/
@InterfaceStability.Evolving
public interface Format extends DataSourceV2 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is there both Format and DataSourceV2? What does DataSourceV2 do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the write API has not been migrated and still need DataSourceV2

/**
* Return a {@link Table} instance to do read/write with user-specified options.
*
* @param options the user-specified options that can identify a table, e.g. path, table name,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it necessary to pass table name and database to Format? Format should only be used in 2 places to create tables. First, in the DataFrameReader (or writer) API when a format is specified directly instead of a catalog/database/table or catalog/path. Second, it would be used in catalogs that support pluggable implementations, like the current session catalog, which needs to dynamically instantiate implementations based on the table's provider.

@jose-torres
Copy link
Contributor

I agree that we need a shared understanding of the relationship between this work and the new catalog API. I was not under the impression that the primary purpose of v2 is to integrate catalog tables.

@rdblue
Copy link
Contributor

rdblue commented Oct 19, 2018

@jose-torres, I don't mean that the primary purpose of the v2 API is for catalog integration, I mean that the primary use of v2 is with tables that are stored in some catalog. So we should make sure that the plan and design work well with catalog tables.

Another reason that catalog tables are important is that the v2 plans require a catalog for consistent behavior. So catalogs are important and I think will affect the implementation details.

@cloud-fan
Copy link
Contributor Author

@mccheah
Copy link
Contributor

mccheah commented Nov 16, 2018

@cloud-fan @rdblue I believe we've converged on an appropriate API as per our last sync. Do we have a plan to move this forward, with the separated smaller patches?

@rdblue
Copy link
Contributor

rdblue commented Nov 17, 2018

I agree that there is consensus for the proposal in the design doc and I don't think there are any blockers. If there's something I can do to help, please let me know. Otherwise ping me to review!

@cloud-fan
Copy link
Contributor Author

I was stuck with some personal business recently, I'll send a PR for batch source after the weekend.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants