[SPARK-33169][SQL][TESTS] Check propagation of datasource options to underlying file system for built-in file-based datasources #30067

Closed · wants to merge 11 commits
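The behavior exercised by these tests: options passed to a DataFrameReader or DataFrameWriter should be propagated into the Hadoop Configuration that the underlying FileSystem is initialized with, so per-query file-system settings work without touching the global Hadoop config. A minimal sketch of that user-facing pattern (the option key and path below are illustrative, not taken from this change):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// A Hadoop FS setting supplied as a datasource option should reach the
// FileSystem created for this read, rather than requiring a global config.
val df = spark.read
  .option("fs.file.impl.disable.cache", "true") // illustrative Hadoop FS option
  .format("parquet")
  .load("/tmp/example-parquet") // hypothetical path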
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils.{withDefaultTimeZone, LA, UTC}
import org.apache.spark.sql.execution.{FormattedMode, SparkPlan}
import org.apache.spark.sql.execution.datasources.{DataSource, FilePartition}
import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, DataSource, FilePartition}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.internal.SQLConf
@@ -50,9 +50,15 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.v2.avro.AvroScan
import org.apache.spark.util.Utils

abstract class AvroSuite extends QueryTest with SharedSparkSession with NestedDataSourceSuiteBase {
abstract class AvroSuite
extends QueryTest
with SharedSparkSession
with CommonFileDataSourceSuite
with NestedDataSourceSuiteBase {

import testImplicits._

override protected def dataSourceFormat = "avro"
override val nestedDataSources = Seq("avro")
val episodesAvro = testFile("episodes.avro")
val testAvro = testFile("test.avro")
@@ -1807,16 +1813,6 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession with NestedDa
}
}

test("SPARK-33089: should propagate Hadoop config from DS options to underlying file system") {
withSQLConf(
"fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
"fs.file.impl.disable.cache" -> "true") {
val conf = Map("ds_option" -> "value")
val path = "file:" + testAvro.stripPrefix("file:")
spark.read.format("avro").options(conf).load(path)
}
}

test("SPARK-33163: write the metadata key 'org.apache.spark.legacyDateTime'") {
def saveTs(dir: java.io.File): Unit = {
Seq(Timestamp.valueOf("2020-10-15 01:02:03")).toDF()
@@ -27,13 +27,26 @@ import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.{FakeFileSystemRequiringDSOption, Row, SaveMode}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
import org.apache.spark.util.Utils

class LibSVMRelationSuite
extends SparkFunSuite
with MLlibTestSparkContext
with CommonFileDataSourceSuite {

override protected def dataSourceFormat = "libsvm"
override protected def inputDataset = {
val rawData = new java.util.ArrayList[Row]()
rawData.add(Row(1.0, Vectors.sparse(1, Seq((0, 1.0)))))
val struct = new StructType()
.add("labelFoo", DoubleType, false)
.add("featuresBar", VectorType, false)
spark.createDataFrame(rawData, struct)
}

class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext with SQLHelper {
// Path for dataset
var path: String = _

@@ -212,13 +225,4 @@ class LibSVMRelationSuite extends SparkFunSuite with MLlibTestSparkContext with
assert(v == Vectors.sparse(2, Seq((0, 2.0), (1, 3.0))))
}
}

test("SPARK-33101: should propagate Hadoop config from DS options to underlying file system") {
withSQLConf(
"fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
"fs.file.impl.disable.cache" -> "true") {
val df = spark.read.option("ds_option", "value").format("libsvm").load(path)
assert(df.columns(0) == "label")
}
}
}
@@ -842,22 +842,6 @@ class FileBasedDataSourceSuite extends QueryTest
}
}

test("SPARK-31935: Hadoop file system config should be effective in data source options") {
Seq("parquet", "").foreach { format =>
withSQLConf(
SQLConf.USE_V1_SOURCE_LIST.key -> format,
"fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
"fs.file.impl.disable.cache" -> "true") {
withTempDir { dir =>
val path = "file:" + dir.getCanonicalPath.stripPrefix("file:")
spark.range(10).write.option("ds_option", "value").mode("overwrite").parquet(path)
checkAnswer(
spark.read.option("ds_option", "value").parquet(path), spark.range(10).toDF())
}
}
}
}

test("SPARK-31116: Select nested schema with case insensitive mode") {
// This test case failed at only Parquet. ORC is added for test coverage parity.
Seq("orc", "parquet").foreach { format =>
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources

import org.scalatest.funsuite.AnyFunSuite

import org.apache.spark.sql.{Dataset, Encoders, FakeFileSystemRequiringDSOption, SparkSession}
import org.apache.spark.sql.catalyst.plans.SQLHelper

/**
* This trait contains tests common to all file-based data sources.
* Tests that do not apply to every file-based data source should be placed in
* [[org.apache.spark.sql.FileBasedDataSourceSuite]].
*/
trait CommonFileDataSourceSuite extends SQLHelper { self: AnyFunSuite =>

protected def spark: SparkSession
protected def dataSourceFormat: String
protected def inputDataset: Dataset[_] = spark.createDataset(Seq("abc"))(Encoders.STRING)

test(s"Propagate Hadoop configs from $dataSourceFormat options to underlying file system") {
withSQLConf(
"fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
"fs.file.impl.disable.cache" -> "true") {
Seq(false, true).foreach { mergeSchema =>
withTempPath { dir =>
val path = dir.getAbsolutePath
val conf = Map("ds_option" -> "value", "mergeSchema" -> mergeSchema.toString)
inputDataset
.write
.options(conf)
.format(dataSourceFormat)
.save(path)
Seq(path, "file:" + path.stripPrefix("file:")).foreach { p =>
val readback = spark
.read
.options(conf)
.format(dataSourceFormat)
.load(p)
// Checks that read doesn't throw the exception from `FakeFileSystemRequiringDSOption`
readback.write.mode("overwrite").format("noop").save()
}
}
}
}
}
}
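
FakeFileSystemRequiringDSOption is imported from org.apache.spark.sql and is not part of this diff; the test above relies on it to fail fast when datasource options do not reach the file system. A minimal sketch of that helper, assuming it is a LocalFileSystem that validates its Hadoop Configuration on initialization:

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.LocalFileSystem

// Sketch: initialization fails unless the "ds_option" value passed through the
// datasource options is present in the Hadoop Configuration handed to the FS.
class FakeFileSystemRequiringDSOption extends LocalFileSystem {
  override def initialize(name: URI, conf: Configuration): Unit = {
    super.initialize(name, conf)
    require(conf.get("ds_option", "") == "value", "ds_option was not propagated to the file system")
  }
}

With fs.file.impl pointing at this class and fs.file.impl.disable.cache set to true, every read and write in the test creates a fresh file system instance, so the check runs for each access.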
@@ -36,13 +36,21 @@ import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkConf, SparkException, TestUtils}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._

abstract class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
abstract class CSVSuite
extends QueryTest
with SharedSparkSession
with TestCsvData
with CommonFileDataSourceSuite {

import testImplicits._

override protected def dataSourceFormat = "csv"

private val carsFile = "test-data/cars.csv"
private val carsMalformedFile = "test-data/cars-malformed.csv"
private val carsFile8859 = "test-data/cars_iso-8859-1.csv"
@@ -35,7 +35,7 @@ import org.apache.spark.sql.{functions => F, _}
import org.apache.spark.sql.catalyst.json._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.ExternalRDD
import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, NoopCache}
import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, DataSource, InMemoryFileIndex, NoopCache}
import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -49,9 +49,16 @@ class TestFileFilter extends PathFilter {
override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
}

abstract class JsonSuite extends QueryTest with SharedSparkSession with TestJsonData {
abstract class JsonSuite
extends QueryTest
with SharedSparkSession
with TestJsonData
with CommonFileDataSourceSuite {

import testImplicits._

override protected def dataSourceFormat = "json"

test("Type promotion") {
def checkTypePromotion(expected: Any, actual: Any): Unit = {
assert(expected.getClass == actual.getClass,
@@ -32,18 +32,20 @@ import org.apache.orc.impl.RecordReaderImpl
import org.scalatest.BeforeAndAfterAll

import org.apache.spark.{SPARK_VERSION_SHORT, SparkException}
import org.apache.spark.sql.{FakeFileSystemRequiringDSOption, Row, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.execution.datasources.SchemaMergeUtils
import org.apache.spark.sql.{Row, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.execution.datasources.{CommonFileDataSourceSuite, SchemaMergeUtils}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.util.Utils

case class OrcData(intField: Int, stringField: String)

abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
abstract class OrcSuite extends OrcTest with BeforeAndAfterAll with CommonFileDataSourceSuite {
import testImplicits._

override protected def dataSourceFormat = "orc"

var orcTableDir: File = null
var orcTableAsDir: File = null

@@ -537,21 +539,6 @@ abstract class OrcSuite extends OrcTest with BeforeAndAfterAll {
}
}
}

test("SPARK-33094: should propagate Hadoop config from DS options to underlying file system") {
withSQLConf(
"fs.file.impl" -> classOf[FakeFileSystemRequiringDSOption].getName,
"fs.file.impl.disable.cache" -> "true") {
Seq(false, true).foreach { mergeSchema =>
withTempPath { dir =>
val path = dir.getAbsolutePath
val conf = Map("ds_option" -> "value", "mergeSchema" -> mergeSchema.toString)
spark.range(1).write.options(conf).orc(path)
checkAnswer(spark.read.options(conf).orc(path), Row(0))
}
}
}
}
}

class OrcSourceSuite extends OrcSuite with SharedSparkSession {
@@ -19,12 +19,19 @@ package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.SparkException
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSparkSession {
abstract class ParquetFileFormatSuite
extends QueryTest
with ParquetTest
with SharedSparkSession
with CommonFileDataSourceSuite {

override protected def dataSourceFormat = "parquet"

test("read parquet footers in parallel") {
def testReadFooters(ignoreCorruptFiles: Boolean): Unit = {
@@ -57,3 +64,17 @@ class ParquetFileFormatSuite extends QueryTest with ParquetTest with SharedSpark
assert(exception.getMessage().contains("Could not read footer for file"))
}
}

class ParquetFileFormatV1Suite extends ParquetFileFormatSuite {
override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_LIST, "parquet")
}

class ParquetFileFormatV2Suite extends ParquetFileFormatSuite {
override protected def sparkConf: SparkConf =
super
.sparkConf
.set(SQLConf.USE_V1_SOURCE_LIST, "")
}
@@ -26,14 +26,17 @@ import org.apache.hadoop.io.compress.GzipCodec

import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row, SaveMode}
import org.apache.spark.sql.execution.datasources.CommonFileDataSourceSuite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.Utils

abstract class TextSuite extends QueryTest with SharedSparkSession {
abstract class TextSuite extends QueryTest with SharedSparkSession with CommonFileDataSourceSuite {
import testImplicits._

override protected def dataSourceFormat = "text"

test("reading text file") {
verifyFrame(spark.read.format("text").load(testFile))
}