[SPARK-26716][SPARK-26765][FOLLOWUP][SQL] Clean up schema validation methods and override toString method in Avro

## What changes were proposed in this pull request?

In #23639, the API `supportDataType` was refactored. We should also remove the methods `verifyWriteSchema` and `verifyReadSchema` in `DataSourceUtils`.
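For reference, a minimal sketch of the consolidated helper after this change (reconstructed from the `DataSourceUtils` diff below; the exact exception message is inferred from the updated `AvroSuite` expectation, so treat it as an approximation):

```scala
package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.types._

object DataSourceUtils {
  /**
   * Verify if the schema is supported in datasource. This verification should be done
   * in a driver side.
   */
  def verifySchema(format: FileFormat, schema: StructType): Unit = {
    schema.foreach { field =>
      if (!format.supportDataType(field.dataType)) {
        throw new AnalysisException(
          s"$format data source does not support ${field.dataType.catalogString} data type.")
      }
    }
  }
}
```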

Since the error message uses `FileFormat.toString` to name the data source, this PR also overrides the `toString` method in `AvroFileFormat`.
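To illustrate why the override matters, here is a small stand-in sketch (not the real Spark classes): the validation error interpolates the `FileFormat` instance itself, so giving `AvroFileFormat` a `toString` of `"Avro"` is what produces the prefix the updated test asserts on.

```scala
// Illustrative stand-in types only: shows how the error prefix comes from
// the format's toString, mirroring the message shape checked in AvroSuite.
object ToStringDemo extends App {
  trait FakeFileFormat {
    def supportDataType(dataTypeName: String): Boolean
  }

  object FakeAvro extends FakeFileFormat {
    override def supportDataType(dataTypeName: String): Boolean =
      dataTypeName != "calendarinterval"
    // Same override as the one added to AvroFileFormat in this PR.
    override def toString(): String = "Avro"
  }

  val dt = "calendarinterval"
  if (!FakeAvro.supportDataType(dt)) {
    println(s"$FakeAvro data source does not support $dt data type.")
    // prints: Avro data source does not support calendarinterval data type.
  }
}
```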

## How was this patch tested?

Unit test.

Closes #23699 from gengliangwang/SPARK-26716-followup.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
gengliangwang authored and cloud-fan committed Jan 31, 2019
1 parent d4d6df2 commit 308996b
Showing 5 changed files with 6 additions and 19 deletions.
@@ -124,6 +124,8 @@ private[avro] class AvroFileFormat extends FileFormat

override def shortName(): String = "avro"

+ override def toString(): String = "Avro"

override def isSplitable(
sparkSession: SparkSession,
options: Map[String, String],
@@ -896,7 +896,7 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
sql("select testType()").write.format("avro").mode("overwrite").save(tempDir)
}.getMessage
assert(msg.toLowerCase(Locale.ROOT)
.contains(s"data source does not support calendarinterval data type."))
.contains(s"avro data source does not support calendarinterval data type."))
}
}

@@ -412,7 +412,7 @@ case class DataSource(
hs.partitionSchema.map(_.name),
"in the partition schema",
equality)
- DataSourceUtils.verifyReadSchema(hs.fileFormat, hs.dataSchema)
+ DataSourceUtils.verifySchema(hs.fileFormat, hs.dataSchema)
case _ =>
SchemaUtils.checkColumnNameDuplication(
relation.schema.map(_.name),
@@ -24,26 +24,11 @@ import org.apache.spark.sql.types._


object DataSourceUtils {

- /**
- * Verify if the schema is supported in datasource in write path.
- */
- def verifyWriteSchema(format: FileFormat, schema: StructType): Unit = {
- verifySchema(format, schema, isReadPath = false)
- }
-
- /**
- * Verify if the schema is supported in datasource in read path.
- */
- def verifyReadSchema(format: FileFormat, schema: StructType): Unit = {
- verifySchema(format, schema, isReadPath = true)
- }
-
/**
* Verify if the schema is supported in datasource. This verification should be done
* in a driver side.
*/
- private def verifySchema(format: FileFormat, schema: StructType, isReadPath: Boolean): Unit = {
+ def verifySchema(format: FileFormat, schema: StructType): Unit = {
schema.foreach { field =>
if (!format.supportDataType(field.dataType)) {
throw new AnalysisException(
@@ -98,7 +98,7 @@ object FileFormatWriter extends Logging {
val caseInsensitiveOptions = CaseInsensitiveMap(options)

val dataSchema = dataColumns.toStructType
- DataSourceUtils.verifyWriteSchema(fileFormat, dataSchema)
+ DataSourceUtils.verifySchema(fileFormat, dataSchema)
// Note: prepareWrite has side effect. It sets "job".
val outputWriterFactory =
fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataSchema)
