Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
cloud-fan committed Nov 29, 2018
1 parent a6e4655 commit 38fdac6
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface Table {
/**
* A name to identify this table.
* <p>
* By default this returns the class name of this implementation. Please override it to provide a
* By default this returns the class name of the implementation. Please override it to provide a
* meaningful name, like the database and table name from catalog, or the location of files for
* this table.
* </p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public interface TableProvider extends DataSourceV2 {
* @param options the user-specified options that can identify a table, e.g. file path, Kafka
* topic name, etc. It's an immutable case-insensitive string-to-string map.
* @param schema the user-specified schema.
* @throws UnsupportedOperationException
*/
default Table getTable(DataSourceOptions options, StructType schema) {
String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,25 @@ public interface Scan {
*/
StructType readSchema();

/**
* A description string of this scan, which may include information such as which filters are
* configured for this scan and the values of important options like the path. The
* description doesn't need to include {@link #readSchema()}, as Spark already knows it.
* <p>
* By default this returns the string form of the implementation class, i.e.
* {@code getClass().toString()} (for example {@code "class com.example.MyScan"}), not the bare
* class name. Please override it to provide a meaningful description.
* </p>
*/
default String description() {
return this.getClass().toString();
}

/**
* Returns the physical representation of this scan for a batch query. By default this method
* throws an exception; data sources must override this method to provide an implementation if the
* {@link Table} that creates this scan implements {@link SupportsBatchRead}.
*
* @throws UnsupportedOperationException if this scan does not support batch reads (the default)
*/
default Batch toBatch() {
throw new UnsupportedOperationException("Batch scans are not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._

/**
* Physical plan node for scanning a batch of data from a data source.
*/
case class DataSourceV2ScanExec(
output: Seq[AttributeReference],
@transient source: DataSourceV2,
@transient options: Map[String, String],
@transient pushedFilters: Seq[Expression],
scanDesc: String,
@transient batch: Batch)
extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
extends LeafExecNode with ColumnarBatchScan {

override def simpleString: String = "ScanV2 " + metadataString
// One-line plan description: the literal "ScanV2", immediately followed by the output
// attributes rendered through truncatedString with "[", ", " and "]" as delimiters, then a
// space and the scan's description string.
override def simpleString: String = {
val renderedOutput = truncatedString(output, "[", ", ", "]")
"ScanV2" + renderedOutput + " " + scanDesc
}

// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,7 @@ object DataSourceV2Strategy extends Strategy {
|Output: ${output.mkString(", ")}
""".stripMargin)

val plan = DataSourceV2ScanExec(
output,
relation.source,
relation.options,
pushedFilters,
scan.toBatch)
val plan = DataSourceV2ScanExec(output, scan.description(), scan.toBatch)

val filterCondition = postScanFilters.reduceLeftOption(And)
val withFilter = filterCondition.map(FilterExec(_, plan)).getOrElse(plan)
Expand Down

0 comments on commit 38fdac6

Please sign in to comment.