[SPARK-25530][SQL] data source v2 API refactor (batch write)
## What changes were proposed in this pull request?

Adjust the batch write API to match the read API refactor after #23086

The doc with high-level ideas:
https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing

Basically, it renames `BatchWriteSupportProvider` to `SupportsBatchWrite` and makes it extend `Table`, and renames `WriteSupport` to `Write`. It also cleans up some code now that the batch API is complete.

This PR also removes the test from #22688. A data source must now return a table for read/write.
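
To illustrate the new shape, here is a rough sketch of how the refactored write path gets driven (illustrative only, not code from this PR; it assumes the table was obtained from the provider's `getTable`, and that `Table` exposes `name()`/`schema()` as introduced by the read-side refactor):

```java
import java.util.UUID;

import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.sources.v2.writer.BatchWrite;

class BatchWritePathSketch {
  // Resolve a BatchWrite for a table, roughly mirroring how Spark drives the new API.
  static BatchWrite resolveBatchWrite(Table table, DataSourceOptions options) {
    if (!(table instanceof SupportsBatchWrite)) {
      throw new UnsupportedOperationException(table.name() + " does not support batch write");
    }
    return ((SupportsBatchWrite) table)
        .newWriteBuilder(options)                    // configure this particular write
        .withQueryId(UUID.randomUUID().toString())   // unique id of the query
        .withInputDataSchema(table.schema())         // schema of the input data
        .buildForBatch();                            // the BatchWrite that runs the job
  }
}
```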

A few notes about future changes:
1. We will create `SupportsStreamingWrite` later for the streaming APIs.
2. We will create `SupportsBatchReplaceWhere`, `SupportsBatchAppend`, etc. for the new end-user write APIs. I think the streaming APIs will keep using `OutputMode`, and the new end-user write APIs will apply to batch only, at least in the near future.
3. We will remove `SaveMode` from data source API: https://issues.apache.org/jira/browse/SPARK-26356

## How was this patch tested?

Existing tests.

Closes #23208 from cloud-fan/refactor-batch.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
cloud-fan authored and gatorsmile committed Jan 15, 2019
1 parent 1b75f3b commit 954ef96
Showing 21 changed files with 330 additions and 245 deletions.

This file was deleted.

@@ -20,15 +20,7 @@
import org.apache.spark.annotation.Evolving;

/**
* The base interface for data source v2. Implementations must have a public, 0-arg constructor.
*
* Note that this is an empty interface. Data source implementations must mix in interfaces such as
* {@link BatchReadSupportProvider} or {@link BatchWriteSupportProvider}, which can provide
* batch or streaming read/write support instances. Otherwise it's just a dummy data source which
* is un-readable/writable.
*
* If Spark fails to execute any methods in the implementations of this interface (by throwing an
* exception), the read action will fail and no Spark job will be submitted.
* TODO: remove it when we finish the API refactor for the streaming side.
*/
@Evolving
public interface DataSourceV2 {}
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;

/**
* An empty mix-in interface for {@link Table}, to indicate this table supports batch write.
* <p>
* If a {@link Table} implements this interface, its
* {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} method must return a {@link WriteBuilder}
* with {@link WriteBuilder#buildForBatch()} implemented.
* </p>
*/
@Evolving
public interface SupportsBatchWrite extends SupportsWrite {}
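
To illustrate (this is not part of the commit), a minimal table that opts into batch writes could look like the sketch below. `ExampleWriteBuilder` is a hypothetical builder like the one sketched after the new WriteBuilder file further down, and `name()`/`schema()` are assumed to be the methods declared on `Table`:

import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
import org.apache.spark.sql.types.StructType;

class ExampleTable implements SupportsBatchWrite {
  private final StructType schema;

  ExampleTable(StructType schema) { this.schema = schema; }

  @Override public String name() { return "example_table"; }   // assumed from Table
  @Override public StructType schema() { return schema; }       // assumed from Table

  @Override public WriteBuilder newWriteBuilder(DataSourceOptions options) {
    // ExampleWriteBuilder implements buildForBatch(), as this contract requires.
    return new ExampleWriteBuilder();
  }
}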
@@ -29,7 +29,10 @@ interface SupportsRead extends Table {

/**
* Returns a {@link ScanBuilder} which can be used to build a {@link Scan}. Spark will call this
* method to configure each scan.
* method to configure each data source scan.
*
* @param options The options for reading, which is an immutable case-insensitive
* string-to-string map.
*/
ScanBuilder newScanBuilder(DataSourceOptions options);
}
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2;

import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;

/**
* An internal base interface of mix-in interfaces for writable {@link Table}. It adds
* {@link #newWriteBuilder(DataSourceOptions)}, which is used to create a write
* for batch or streaming.
*/
interface SupportsWrite extends Table {

/**
* Returns a {@link WriteBuilder} which can be used to create a {@link BatchWrite}. Spark will call
* this method to configure each data source write.
*/
WriteBuilder newWriteBuilder(DataSourceOptions options);
}
@@ -38,7 +38,7 @@
* Please refer to the documentation of commit/abort methods for detailed specifications.
*/
@Evolving
public interface BatchWriteSupport {
public interface BatchWrite {

/**
* Creates a writer factory which will be serialized and sent to executors.
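
The remainder of this file is unchanged apart from the rename. For illustration only (not part of the commit, and the exact set of abstract methods is assumed from the surrounding Javadoc), a minimal BatchWrite could look like this, delegating to whatever DataWriterFactory the source uses (see the writer sketches further down):

import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

class LoggingBatchWrite implements BatchWrite {
  private final DataWriterFactory factory;

  LoggingBatchWrite(DataWriterFactory factory) { this.factory = factory; }

  @Override public DataWriterFactory createBatchWriterFactory() {
    return factory;  // serialized and sent to executors
  }

  @Override public void commit(WriterCommitMessage[] messages) {
    // All tasks committed: publish the data, e.g. move staged files to their final location.
    System.out.println("Committing " + messages.length + " task outputs");
  }

  @Override public void abort(WriterCommitMessage[] messages) {
    // The job failed: clean up whatever the tasks may have produced.
    System.out.println("Aborting write, discarding " + messages.length + " task outputs");
  }
}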
@@ -36,11 +36,11 @@
*
* If this data writer succeeds (all records are successfully written and {@link #commit()}
* succeeds), a {@link WriterCommitMessage} will be sent to the driver side and passed to
* {@link BatchWriteSupport#commit(WriterCommitMessage[])} with commit messages from other data
* {@link BatchWrite#commit(WriterCommitMessage[])} with commit messages from other data
* writers. If this data writer fails (one record fails to write or {@link #commit()} fails), an
* exception will be sent to the driver side, and Spark may retry this writing task a few times.
* In each retry, {@link DataWriterFactory#createWriter(int, long)} will receive a
* different `taskId`. Spark will call {@link BatchWriteSupport#abort(WriterCommitMessage[])}
* different `taskId`. Spark will call {@link BatchWrite#abort(WriterCommitMessage[])}
* when the configured number of retries is exhausted.
*
* Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
@@ -71,11 +71,11 @@ public interface DataWriter<T> {
/**
* Commits this writer after all records are written successfully, and returns a commit message
* which will be sent back to the driver side and passed to
* {@link BatchWriteSupport#commit(WriterCommitMessage[])}.
* {@link BatchWrite#commit(WriterCommitMessage[])}.
*
* The written data should only be visible to data source readers after
* {@link BatchWriteSupport#commit(WriterCommitMessage[])} succeeds, which means this method
* should still "hide" the written data and ask the {@link BatchWriteSupport} at driver side to
* {@link BatchWrite#commit(WriterCommitMessage[])} succeeds, which means this method
* should still "hide" the written data and ask the {@link BatchWrite} at driver side to
* do the final commit via {@link WriterCommitMessage}.
*
* If this method fails (by throwing an exception), {@link #abort()} will be called and this
@@ -93,7 +93,7 @@ public interface DataWriter<T> {
* failed.
*
* If this method fails (by throwing an exception), the underlying data source may have garbage
* that need to be cleaned by {@link BatchWriteSupport#abort(WriterCommitMessage[])} or manually,
* that need to be cleaned by {@link BatchWrite#abort(WriterCommitMessage[])} or manually,
* but these garbage should not be visible to data source readers.
*
* @throws IOException if failure happens during disk/network IO like writing files.
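
The rest of the file only renames the references. To make the commit protocol described above concrete, here is an illustrative (non-commit) writer that simply counts rows and reports the count back to the driver through its commit message; the write/commit/abort signatures are assumed from the surrounding Javadoc:

import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

class RowCountCommitMessage implements WriterCommitMessage {
  final int partitionId;
  final long rowCount;

  RowCountCommitMessage(int partitionId, long rowCount) {
    this.partitionId = partitionId;
    this.rowCount = rowCount;
  }
}

class CountingDataWriter implements DataWriter<InternalRow> {
  private final int partitionId;
  private long rowCount = 0;

  CountingDataWriter(int partitionId) { this.partitionId = partitionId; }

  @Override public void write(InternalRow record) throws IOException {
    rowCount++;  // a real writer would buffer or stage the row here
  }

  @Override public WriterCommitMessage commit() throws IOException {
    // Nothing is made visible yet; the driver-side BatchWrite performs the final commit.
    return new RowCountCommitMessage(partitionId, rowCount);
  }

  @Override public void abort() throws IOException {
    rowCount = 0;  // a real writer would clean up any staged data here
  }
}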
@@ -24,7 +24,7 @@
import org.apache.spark.sql.catalyst.InternalRow;

/**
* A factory of {@link DataWriter} returned by {@link BatchWriteSupport#createBatchWriterFactory()},
* A factory of {@link DataWriter} returned by {@link BatchWrite#createBatchWriterFactory()},
* which is responsible for creating and initializing the actual data writer at executor side.
*
* Note that the writer factory will be serialized and sent to executors, then the data writer
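
Continuing the sketch (illustrative only, reusing the hypothetical CountingDataWriter above), a matching factory is created on the driver, serialized, and asked for one writer per partition and task attempt:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;

class CountingWriterFactory implements DataWriterFactory {
  @Override public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
    // Called on executors; a retried task keeps its partitionId but gets a different taskId.
    return new CountingDataWriter(partitionId);
  }
}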
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.sql.SaveMode;

// A temporary mix-in interface for `WriteBuilder` to support `SaveMode`. Will be removed before
// Spark 3.0 when all the new write operators are finished. See SPARK-26356 for more details.
public interface SupportsSaveMode extends WriteBuilder {
WriteBuilder mode(SaveMode mode);
}
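
Until SPARK-26356 removes SaveMode, a builder that still honors it would mix this in roughly as follows (a hypothetical sketch, not code from this commit):

import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;

class SaveModeAwareWriteBuilder implements SupportsSaveMode {
  private SaveMode mode = SaveMode.ErrorIfExists;

  @Override public WriteBuilder mode(SaveMode mode) {
    this.mode = mode;  // remembered and applied when the write is built
    return this;
  }
  // A real builder would also override buildForBatch() and honor `mode` there.
}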
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.SupportsBatchWrite;
import org.apache.spark.sql.sources.v2.Table;
import org.apache.spark.sql.types.StructType;

/**
* An interface for building a {@link BatchWrite}. Implementations can mix in interfaces to
* support different ways to write data to data sources.
*
* Unless modified by a mixin interface, the {@link BatchWrite} configured by this builder appends
* data without affecting existing data.
*/
@Evolving
public interface WriteBuilder {

/**
* Passes the `queryId` from Spark to the data source. `queryId` is a unique string identifier of the query. It's
* possible that there are many queries running at the same time, or a query is restarted and
* resumed. {@link BatchWrite} can use this id to identify the query.
*
* @return a new builder with the `queryId`. By default it returns `this`, which means the given
* `queryId` is ignored. Please override this method to take the `queryId`.
*/
default WriteBuilder withQueryId(String queryId) {
return this;
}

/**
* Passes the schema of the input data from Spark to the data source.
*
* @return a new builder with the `schema`. By default it returns `this`, which means the given
* `schema` is ignored. Please override this method to take the `schema`.
*/
default WriteBuilder withInputDataSchema(StructType schema) {
return this;
}

/**
* Returns a {@link BatchWrite} to write data to a batch source. By default this method throws an
* exception; data sources must override this method to provide an implementation if the
* {@link Table} that creates this write builder implements {@link SupportsBatchWrite}.
*
* Note that the returned {@link BatchWrite} can be null if the implementation supports SaveMode,
* to indicate that no writing is needed. We can clean it up after removing
* {@link SupportsSaveMode}.
*/
default BatchWrite buildForBatch() {
throw new UnsupportedOperationException("Batch writes are not supported");
}
}
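
Tying the sketches together (all class names hypothetical, not part of this commit), a source-side builder would typically capture the query id and input schema and hand back a concrete BatchWrite, for example the LoggingBatchWrite/CountingWriterFactory sketched above:

import org.apache.spark.sql.sources.v2.writer.BatchWrite;
import org.apache.spark.sql.sources.v2.writer.WriteBuilder;
import org.apache.spark.sql.types.StructType;

class ExampleWriteBuilder implements WriteBuilder {
  private String queryId;
  private StructType inputSchema;

  @Override public WriteBuilder withQueryId(String queryId) {
    this.queryId = queryId;  // kept so the BatchWrite can identify the query
    return this;
  }

  @Override public WriteBuilder withInputDataSchema(StructType schema) {
    this.inputSchema = schema;  // schema of the rows Spark will hand to the writers
    return this;
  }

  @Override public BatchWrite buildForBatch() {
    // Hand back a concrete BatchWrite; here, the logging/counting sketches from above.
    return new LoggingBatchWrite(new CountingWriterFactory());
  }
}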
@@ -24,7 +24,7 @@

/**
* A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
* as the input parameter of {@link BatchWriteSupport#commit(WriterCommitMessage[])} or
* as the input parameter of {@link BatchWrite#commit(WriterCommitMessage[])} or
* {@link StreamingWriteSupport#commit(long, WriterCommitMessage[])}.
*
* This is an empty interface, data sources should define their own message class and use it when
@@ -40,7 +40,7 @@ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

/**
@@ -209,10 +209,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
case _ => provider.getTable(dsOptions)
}
table match {
case s: SupportsBatchRead =>
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
provider, s, finalOptions, userSpecifiedSchema = userSpecifiedSchema))

case _: SupportsBatchRead =>
Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, finalOptions))
case _ => loadV1Source(paths: _*)
}
} else {