From 8b06606c263a5ee3607ca432020a0eae268835a0 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Mon, 11 Mar 2019 21:27:05 +0800
Subject: [PATCH 1/2] make DataSourceOptions.asMap return CaseInsensitiveMap

---
 .../spark/sql/kafka010/KafkaSourceProvider.scala     | 12 ++++--------
 .../spark/sql/catalyst/util/CaseInsensitiveMap.scala |  5 +++++
 .../spark/sql/sources/v2/DataSourceOptions.java      |  6 ++++--
 .../sql/execution/datasources/v2/FileTable.scala     |  6 ++----
 .../execution/datasources/v2/FileWriteBuilder.scala  |  4 +---
 .../datasources/v2/orc/OrcScanBuilder.scala          |  4 +---
 .../scala/org/apache/spark/sql/jdbc/JDBCSuite.scala  |  2 +-
 .../sql/sources/v2/DataSourceOptionsSuite.scala      |  5 +++++
 8 files changed, 23 insertions(+), 21 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index b39e0d40fd31c..df769065ea7f2 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.kafka010
 import java.{util => ju}
 import java.util.{Locale, UUID}

-import scala.collection.JavaConverters._
-
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
@@ -104,7 +102,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
   }

   override def getTable(options: DataSourceOptions): KafkaTable = {
-    new KafkaTable(strategy(options.asMap().asScala.toMap))
+    new KafkaTable(strategy(options.asMap().toMap))
   }

   /**
@@ -372,11 +370,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }

     override def buildForStreaming(): StreamingWrite = {
-      import scala.collection.JavaConverters._
-
       assert(inputSchema != null)
       val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
-      val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
+      val producerParams = kafkaParamsForProducer(options.asMap.toMap)
       new KafkaStreamingWrite(topic, producerParams, inputSchema)
     }
   }
@@ -388,7 +384,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema

     override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
-      val parameters = options.asMap().asScala.toMap
+      val parameters = options.asMap().toMap
       validateStreamOptions(parameters)
       // Each running query should use its own group id. Otherwise, the query may be only assigned
       // partial data since Kafka will assign partitions to multiple consumers having the same group
@@ -417,7 +413,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
     }

     override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
-      val parameters = options.asMap().asScala.toMap
+      val parameters = options.asMap().toMap
       validateStreamOptions(parameters)
       // Each running query should use its own group id. Otherwise, the query may be only assigned
       // partial data since Kafka will assign partitions to multiple consumers having the same group
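Note (illustration only, not part of the diff): the Kafka call sites above can drop the JavaConverters round trip because asMap() now returns a Scala CaseInsensitiveMap[String] directly. A minimal sketch of the new call shape, assuming a build with this change applied; the option value is made up:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.sources.v2.DataSourceOptions

    val options = new DataSourceOptions(Map("kafka.bootstrap.servers" -> "host:9092").asJava)
    // No .asScala needed any more: asMap() is already a Scala map.
    val params: Map[String, String] = options.asMap().toMap
    // Lookups on the returned CaseInsensitiveMap ignore key case.
    assert(options.asMap().get("KAFKA.Bootstrap.Servers").contains("host:9092"))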
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
index 06f95989f2e3a..a11b93c701252 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/CaseInsensitiveMap.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util

 import java.util.Locale

+import scala.collection.JavaConverters._
+
 /**
  * Builds a map in which keys are case insensitive. Input map can be accessed for cases where
  * case-sensitive information is required. The primary constructor is marked private to avoid
@@ -53,5 +55,8 @@ object CaseInsensitiveMap {
     case caseSensitiveMap: CaseInsensitiveMap[T] => caseSensitiveMap
     case _ => new CaseInsensitiveMap(params)
   }
+
+  def apply(params: java.util.Map[String, String]): CaseInsensitiveMap[String] =
+    new CaseInsensitiveMap(params.asScala.toMap)
 }
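Note (illustration only, not part of the diff): the new apply overload above is presumably what lets Java callers such as DataSourceOptions hand over a java.util.Map without converting it first. A small sketch of the overload in use; the key and value are made up:

    import java.util.{HashMap => JHashMap}
    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

    val javaMap = new JHashMap[String, String]()
    javaMap.put("checkpointLocation", "/tmp/checkpoints")
    // Resolves to the new java.util.Map overload added in this patch.
    val ciMap = CaseInsensitiveMap(javaMap)
    assert(ciMap.get("checkpointlocation").contains("/tmp/checkpoints"))
    assert(ciMap.get("CHECKPOINTLOCATION").contains("/tmp/checkpoints"))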
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
index 00af0bf1b172c..e494e71e85843 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java
@@ -27,6 +27,8 @@
 import com.fasterxml.jackson.databind.ObjectMapper;

 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap$;

 /**
  * An immutable string-to-string map in which keys are case-insensitive. This is used to represent
@@ -92,8 +94,8 @@ public DataSourceOptions(Map<String, String> originalMap) {
     }
   }

-  public Map<String, String> asMap() {
-    return new HashMap<>(keyLowerCasedMap);
+  public CaseInsensitiveMap<String> asMap() {
+    return CaseInsensitiveMap$.MODULE$.apply(keyLowerCasedMap);
   }

   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index 21d3e5e29cfb5..f9f9b76e90bd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.sql.execution.datasources.v2

-import scala.collection.JavaConverters._
-
 import org.apache.hadoop.fs.FileStatus

 import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -33,12 +31,12 @@ abstract class FileTable(
   lazy val fileIndex: PartitioningAwareFileIndex = {
     val filePaths = options.paths()
     val hadoopConf =
-      sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
+      sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().toMap)
     val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
       checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
     val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
     new InMemoryFileIndex(sparkSession, rootPathsSpecified,
-      options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
+      options.asMap().toMap, userSpecifiedSchema, fileStatusCache)
   }

   lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
index 75c922424e8ef..da2387fb83381 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.v2
 import java.io.IOException
 import java.util.UUID

-import scala.collection.JavaConverters._
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
@@ -64,7 +62,7 @@ abstract class FileWriteBuilder(options: DataSourceOptions)
     val pathName = options.paths().head
     val path = new Path(pathName)
     val sparkSession = SparkSession.active
-    val optionsAsScala = options.asMap().asScala.toMap
+    val optionsAsScala = options.asMap().toMap
     val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
     val job = getJobInstance(hadoopConf, path)
     val committer = FileCommitProtocol.instantiate(
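Note (illustration only, not part of the diff): the file-source changes above are the same mechanical simplification in each place, options.asMap().asScala.toMap becoming options.asMap().toMap. A minimal sketch of what those call sites now receive; the option keys are made up and no SparkSession is involved:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.sources.v2.DataSourceOptions

    val options = new DataSourceOptions(Map("path" -> "/tmp/data", "mergeSchema" -> "true").asJava)
    // This is the Map[String, String] handed to newHadoopConfWithOptions / InMemoryFileIndex.
    val scalaOptions: Map[String, String] = options.asMap().toMap
    // DataSourceOptions lower-cases keys, so the plain map is keyed by lower-cased names...
    assert(scalaOptions("mergeschema") == "true")
    // ...while the CaseInsensitiveMap itself still answers mixed-case lookups.
    assert(options.asMap().get("MERGESCHEMA").contains("true"))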
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
index eb27bbd3abeaa..d8571960c388e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala
@@ -17,8 +17,6 @@

 package org.apache.spark.sql.execution.datasources.v2.orc

-import scala.collection.JavaConverters._
-
 import org.apache.orc.mapreduce.OrcInputFormat

 import org.apache.spark.sql.SparkSession
@@ -37,7 +35,7 @@ case class OrcScanBuilder(
     dataSchema: StructType,
     options: DataSourceOptions) extends FileScanBuilder(schema) {
   lazy val hadoopConf =
-    sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
+    sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().toMap)

   override def build(): Scan = {
     OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index a4dc537d31b7e..d98eb7df7d7c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -1186,7 +1186,7 @@ class JDBCSuite extends QueryTest
     testJdbcOptions(new JDBCOptions(parameters))
     testJdbcOptions(new JDBCOptions(CaseInsensitiveMap(parameters)))
     // test add/remove key-value from the case-insensitive map
-    var modifiedParameters = CaseInsensitiveMap(Map.empty) ++ parameters
+    var modifiedParameters = CaseInsensitiveMap(Map.empty[String, Nothing]) ++ parameters
     testJdbcOptions(new JDBCOptions(modifiedParameters))
     modifiedParameters -= "dbtable"
     assert(modifiedParameters.get("dbTAblE").isEmpty)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
index cfa69a86de1a7..dc1527fe417cc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala
@@ -104,4 +104,9 @@ class DataSourceOptionsSuite extends SparkFunSuite {

     assert(options.paths().toSeq == Seq("c", "d\"e"))
   }
+
+  test("asMap") {
+    val map = new DataSourceOptions(Map("fooBar" -> "x").asJava).asMap
+    assert(map.get("fooBar").get == "x")
+  }
 }

From 83e19607cbd6a7fd6b397921ca164a0e53aeeb25 Mon Sep 17 00:00:00 2001
From: Yuanjian Li
Date: Tue, 12 Mar 2019 11:55:06 +0800
Subject: [PATCH 2/2] fix mima test

---
 project/MimaExcludes.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fdc5cf16af15c..a375bb22a5e7a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,9 @@ object MimaExcludes {

   // Exclude rules for 3.0.x
   lazy val v30excludes = v24excludes ++ Seq(
+    // [SPARK-26594] DataSourceOptions.asMap should return CaseInsensitiveMap
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.sources.v2.DataSourceOptions.asMap"),
+
     // [SPARK-25838] Remove formatVersion from Saveable
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.DistributedLDAModel.formatVersion"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.LocalLDAModel.formatVersion"),
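Note (illustration only, not part of the diff): DataSourceOptions is a public Evolving API, so changing the return type of asMap() is a binary-incompatible change, which is what the IncompatibleResultTypeProblem exclude in [PATCH 2/2] acknowledges. The JDBCSuite tweak (Map.empty becoming Map.empty[String, Nothing]) is presumably needed to keep type inference unambiguous now that CaseInsensitiveMap.apply is overloaded. A rough sketch of the behaviour that test exercises; the parameters map here is a stand-in:

    import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

    val parameters = Map("url" -> "jdbc:h2:mem:testdb", "dbtable" -> "TEST.PEOPLE")
    // ++ builds the result through CaseInsensitiveMap's overridden +,
    // so it remains case-insensitive (this is what the suite relies on).
    var modifiedParameters = CaseInsensitiveMap(Map.empty[String, Nothing]) ++ parameters
    modifiedParameters -= "dbtable"
    // Removal and lookup both ignore key case.
    assert(modifiedParameters.get("dbTAblE").isEmpty)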