diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 9f37e2b125f41..2cc9370346515 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -207,13 +207,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         source = provider, conf = sparkSession.sessionState.conf)
-      val pathsOption = {
+      val pathsOption = if (paths.isEmpty) {
+        None
+      } else {
         val objectMapper = new ObjectMapper()
-        "path" -> objectMapper.writeValueAsString(paths.toArray)
+        Some("paths" -> objectMapper.writeValueAsString(paths.toArray))
       }
-      // TODO: remove this option.
-      val checkFilesExistsOption = "check_files_exist" -> "true"
-      val finalOptions = sessionOptions ++ extraOptions.toMap + pathsOption + checkFilesExistsOption
+      // TODO SPARK-27113: remove this option.
+      val checkFilesExistsOpt = "check_files_exist" -> "true"
+      val finalOptions = sessionOptions ++ extraOptions.toMap ++ pathsOption + checkFilesExistsOpt
       val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
       val table = userSpecifiedSchema match {
         case Some(schema) => provider.getTable(dsOptions, schema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index d438f76d67679..e58225e0f58ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -261,7 +261,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         provider, session.sessionState.conf)
-      // TODO: remove this option.
+      // TODO SPARK-27113: remove this option.
       val checkFilesExistsOption = "check_files_exist" -> "false"
       val options = sessionOptions ++ extraOptions + checkFilesExistsOption
       val dsOptions = new CaseInsensitiveStringMap(options.asJava)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 8c0973299e903..0545bcf207293 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.sql.execution.datasources.v2
 
-import java.io.IOException
-
 import com.fasterxml.jackson.databind.ObjectMapper
 
 import org.apache.spark.sql.SparkSession
@@ -43,14 +41,9 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
 
   protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = {
     val objectMapper = new ObjectMapper()
-    Option(map.get("path")).map { pathStr =>
-      try {
-        val paths = objectMapper.readValue(pathStr, classOf[Array[String]])
-        paths.toSeq
-      } catch {
-        case _: IOException => Seq(pathStr)
-      }
-    }.getOrElse {
+    Option(map.get("paths")).map { pathStr =>
+      objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
+    }.orElse(Option(map.get("path")).map(Seq(_))).getOrElse {
       throw new IllegalArgumentException("'path' must be given when reading files.")
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index a98053e34b887..1fd962fd06ff1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -308,7 +308,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
       eventually(timeout(streamingTimeout)) {
         // Write options should not be set.
         assert(!LastWriteOptions.options.containsKey(readOptionName))
-        assert(LastReadOptions.options.get(readOptionName) == "true")
+        assert(LastReadOptions.options.getBoolean(readOptionName, false))
       }
     }
   }
@@ -319,7 +319,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
       eventually(timeout(streamingTimeout)) {
         // Read options should not be set.
         assert(!LastReadOptions.options.containsKey(writeOptionName))
-        assert(LastWriteOptions.options.get(writeOptionName) == "true")
+        assert(LastWriteOptions.options.getBoolean(writeOptionName, false))
       }
     }
   }
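Note (not part of the patch): a minimal, self-contained sketch of the Jackson round-trip that the new "paths" option relies on. DataFrameReader.load encodes the caller's paths as one JSON array string under the "paths" key, and FileDataSourceV2.getPaths decodes it back, falling back to a verbatim "path" entry when "paths" is absent. The object name below is hypothetical; it assumes only jackson-databind on the classpath.

// PathsOptionRoundTrip is a made-up demo name, not part of the Spark patch.
import com.fasterxml.jackson.databind.ObjectMapper

object PathsOptionRoundTrip {
  def main(args: Array[String]): Unit = {
    val objectMapper = new ObjectMapper()
    val paths = Seq("/data/a.parquet", "/data/b.parquet")

    // Writer side (DataFrameReader.load): encode all paths as a single JSON
    // array string, e.g. ["/data/a.parquet","/data/b.parquet"].
    val encoded = objectMapper.writeValueAsString(paths.toArray)

    // Reader side (FileDataSourceV2.getPaths): decode the JSON array back
    // into a Seq[String]; a lone "path" entry would be used as-is instead.
    val decoded = objectMapper.readValue(encoded, classOf[Array[String]]).toSeq

    assert(decoded == paths)
    println(s"encoded: $encoded")
  }
}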