[SPARK-23158] [SQL] Move HadoopFsRelationTest test suites from sql/hive to sql/core #20331

Closed · wants to merge 3 commits
HadoopFsRelationTest.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources
package org.apache.spark.sql.execution.datasources

import java.io.File

@@ -26,15 +26,14 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.sources.SimpleTextSource
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
import org.apache.spark.sql.types._


abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with TestHiveSingleton {
import spark.implicits._
abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
import testImplicits._

val dataSourceName: String

JsonHadoopFsRelationSuite.scala
@@ -15,17 +15,20 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources
package org.apache.spark.sql.execution.datasources.json

import java.math.BigDecimal

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.execution.datasources.HadoopFsRelationTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
class JsonHadoopFsRelationSuite extends HadoopFsRelationTest with SharedSQLContext {
override val dataSourceName: String = "json"

private val badJson = "\u0000\u0000\u0000A\u0001AAA"
@@ -110,14 +113,16 @@ class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {

test("invalid json with leading nulls - from file (multiLine=true)") {
import testImplicits._
withTempDir { tempDir =>
val path = tempDir.getAbsolutePath
Seq(badJson, """{"a":1}""").toDS().write.mode("overwrite").text(path)
val expected = s"""$badJson\n{"a":1}\n"""
val schema = new StructType().add("a", IntegerType).add("_corrupt_record", StringType)
val df =
spark.read.format(dataSourceName).option("multiLine", true).schema(schema).load(path)
checkAnswer(df, Row(null, expected))
withSQLConf(SQLConf.MAX_RECORDS_PER_FILE.key -> "2") {
Contributor: Just curious, why this change?

Member Author: The test will fail if SQLConf.MAX_RECORDS_PER_FILE is set to a value less than 2.

Contributor: I think the default value won't be less than 2; we don't need to be so careful...

withTempDir { tempDir =>
val path = tempDir.getAbsolutePath
Seq(badJson, """{"a":1}""").toDS().repartition(1).write.mode("overwrite").text(path)
val expected = s"""$badJson\n{"a":1}\n"""
val schema = new StructType().add("a", IntegerType).add("_corrupt_record", StringType)
val df =
spark.read.format(dataSourceName).option("multiLine", true).schema(schema).load(path)
checkAnswer(df, Row(null, expected))
}
}
}
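
The thread above about SQLConf.MAX_RECORDS_PER_FILE is the only place in the diff where behaviour, rather than packaging, is discussed. Below is a minimal sketch of the failure mode being guarded against; it is not code from this PR, and the app name and temp path are illustrative. Pinning the conf to 2 inside the test makes it independent of whatever value the shared session happens to carry.

// A minimal sketch, not part of this PR: illustrates why the author wraps the test in
// withSQLConf(SQLConf.MAX_RECORDS_PER_FILE.key -> "2"). With maxRecordsPerFile set to 1,
// the two rows are written to two text files even after repartition(1), so a multiLine
// JSON read would no longer see badJson and {"a":1} concatenated in a single file.
import org.apache.spark.sql.SparkSession

object MaxRecordsPerFileSketch extends App {
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("max-records-sketch")   // illustrative app name
    .getOrCreate()
  import spark.implicits._

  val badJson = "\u0000\u0000\u0000A\u0001AAA"
  val path = java.nio.file.Files.createTempDirectory("max-records").toString

  spark.conf.set("spark.sql.files.maxRecordsPerFile", "1")  // deliberately too small
  Seq(badJson, """{"a":1}""").toDS().repartition(1).write.mode("overwrite").text(path)

  // Expect two part files instead of one, which breaks the expected corrupt-record value.
  val partFiles = new java.io.File(path).listFiles().count(_.getName.startsWith("part-"))
  println(s"part files written: $partFiles")

  spark.stop()
}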

OrcHadoopFsRelationSuite.scala
@@ -15,19 +15,20 @@
* limitations under the License.
*/

package org.apache.spark.sql.hive.orc

import java.io.File
package org.apache.spark.sql.execution.datasources.orc

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.execution.datasources.HadoopFsRelationTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.HadoopFsRelationTest
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
class OrcHadoopFsRelationSuite extends OrcHadoopFsRelationBase with SharedSQLContext

abstract class OrcHadoopFsRelationBase extends HadoopFsRelationTest {
import testImplicits._

override protected val enableAutoThreadAudit = false
@@ -82,44 +83,4 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
}
}
}

test("SPARK-13543: Support for specifying compression codec for ORC via option()") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b")
df.write
.option("compression", "ZlIb")
.orc(path)

// Check if this is compressed as ZLIB.
val maybeOrcFile = new File(path).listFiles().find { f =>
!f.getName.startsWith("_") && f.getName.endsWith(".zlib.orc")
}
assert(maybeOrcFile.isDefined)
val orcFilePath = maybeOrcFile.get.toPath.toString
val expectedCompressionKind =
OrcFileOperator.getFileReader(orcFilePath).get.getCompression
Member Author: The same here.

assert("ZLIB" === expectedCompressionKind.name())

val copyDf = spark
.read
.orc(path)
checkAnswer(df, copyDf)
}
}

test("Default compression codec is snappy for ORC compression") {
withTempPath { file =>
spark.range(0, 10).write
.orc(file.getCanonicalPath)
val expectedCompressionKind =
OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
Member Author: OrcFileOperator is defined in sql/hive.

Member: @gatorsmile This test case should be tested on the native implementation, too; HiveOrcHadoopFsRelationSuite only covers the Hive implementation.

assert("SNAPPY" === expectedCompressionKind.name())
}
}
}

class HiveOrcHadoopFsRelationSuite extends OrcHadoopFsRelationSuite {
Member Author (gatorsmile, Jan 19, 2018): This is Hive only. Thus, create a separate file for it.

Member: Thank you!

override val dataSourceName: String =
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat].getCanonicalName
}
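
Two of the threads above note that OrcFileOperator lives in sql/hive and that the compression checks should also run against the native implementation. The sketch below shows one way such a check could be written without OrcFileOperator, using the Apache ORC reader API directly; it is not code from this PR, and the paths and app name are illustrative.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.orc.OrcFile

import org.apache.spark.sql.SparkSession

// Sketch only: reads the compression kind of an ORC file with the native ORC reader
// (org.apache.orc) instead of sql/hive's OrcFileOperator.
object OrcCompressionSketch extends App {
  val spark = SparkSession.builder()
    .master("local[1]")
    .appName("orc-compression-sketch")   // illustrative app name
    .getOrCreate()

  val dir = java.nio.file.Files.createTempDirectory("orc-compression").toFile
  val path = s"${dir.getCanonicalPath}/table1"
  spark.range(0, 10).write.option("compression", "zlib").orc(path)

  // Pick one data file and read its compression kind directly from the file footer.
  val orcFile = new java.io.File(path).listFiles()
    .find(f => !f.getName.startsWith("_") && f.getName.endsWith(".orc"))
    .get
  val reader = OrcFile.createReader(
    new Path(orcFile.getAbsolutePath), OrcFile.readerOptions(new Configuration()))
  println(s"compression kind: ${reader.getCompressionKind}")  // expected: ZLIB

  spark.stop()
}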
ParquetHadoopFsRelationSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources
package org.apache.spark.sql.execution.datasources.parquet

import java.io.File

@@ -25,12 +25,13 @@ import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.execution.datasources.{HadoopFsRelationTest, SQLHadoopMapReduceCommitProtocol}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._


class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest with SharedSQLContext {
import testImplicits._

override val dataSourceName: String = "parquet"
@@ -162,7 +163,6 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
}

test("SPARK-11500: Not deterministic order of columns when using merging schemas.") {
import testImplicits._
withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
withTempPath { dir =>
val pathOne = s"${dir.getCanonicalPath}/part=1"
SimpleTextHadoopFsRelationSuite.scala
@@ -21,9 +21,12 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.expressions.PredicateHelper
import org.apache.spark.sql.execution.datasources.HadoopFsRelationTest
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._

class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with PredicateHelper {
class SimpleTextHadoopFsRelationSuite
extends HadoopFsRelationTest with PredicateHelper with SharedSQLContext {
override val dataSourceName: String = classOf[SimpleTextSource].getCanonicalName

// We have a very limited number of supported types at here since it is just for a
HiveOrcHadoopFsRelationSuite.scala (new file)
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.orc

import java.io.File

import org.apache.spark.sql.execution.datasources.orc.OrcHadoopFsRelationBase
import org.apache.spark.sql.hive.test.TestHiveSingleton

class HiveOrcHadoopFsRelationSuite extends OrcHadoopFsRelationBase with TestHiveSingleton {
import testImplicits._

override val dataSourceName: String =
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat].getCanonicalName

test("SPARK-13543: Support for specifying compression codec for ORC via option()") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
val df = (1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b")
df.write
.option("compression", "ZlIb")
.orc(path)

// Check if this is compressed as ZLIB.
val maybeOrcFile = new File(path).listFiles().find { f =>
!f.getName.startsWith("_") && f.getName.endsWith(".zlib.orc")
}
assert(maybeOrcFile.isDefined)
val orcFilePath = maybeOrcFile.get.toPath.toString
val expectedCompressionKind =
OrcFileOperator.getFileReader(orcFilePath).get.getCompression
assert("ZLIB" === expectedCompressionKind.name())

val copyDf = spark
.read
.orc(path)
checkAnswer(df, copyDf)
}
}

test("Default compression codec is snappy for ORC compression") {
withTempPath { file =>
spark.range(0, 10).write
.orc(file.getCanonicalPath)
val expectedCompressionKind =
OrcFileOperator.getFileReader(file.getCanonicalPath).get.getCompression
assert("SNAPPY" === expectedCompressionKind.name())
}
}
}
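
Taken together, the diff settles into one pattern: HadoopFsRelationTest becomes an abstract test body that only assumes SQLTestUtils, and each concrete suite chooses its session mix-in, SharedSQLContext in sql/core or TestHiveSingleton in sql/hive. Below is a self-contained, Spark-free analogue of that layering; every name in it is invented for illustration.

// Plain Scala sketch (no Spark): an abstract test body declares what it needs through an
// abstract trait, and each concrete suite supplies it via a mix-in, mirroring how
// SharedSQLContext / TestHiveSingleton supply the session to HadoopFsRelationTest subclasses.
trait SessionProvider { def sessionKind: String }                                 // ~ the abstract session requirement
trait SharedLocalSession extends SessionProvider { def sessionKind = "local" }    // ~ SharedSQLContext
trait HiveSingletonSession extends SessionProvider { def sessionKind = "hive" }   // ~ TestHiveSingleton

abstract class RelationTestBody extends SessionProvider {
  val dataSourceName: String
  def runAll(): Unit =
    println(s"running $dataSourceName relation tests on a $sessionKind session")
}

// sql/core-style suite: shared local session.
class JsonSuiteSketch extends RelationTestBody with SharedLocalSession {
  val dataSourceName: String = "json"
}

// sql/hive-style suite: same test body, Hive-backed session.
class HiveOrcSuiteSketch extends RelationTestBody with HiveSingletonSession {
  val dataSourceName: String = "hive-orc"
}

object SuiteLayeringDemo extends App {
  new JsonSuiteSketch().runAll()
  new HiveOrcSuiteSketch().runAll()
}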