[SPARK-26594][SQL] DataSourceOptions.asMap should return CaseInsensitiveMap #24062

Closed · wants to merge 2 commits
Changes from all commits
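
For orientation before the diff, here is a minimal sketch of the behavior this PR targets. It is not code from the patch; the option key "fooBar" simply mirrors the test added at the end of this change set.

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.DataSourceOptions

// Hypothetical options map; "fooBar" stands in for any data source option key.
val options = new DataSourceOptions(Map("fooBar" -> "x").asJava)

// Previously asMap() returned a plain java.util.Map keyed by lower-cased strings,
// so Scala callers wrote options.asMap().asScala.toMap. With asMap() returning a
// CaseInsensitiveMap, the asScala conversion goes away and lookups succeed
// regardless of the casing the caller uses.
val params = options.asMap()
assert(params.get("fooBar") == Some("x"))
assert(params.get("FOOBAR") == Some("x"))
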
@@ -20,8 +20,6 @@ package org.apache.spark.sql.kafka010
import java.{util => ju}
import java.util.{Locale, UUID}

-import scala.collection.JavaConverters._
-
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
@@ -104,7 +102,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

override def getTable(options: DataSourceOptions): KafkaTable = {
-new KafkaTable(strategy(options.asMap().asScala.toMap))
+new KafkaTable(strategy(options.asMap().toMap))
}

/**
@@ -372,11 +370,9 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

override def buildForStreaming(): StreamingWrite = {
-import scala.collection.JavaConverters._
-
assert(inputSchema != null)
val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
-val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
+val producerParams = kafkaParamsForProducer(options.asMap.toMap)
new KafkaStreamingWrite(topic, producerParams, inputSchema)
}
}
@@ -388,7 +384,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
-val parameters = options.asMap().asScala.toMap
+val parameters = options.asMap().toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
@@ -417,7 +413,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
-val parameters = options.asMap().asScala.toMap
+val parameters = options.asMap().toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
project/MimaExcludes.scala (3 additions, 0 deletions)
@@ -36,6 +36,9 @@ object MimaExcludes {

// Exclude rules for 3.0.x
lazy val v30excludes = v24excludes ++ Seq(
+// [SPARK-26594] DataSourceOptions.asMap should return CaseInsensitiveMap
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.sources.v2.DataSourceOptions.asMap"),
+
// [SPARK-25838] Remove formatVersion from Saveable
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.DistributedLDAModel.formatVersion"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.LocalLDAModel.formatVersion"),
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.util

import java.util.Locale

+import scala.collection.JavaConverters._
+
/**
* Builds a map in which keys are case insensitive. Input map can be accessed for cases where
* case-sensitive information is required. The primary constructor is marked private to avoid
@@ -53,5 +55,8 @@ object CaseInsensitiveMap {
case caseSensitiveMap: CaseInsensitiveMap[T] => caseSensitiveMap
case _ => new CaseInsensitiveMap(params)
}
+
+def apply(params: java.util.Map[String, String]): CaseInsensitiveMap[String] =
+new CaseInsensitiveMap(params.asScala.toMap)
}
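
A minimal usage sketch of the overload added above (not part of the patch; the key "fooBar" is illustrative, and originalMap is the catalyst class's existing input-map accessor): the java.util.Map is converted via asScala and wrapped so that lookups ignore key casing while the original casing stays reachable.

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// Any java.util.Map[String, String] can be handed to the new overload.
val javaMap: java.util.Map[String, String] = Map("fooBar" -> "x").asJava
val ciMap: CaseInsensitiveMap[String] = CaseInsensitiveMap(javaMap)

// Lookups ignore key casing...
assert(ciMap.get("foobar") == Some("x"))
assert(ciMap.get("FOOBAR") == Some("x"))
// ...while the wrapped input map keeps the original casing, as the class doc above notes.
assert(ciMap.originalMap.keySet == Set("fooBar"))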

@@ -27,6 +27,8 @@
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap;
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap$;

/**
* An immutable string-to-string map in which keys are case-insensitive. This is used to represent
@@ -92,8 +94,8 @@ public DataSourceOptions(Map<String, String> originalMap) {
}
}

-public Map<String, String> asMap() {
-return new HashMap<>(keyLowerCasedMap);
+public CaseInsensitiveMap<String> asMap() {
Review thread on this line:

Member:

I think there was a discussion before that returning CaseInsensitiveMap was a bad design (IIRC). I think that's the reason why it returns a regular map instead. Why don't we just clearly document what it returns? I think this can be a conservative compromise as well.

Member Author (@xuanyuanking):

Is there any link to the discussion? I didn't find it in #19925.

Member:

I think I talked with @cloud-fan and @rxin, but I might remember this wrongly. CaseInsensitiveMap is an internal class from catalyst; we shouldn't expose it like this anyway.

+return CaseInsensitiveMap$.MODULE$.apply(keyLowerCasedMap);
}

/**
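
A side note on the Java line above, not part of the patch: CaseInsensitiveMap$.MODULE$.apply(...) is simply how Java code reaches a Scala companion object's apply method. A hypothetical Scala rendering of the new method body would be:

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap

// keyLowerCasedMap is the java.util.Map[String, String] that DataSourceOptions keeps
// internally; the companion overload added earlier in this PR accepts it directly.
def asMapEquivalent(keyLowerCasedMap: java.util.Map[String, String]): CaseInsensitiveMap[String] =
  CaseInsensitiveMap(keyLowerCasedMap)

This also makes the reviewers' concern concrete: the public Java API would now return a type defined in the internal catalyst package.
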
@@ -16,8 +16,6 @@
*/
package org.apache.spark.sql.execution.datasources.v2

-import scala.collection.JavaConverters._
-
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -33,12 +31,12 @@ abstract class FileTable(
lazy val fileIndex: PartitioningAwareFileIndex = {
val filePaths = options.paths()
val hadoopConf =
-sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
+sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().toMap)
val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf,
checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist())
val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
new InMemoryFileIndex(sparkSession, rootPathsSpecified,
-options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache)
+options.asMap().toMap, userSpecifiedSchema, fileStatusCache)
}

lazy val dataSchema: StructType = userSpecifiedSchema.orElse {
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.datasources.v2
import java.io.IOException
import java.util.UUID

-import scala.collection.JavaConverters._
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
@@ -64,7 +62,7 @@ abstract class FileWriteBuilder(options: DataSourceOptions)
val pathName = options.paths().head
val path = new Path(pathName)
val sparkSession = SparkSession.active
-val optionsAsScala = options.asMap().asScala.toMap
+val optionsAsScala = options.asMap().toMap
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala)
val job = getJobInstance(hadoopConf, path)
val committer = FileCommitProtocol.instantiate(
@@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.datasources.v2.orc

-import scala.collection.JavaConverters._
-
import org.apache.orc.mapreduce.OrcInputFormat

import org.apache.spark.sql.SparkSession
@@ -37,7 +35,7 @@ case class OrcScanBuilder(
dataSchema: StructType,
options: DataSourceOptions) extends FileScanBuilder(schema) {
lazy val hadoopConf =
-sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap)
+sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().toMap)

override def build(): Scan = {
OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema)
@@ -1186,7 +1186,7 @@ class JDBCSuite extends QueryTest
testJdbcOptions(new JDBCOptions(parameters))
testJdbcOptions(new JDBCOptions(CaseInsensitiveMap(parameters)))
// test add/remove key-value from the case-insensitive map
-var modifiedParameters = CaseInsensitiveMap(Map.empty) ++ parameters
+var modifiedParameters = CaseInsensitiveMap(Map.empty[String, Nothing]) ++ parameters
testJdbcOptions(new JDBCOptions(modifiedParameters))
modifiedParameters -= "dbtable"
assert(modifiedParameters.get("dbTAblE").isEmpty)
@@ -104,4 +104,9 @@ class DataSourceOptionsSuite extends SparkFunSuite {

assert(options.paths().toSeq == Seq("c", "d\"e"))
}
+
+test("asMap") {
+val map = new DataSourceOptions(Map("fooBar" -> "x").asJava).asMap
Review comment on this line:

Member Author (@xuanyuanking, Mar 20, 2019):

After #24025, the test below passes:

val map = new CaseInsensitiveStringMap(Map("fooBar" -> "x").asJava).asCaseSensitiveMap()
assert(map.get("fooBar") == "x")

#24094 also adds asCaseSensitiveMap to CaseInsensitiveStringMap for Hadoop configurations, so this PR is no longer needed.

assert(map.get("fooBar").get == "x")
}
}