From b8b3a3c81e1b42c0481139e172aed59fedaca17e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 8 Mar 2019 23:43:03 +0800 Subject: [PATCH] merge CaseInsensitiveStringMap and DataSourceOptions --- .../sql/kafka010/KafkaContinuousStream.scala | 3 +- .../sql/kafka010/KafkaMicroBatchStream.scala | 7 +- .../kafka010/KafkaOffsetRangeCalculator.scala | 6 +- .../sql/kafka010/KafkaSourceProvider.scala | 19 +- .../kafka010/KafkaMicroBatchSourceSuite.scala | 4 +- .../KafkaOffsetRangeCalculatorSuite.scala | 8 +- .../sql/util/CaseInsensitiveStringMap.java | 57 ++++- .../util/CaseInsensitiveStringMapSuite.java | 48 ---- .../util/CaseInsensitiveStringMapSuite.scala} | 57 ++--- .../sql/sources/v2/DataSourceOptions.java | 210 ------------------ .../sql/sources/v2/SupportsBatchRead.java | 5 +- .../sql/sources/v2/SupportsBatchWrite.java | 5 +- .../sources/v2/SupportsContinuousRead.java | 5 +- .../sources/v2/SupportsMicroBatchRead.java | 5 +- .../spark/sql/sources/v2/SupportsRead.java | 7 +- .../sources/v2/SupportsStreamingWrite.java | 5 +- .../spark/sql/sources/v2/SupportsWrite.java | 5 +- .../spark/sql/sources/v2/TableProvider.java | 5 +- .../apache/spark/sql/DataFrameReader.scala | 12 +- .../apache/spark/sql/DataFrameWriter.scala | 8 +- .../datasources/FallbackOrcDataSourceV2.scala | 13 +- .../datasources/noop/NoopDataSource.scala | 5 +- .../v2/DataSourceV2Implicits.scala | 8 +- .../datasources/v2/DataSourceV2Relation.scala | 7 +- .../datasources/v2/DataSourceV2Strategy.scala | 8 +- .../datasources/v2/FileDataSourceV2.scala | 19 ++ .../execution/datasources/v2/FileTable.scala | 22 +- .../datasources/v2/FileWriteBuilder.scala | 15 +- .../v2/WriteToDataSourceV2Exec.scala | 18 +- .../datasources/v2/orc/OrcDataSourceV2.scala | 21 +- .../datasources/v2/orc/OrcScanBuilder.scala | 7 +- .../datasources/v2/orc/OrcTable.scala | 14 +- .../datasources/v2/orc/OrcWriteBuilder.scala | 6 +- .../streaming/MicroBatchExecution.scala | 3 +- .../execution/streaming/StreamExecution.scala | 5 +- .../streaming/StreamingRelation.scala | 3 +- .../sql/execution/streaming/console.scala | 5 +- .../continuous/ContinuousExecution.scala | 6 +- .../ContinuousRateStreamSource.scala | 6 +- .../ContinuousTextSocketSource.scala | 4 +- .../sql/execution/streaming/memory.scala | 7 +- .../streaming/sources/ConsoleWrite.scala | 4 +- .../sources/ForeachWriterTable.scala | 5 +- .../sources/RateStreamMicroBatchStream.scala | 4 +- .../sources/RateStreamProvider.scala | 9 +- .../sources/TextSocketMicroBatchStream.scala | 4 +- .../sources/TextSocketSourceProvider.scala | 17 +- .../streaming/sources/memoryV2.scala | 5 +- .../sql/streaming/DataStreamReader.scala | 5 +- .../sql/streaming/DataStreamWriter.scala | 5 +- .../sources/v2/JavaAdvancedDataSourceV2.java | 6 +- .../sources/v2/JavaColumnarDataSourceV2.java | 6 +- .../v2/JavaPartitionAwareDataSource.java | 6 +- .../v2/JavaReportStatisticsDataSource.java | 6 +- .../v2/JavaSchemaRequiredDataSource.java | 8 +- .../sources/v2/JavaSimpleDataSourceV2.java | 6 +- .../datasources/orc/OrcFilterSuite.scala | 5 +- .../sources/RateStreamProviderSuite.scala | 11 +- .../sources/TextSocketStreamSuite.scala | 18 +- .../sql/sources/v2/DataSourceV2Suite.scala | 35 +-- .../v2/FileDataSourceV2FallBackSuite.scala | 11 +- .../sources/v2/SimpleWritableDataSource.scala | 14 +- .../sources/StreamingDataSourceV2Suite.scala | 37 +-- 63 files changed, 354 insertions(+), 556 deletions(-) delete mode 100644 sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java rename 
sql/{core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala => catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala} (54%) delete mode 100644 sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala index 0e6171724402e..d60ee1cadd195 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala @@ -37,8 +37,7 @@ import org.apache.spark.sql.sources.v2.reader.streaming._ * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be * read by per-task consumers generated later. * @param kafkaParams String params for per-task Kafka consumers. - * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which - * are not Kafka consumer params. + * @param sourceOptions Params which are not Kafka consumer params. * @param metadataPath Path to a directory this reader can use for writing metadata. * @param initialOffsets The Kafka offsets to start reading data at. * @param failOnDataLoss Flag indicating whether reading should fail in data loss diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala index a6303461445fa..6972f391f2852 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala @@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset} import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset} +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.UninterruptibleThread /** @@ -57,7 +57,7 @@ import org.apache.spark.util.UninterruptibleThread private[kafka010] class KafkaMicroBatchStream( kafkaOffsetReader: KafkaOffsetReader, executorKafkaParams: ju.Map[String, Object], - options: DataSourceOptions, + options: CaseInsensitiveStringMap, metadataPath: String, startingOffsets: KafkaOffsetRangeLimit, failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging { @@ -66,8 +66,7 @@ private[kafka010] class KafkaMicroBatchStream( "kafkaConsumer.pollTimeoutMs", SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L) - private val maxOffsetsPerTrigger = - Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong) + private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong) private val rangeCalculator = KafkaOffsetRangeCalculator(options) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala index 6008794924052..1af8404b89c68 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010 import org.apache.kafka.common.TopicPartition -import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** @@ -91,8 +91,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int private[kafka010] object KafkaOffsetRangeCalculator { - def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = { - val optionalValue = Option(options.get("minPartitions").orElse(null)).map(_.toInt) + def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator = { + val optionalValue = Option(options.get("minPartitions")).map(_.toInt) new KafkaOffsetRangeCalculator(optionalValue) } } diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index b39e0d40fd31c..8496cbda261be 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.sources.v2.writer.WriteBuilder import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * The provider class for all Kafka readers and writers. 
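The two Kafka hunks above replace options.get(key).orElse(null) on the old Optional-returning DataSourceOptions with a plain options.get(key) on CaseInsensitiveStringMap, which returns null for missing keys. A minimal, self-contained sketch of that lookup pattern; the option values below are made up, and only the CaseInsensitiveStringMap API added in this patch is assumed:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    object OptionLookupSketch {
      def main(args: Array[String]): Unit = {
        val options = new CaseInsensitiveStringMap(
          Map("minPartitions" -> "8", "maxOffsetsPerTrigger" -> "1000").asJava)

        // get() matches keys case-insensitively and returns null when the key is absent,
        // so Option(...) converts the result to Some/None before parsing.
        val minPartitions = Option(options.get("MINPARTITIONS")).map(_.toInt)        // Some(8)
        val maxOffsets = Option(options.get("maxoffsetspertrigger")).map(_.toLong)   // Some(1000L)
        val missing = Option(options.get("startingOffsets"))                         // None

        println((minPartitions, maxOffsets, missing))
      }
    }
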
It is designed such that it throws @@ -103,8 +104,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister failOnDataLoss(caseInsensitiveParams)) } - override def getTable(options: DataSourceOptions): KafkaTable = { - new KafkaTable(strategy(options.asMap().asScala.toMap)) + override def getTable(options: CaseInsensitiveStringMap): KafkaTable = { + new KafkaTable(strategy(options.asScala.toMap)) } /** @@ -358,11 +359,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister override def schema(): StructType = KafkaOffsetReader.kafkaSchema - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder { override def build(): Scan = new KafkaScan(options) } - override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = { + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { new WriteBuilder { private var inputSchema: StructType = _ @@ -375,20 +376,20 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister import scala.collection.JavaConverters._ assert(inputSchema != null) - val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim) - val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap) + val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim) + val producerParams = kafkaParamsForProducer(options.asScala.toMap) new KafkaStreamingWrite(topic, producerParams, inputSchema) } } } } - class KafkaScan(options: DataSourceOptions) extends Scan { + class KafkaScan(options: CaseInsensitiveStringMap) extends Scan { override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { - val parameters = options.asMap().asScala.toMap + val parameters = options.asScala.toMap validateStreamOptions(parameters) // Each running query should use its own group id. Otherwise, the query may be only assigned // partial data since Kafka will assign partitions to multiple consumers having the same group @@ -417,7 +418,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } override def toContinuousStream(checkpointLocation: String): ContinuousStream = { - val parameters = options.asMap().asScala.toMap + val parameters = options.asScala.toMap validateStreamOptions(parameters) // Each running query should use its own group id. 
Otherwise, the query may be only assigned // partial data since Kafka will assign partitions to multiple consumers having the same group diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 8fd5790d753af..21634ae2abfa1 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -41,10 +41,10 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.kafka010.KafkaSourceProvider._ -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.streaming.{StreamTest, Trigger} import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.util.CaseInsensitiveStringMap abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest { @@ -1118,7 +1118,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase { "kafka.bootstrap.servers" -> testUtils.brokerAddress, "subscribe" -> topic ) ++ Option(minPartitions).map { p => "minPartitions" -> p} - val dsOptions = new DataSourceOptions(options.asJava) + val dsOptions = new CaseInsensitiveStringMap(options.asJava) val table = provider.getTable(dsOptions) val stream = table.newScanBuilder(dsOptions).build().toMicroBatchStream(dir.getAbsolutePath) val inputPartitions = stream.planInputPartitions( diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala index 2ccf3e291bea7..7ffdaab3e74fb 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala @@ -22,13 +22,13 @@ import scala.collection.JavaConverters._ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.sources.v2.DataSourceOptions +import org.apache.spark.sql.util.CaseInsensitiveStringMap class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { def testWithMinPartitions(name: String, minPartition: Int) (f: KafkaOffsetRangeCalculator => Unit): Unit = { - val options = new DataSourceOptions(Map("minPartitions" -> minPartition.toString).asJava) + val options = new CaseInsensitiveStringMap(Map("minPartitions" -> minPartition.toString).asJava) test(s"with minPartition = $minPartition: $name") { f(KafkaOffsetRangeCalculator(options)) } @@ -36,7 +36,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { test("with no minPartition: N TopicPartitions to N offset ranges") { - val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty()) + val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty()) assert( calc.getRanges( fromOffsets = Map(tp1 -> 1), @@ -64,7 +64,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite { } test("with no minPartition: empty ranges ignored") { - val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty()) + val calc 
= KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty()) assert( calc.getRanges( fromOffsets = Map(tp1 -> 1, tp2 -> 1), diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java index 8c5a6c61d8658..b59334b208903 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/util/CaseInsensitiveStringMap.java @@ -31,19 +31,20 @@ * This is used to pass options to v2 implementations to ensure consistent case insensitivity. *

* Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return - * keys converted to lower case. + * keys converted to lower case. This map doesn't allow null key. */ @Experimental public class CaseInsensitiveStringMap implements Map { public static CaseInsensitiveStringMap empty() { - return new CaseInsensitiveStringMap(); + return new CaseInsensitiveStringMap(new HashMap<>(0)); } private final Map delegate; - private CaseInsensitiveStringMap() { - this.delegate = new HashMap<>(); + public CaseInsensitiveStringMap(Map originalMap) { + this.delegate = new HashMap<>(originalMap.size()); + putAll(originalMap); } @Override @@ -56,9 +57,13 @@ public boolean isEmpty() { return delegate.isEmpty(); } + private String toLowerCase(Object key) { + return key.toString().toLowerCase(Locale.ROOT); + } + @Override public boolean containsKey(Object key) { - return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT)); + return delegate.containsKey(toLowerCase(key)); } @Override @@ -68,17 +73,17 @@ public boolean containsValue(Object value) { @Override public String get(Object key) { - return delegate.get(key.toString().toLowerCase(Locale.ROOT)); + return delegate.get(toLowerCase(key)); } @Override public String put(String key, String value) { - return delegate.put(key.toLowerCase(Locale.ROOT), value); + return delegate.put(toLowerCase(key), value); } @Override public String remove(Object key) { - return delegate.remove(key.toString().toLowerCase(Locale.ROOT)); + return delegate.remove(toLowerCase(key)); } @Override @@ -107,4 +112,40 @@ public Collection values() { public Set> entrySet() { return delegate.entrySet(); } + + /** + * Returns the boolean value to which the specified key is mapped, + * or defaultValue if there is no mapping for the key. The key match is case-insensitive + */ + public boolean getBoolean(String key, boolean defaultValue) { + String value = get(key); + return value == null ? defaultValue : Boolean.parseBoolean(value); + } + + /** + * Returns the integer value to which the specified key is mapped, + * or defaultValue if there is no mapping for the key. The key match is case-insensitive + */ + public int getInt(String key, int defaultValue) { + String value = get(key); + return value == null ? defaultValue : Integer.parseInt(value); + } + + /** + * Returns the long value to which the specified key is mapped, + * or defaultValue if there is no mapping for the key. The key match is case-insensitive + */ + public long getLong(String key, long defaultValue) { + String value = get(key); + return value == null ? defaultValue : Long.parseLong(value); + } + + /** + * Returns the double value to which the specified key is mapped, + * or defaultValue if there is no mapping for the key. The key match is case-insensitive + */ + public double getDouble(String key, double defaultValue) { + String value = get(key); + return value == null ? defaultValue : Double.parseDouble(value); + } } diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java deleted file mode 100644 index 76392777d42a4..0000000000000 --- a/sql/catalyst/src/test/java/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
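The typed getters added to CaseInsensitiveStringMap above (getBoolean, getInt, getLong, getDouble) parse the stored string when the key is present and fall back to the supplied default otherwise. A short usage sketch with made-up option names:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    object TypedGetterSketch {
      def main(args: Array[String]): Unit = {
        val options = new CaseInsensitiveStringMap(
          Map("isStreaming" -> "true", "numRetries" -> "3", "timeoutMs" -> "30000").asJava)

        // Key matching is case-insensitive; missing keys return the default value.
        assert(options.getBoolean("ISSTREAMING", false))
        assert(options.getInt("numretries", 0) == 3)
        assert(options.getLong("timeoutMs", -1L) == 30000L)
        assert(options.getDouble("samplingRatio", 1.0) == 1.0) // absent key -> default
      }
    }
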
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.util; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashSet; -import java.util.Set; - -public class CaseInsensitiveStringMapSuite { - @Test - public void testPutAndGet() { - CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty(); - options.put("kEy", "valUE"); - - Assert.assertEquals("Should return correct value for lower-case key", - "valUE", options.get("key")); - Assert.assertEquals("Should return correct value for upper-case key", - "valUE", options.get("KEY")); - } - - @Test - public void testKeySet() { - CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty(); - options.put("kEy", "valUE"); - - Set expectedKeySet = new HashSet<>(); - expectedKeySet.add("key"); - - Assert.assertEquals("Should return lower-case key set", expectedKeySet, options.keySet()); - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala similarity index 54% rename from sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala rename to sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala index cfa69a86de1a7..73d3a769533de 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceOptionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/CaseInsensitiveStringMapSuite.scala @@ -15,31 +15,29 @@ * limitations under the License. */ -package org.apache.spark.sql.sources.v2 +package org.apache.spark.sql.util import scala.collection.JavaConverters._ import org.apache.spark.SparkFunSuite -/** - * A simple test suite to verify `DataSourceOptions`. 
- */ -class DataSourceOptionsSuite extends SparkFunSuite { +class CaseInsensitiveStringMapSuite extends SparkFunSuite { - test("key is case-insensitive") { - val options = new DataSourceOptions(Map("foo" -> "bar").asJava) - assert(options.get("foo").get() == "bar") - assert(options.get("FoO").get() == "bar") - assert(!options.get("abc").isPresent) + test("put and get") { + val options = CaseInsensitiveStringMap.empty() + options.put("kEy", "valUE") + assert(options.get("key") == "valUE") + assert(options.get("KEY") == "valUE") } - test("value is case-sensitive") { - val options = new DataSourceOptions(Map("foo" -> "bAr").asJava) - assert(options.get("foo").get == "bAr") + test("key and value set") { + val options = new CaseInsensitiveStringMap(Map("kEy" -> "valUE").asJava) + assert(options.keySet().asScala == Set("key")) + assert(options.values().asScala.toSeq == Seq("valUE")) } test("getInt") { - val options = new DataSourceOptions(Map("numFOo" -> "1", "foo" -> "bar").asJava) + val options = new CaseInsensitiveStringMap(Map("numFOo" -> "1", "foo" -> "bar").asJava) assert(options.getInt("numFOO", 10) == 1) assert(options.getInt("numFOO2", 10) == 10) @@ -49,7 +47,7 @@ class DataSourceOptionsSuite extends SparkFunSuite { } test("getBoolean") { - val options = new DataSourceOptions( + val options = new CaseInsensitiveStringMap( Map("isFoo" -> "true", "isFOO2" -> "false", "foo" -> "bar").asJava) assert(options.getBoolean("isFoo", false)) assert(!options.getBoolean("isFoo2", true)) @@ -59,7 +57,7 @@ class DataSourceOptionsSuite extends SparkFunSuite { } test("getLong") { - val options = new DataSourceOptions(Map("numFoo" -> "9223372036854775807", + val options = new CaseInsensitiveStringMap(Map("numFoo" -> "9223372036854775807", "foo" -> "bar").asJava) assert(options.getLong("numFOO", 0L) == 9223372036854775807L) assert(options.getLong("numFoo2", -1L) == -1L) @@ -70,7 +68,7 @@ class DataSourceOptionsSuite extends SparkFunSuite { } test("getDouble") { - val options = new DataSourceOptions(Map("numFoo" -> "922337.1", + val options = new CaseInsensitiveStringMap(Map("numFoo" -> "922337.1", "foo" -> "bar").asJava) assert(options.getDouble("numFOO", 0d) == 922337.1d) assert(options.getDouble("numFoo2", -1.02d) == -1.02d) @@ -79,29 +77,4 @@ class DataSourceOptionsSuite extends SparkFunSuite { options.getDouble("foo", 0.1d) } } - - test("standard options") { - val options = new DataSourceOptions(Map( - DataSourceOptions.PATH_KEY -> "abc", - DataSourceOptions.TABLE_KEY -> "tbl").asJava) - - assert(options.paths().toSeq == Seq("abc")) - assert(options.tableName().get() == "tbl") - assert(!options.databaseName().isPresent) - } - - test("standard options with both singular path and multi-paths") { - val options = new DataSourceOptions(Map( - DataSourceOptions.PATH_KEY -> "abc", - DataSourceOptions.PATHS_KEY -> """["c", "d"]""").asJava) - - assert(options.paths().toSeq == Seq("abc", "c", "d")) - } - - test("standard options with only multi-paths") { - val options = new DataSourceOptions(Map( - DataSourceOptions.PATHS_KEY -> """["c", "d\"e"]""").asJava) - - assert(options.paths().toSeq == Seq("c", "d\"e")) - } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java deleted file mode 100644 index 00af0bf1b172c..0000000000000 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/DataSourceOptions.java +++ /dev/null @@ -1,210 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.sources.v2; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; -import java.util.stream.Stream; - -import com.fasterxml.jackson.databind.ObjectMapper; - -import org.apache.spark.annotation.Evolving; - -/** - * An immutable string-to-string map in which keys are case-insensitive. This is used to represent - * data source options. - * - * Each data source implementation can define its own options and teach its users how to set them. - * Spark doesn't have any restrictions about what options a data source should or should not have. - * Instead Spark defines some standard options that data sources can optionally adopt. It's possible - * that some options are very common and many data sources use them. However different data - * sources may define the common options(key and meaning) differently, which is quite confusing to - * end users. - * - * The standard options defined by Spark: - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - * - *
- * Option key: Option value
- * path: A path string of the data files/directories, like path1, /absolute/file2, path3/*. The path can
- *   either be relative or absolute, points to either file or directory, and can contain
- *   wildcards. This option is commonly used by file-based data sources.
- * paths: A JSON array style paths string of the data files/directories, like ["path1", "/absolute/file2"].
- *   The format of each path is same as the path option, plus it should follow JSON string literal
- *   format, e.g. quotes should be escaped, pa\"th means pa"th.
- * table: A table name string representing the table name directly without any interpretation.
- *   For example, db.tbl means a table called db.tbl, not a table called tbl inside database db.
- *   `t*b.l` means a table called `t*b.l`, not t*b.l.
- * database: A database name string representing the database name directly without any
- *   interpretation, which is very similar to the table name option.
- */ -@Evolving -public class DataSourceOptions { - private final Map keyLowerCasedMap; - - private String toLowerCase(String key) { - return key.toLowerCase(Locale.ROOT); - } - - public static DataSourceOptions empty() { - return new DataSourceOptions(new HashMap<>()); - } - - public DataSourceOptions(Map originalMap) { - keyLowerCasedMap = new HashMap<>(originalMap.size()); - for (Map.Entry entry : originalMap.entrySet()) { - keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue()); - } - } - - public Map asMap() { - return new HashMap<>(keyLowerCasedMap); - } - - /** - * Returns the option value to which the specified key is mapped, case-insensitively. - */ - public Optional get(String key) { - return Optional.ofNullable(keyLowerCasedMap.get(toLowerCase(key))); - } - - /** - * Returns the boolean value to which the specified key is mapped, - * or defaultValue if there is no mapping for the key. The key match is case-insensitive - */ - public boolean getBoolean(String key, boolean defaultValue) { - String lcaseKey = toLowerCase(key); - return keyLowerCasedMap.containsKey(lcaseKey) ? - Boolean.parseBoolean(keyLowerCasedMap.get(lcaseKey)) : defaultValue; - } - - /** - * Returns the integer value to which the specified key is mapped, - * or defaultValue if there is no mapping for the key. The key match is case-insensitive - */ - public int getInt(String key, int defaultValue) { - String lcaseKey = toLowerCase(key); - return keyLowerCasedMap.containsKey(lcaseKey) ? - Integer.parseInt(keyLowerCasedMap.get(lcaseKey)) : defaultValue; - } - - /** - * Returns the long value to which the specified key is mapped, - * or defaultValue if there is no mapping for the key. The key match is case-insensitive - */ - public long getLong(String key, long defaultValue) { - String lcaseKey = toLowerCase(key); - return keyLowerCasedMap.containsKey(lcaseKey) ? - Long.parseLong(keyLowerCasedMap.get(lcaseKey)) : defaultValue; - } - - /** - * Returns the double value to which the specified key is mapped, - * or defaultValue if there is no mapping for the key. The key match is case-insensitive - */ - public double getDouble(String key, double defaultValue) { - String lcaseKey = toLowerCase(key); - return keyLowerCasedMap.containsKey(lcaseKey) ? - Double.parseDouble(keyLowerCasedMap.get(lcaseKey)) : defaultValue; - } - - /** - * The option key for singular path. - */ - public static final String PATH_KEY = "path"; - - /** - * The option key for multiple paths. - */ - public static final String PATHS_KEY = "paths"; - - /** - * The option key for table name. - */ - public static final String TABLE_KEY = "table"; - - /** - * The option key for database name. - */ - public static final String DATABASE_KEY = "database"; - - /** - * The option key for whether to check existence of files for a table. - */ - public static final String CHECK_FILES_EXIST_KEY = "check_files_exist"; - - /** - * Returns all the paths specified by both the singular path option and the multiple - * paths option. 
- */ - public String[] paths() { - String[] singularPath = - get(PATH_KEY).map(s -> new String[]{s}).orElseGet(() -> new String[0]); - Optional pathsStr = get(PATHS_KEY); - if (pathsStr.isPresent()) { - ObjectMapper objectMapper = new ObjectMapper(); - try { - String[] paths = objectMapper.readValue(pathsStr.get(), String[].class); - return Stream.of(singularPath, paths).flatMap(Stream::of).toArray(String[]::new); - } catch (IOException e) { - return singularPath; - } - } else { - return singularPath; - } - } - - /** - * Returns the value of the table name option. - */ - public Optional tableName() { - return get(TABLE_KEY); - } - - /** - * Returns the value of the database name option. - */ - public Optional databaseName() { - return get(DATABASE_KEY); - } - - public Boolean checkFilesExist() { - Optional result = get(CHECK_FILES_EXIST_KEY); - return result.isPresent() && result.get().equals("true"); - } -} diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java index 6c5a95d2a75b7..ea7c5d2b108f0 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchRead.java @@ -20,13 +20,14 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.sources.v2.reader.Scan; import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * An empty mix-in interface for {@link Table}, to indicate this table supports batch scan. *

* If a {@link Table} implements this interface, the - * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that - * builds {@link Scan} with {@link Scan#toBatch()} implemented. + * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder} + * that builds {@link Scan} with {@link Scan#toBatch()} implemented. *

*/ @Evolving diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java index b2cd97a2f5332..09e23f84fd6bf 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsBatchWrite.java @@ -19,13 +19,14 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.sources.v2.writer.WriteBuilder; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * An empty mix-in interface for {@link Table}, to indicate this table supports batch write. *

* If a {@link Table} implements this interface, the - * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder} - * with {@link WriteBuilder#buildForBatch()} implemented. + * {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)} must return a + * {@link WriteBuilder} with {@link WriteBuilder#buildForBatch()} implemented. *

*/ @Evolving diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java index b7fa3f24a238c..5cc9848d9da89 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsContinuousRead.java @@ -20,14 +20,15 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.sources.v2.reader.Scan; import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with * continuous mode. *

* If a {@link Table} implements this interface, the - * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that - * builds {@link Scan} with {@link Scan#toContinuousStream(String)} implemented. + * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder} + * that builds {@link Scan} with {@link Scan#toContinuousStream(String)} implemented. *

*/ @Evolving diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java index 9408e323f9da1..c98f3f1aa5cba 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsMicroBatchRead.java @@ -20,14 +20,15 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.sources.v2.reader.Scan; import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with * micro-batch mode. *

* If a {@link Table} implements this interface, the - * {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that - * builds {@link Scan} with {@link Scan#toMicroBatchStream(String)} implemented. + * {@link SupportsRead#newScanBuilder(CaseInsensitiveStringMap)} must return a {@link ScanBuilder} + * that builds {@link Scan} with {@link Scan#toMicroBatchStream(String)} implemented. *

*/ @Evolving diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java index 5031c71c0fd4d..14990effeda37 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsRead.java @@ -19,11 +19,12 @@ import org.apache.spark.sql.sources.v2.reader.Scan; import org.apache.spark.sql.sources.v2.reader.ScanBuilder; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * An internal base interface of mix-in interfaces for readable {@link Table}. This adds - * {@link #newScanBuilder(DataSourceOptions)} that is used to create a scan for batch, micro-batch, - * or continuous processing. + * {@link #newScanBuilder(CaseInsensitiveStringMap)} that is used to create a scan for batch, + * micro-batch, or continuous processing. */ interface SupportsRead extends Table { @@ -34,5 +35,5 @@ interface SupportsRead extends Table { * @param options The options for reading, which is an immutable case-insensitive * string-to-string map. */ - ScanBuilder newScanBuilder(DataSourceOptions options); + ScanBuilder newScanBuilder(CaseInsensitiveStringMap options); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java index 1050d35250c1f..ac11e483c18c4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsStreamingWrite.java @@ -20,13 +20,14 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.execution.streaming.BaseStreamingSink; import org.apache.spark.sql.sources.v2.writer.WriteBuilder; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * An empty mix-in interface for {@link Table}, to indicate this table supports streaming write. *

* If a {@link Table} implements this interface, the - * {@link SupportsWrite#newWriteBuilder(DataSourceOptions)} must return a {@link WriteBuilder} - * with {@link WriteBuilder#buildForStreaming()} implemented. + * {@link SupportsWrite#newWriteBuilder(CaseInsensitiveStringMap)} must return a + * {@link WriteBuilder} with {@link WriteBuilder#buildForStreaming()} implemented. *

*/ @Evolving diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java index ecdfe20730254..f0d8e44f15287 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/SupportsWrite.java @@ -19,10 +19,11 @@ import org.apache.spark.sql.sources.v2.writer.BatchWrite; import org.apache.spark.sql.sources.v2.writer.WriteBuilder; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * An internal base interface of mix-in interfaces for writable {@link Table}. This adds - * {@link #newWriteBuilder(DataSourceOptions)} that is used to create a write + * {@link #newWriteBuilder(CaseInsensitiveStringMap)} that is used to create a write * for batch or streaming. */ interface SupportsWrite extends Table { @@ -31,5 +32,5 @@ interface SupportsWrite extends Table { * Returns a {@link WriteBuilder} which can be used to create {@link BatchWrite}. Spark will call * this method to configure each data source write. */ - WriteBuilder newWriteBuilder(DataSourceOptions options); + WriteBuilder newWriteBuilder(CaseInsensitiveStringMap options); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java index a9b83b6de9950..04ad8fd90be9f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java +++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/TableProvider.java @@ -20,6 +20,7 @@ import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.sources.DataSourceRegister; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * The base interface for v2 data sources which don't have a real catalog. Implementations must @@ -37,7 +38,7 @@ public interface TableProvider { * @param options the user-specified options that can identify a table, e.g. file path, Kafka * topic name, etc. It's an immutable case-insensitive string-to-string map. */ - Table getTable(DataSourceOptions options); + Table getTable(CaseInsensitiveStringMap options); /** * Return a {@link Table} instance to do read/write with user-specified schema and options. @@ -50,7 +51,7 @@ public interface TableProvider { * @param schema the user-specified schema. 
* @throws UnsupportedOperationException */ - default Table getTable(DataSourceOptions options, StructType schema) { + default Table getTable(CaseInsensitiveStringMap options, StructType schema) { String name; if (this instanceof DataSourceRegister) { name = ((DataSourceRegister) this).shortName(); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index a8562581ee85d..9f37e2b125f41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -40,6 +40,7 @@ import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils, FileDataSourceV2, FileTable} import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.unsafe.types.UTF8String /** @@ -176,7 +177,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { */ def load(path: String): DataFrame = { // force invocation of `load(...varargs...)` - option(DataSourceOptions.PATH_KEY, path).load(Seq.empty: _*) + option("path", path).load(Seq.empty: _*) } /** @@ -208,18 +209,19 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { source = provider, conf = sparkSession.sessionState.conf) val pathsOption = { val objectMapper = new ObjectMapper() - DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray) + "path" -> objectMapper.writeValueAsString(paths.toArray) } - val checkFilesExistsOption = DataSourceOptions.CHECK_FILES_EXIST_KEY -> "true" + // TODO: remove this option. + val checkFilesExistsOption = "check_files_exist" -> "true" val finalOptions = sessionOptions ++ extraOptions.toMap + pathsOption + checkFilesExistsOption - val dsOptions = new DataSourceOptions(finalOptions.asJava) + val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava) val table = userSpecifiedSchema match { case Some(schema) => provider.getTable(dsOptions, schema) case _ => provider.getTable(dsOptions) } table match { case _: SupportsBatchRead => - Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, finalOptions)) + Dataset.ofRows(sparkSession, DataSourceV2Relation.create(table, dsOptions)) case _ => loadV1Source(paths: _*) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 8d4d60ebdb547..d438f76d67679 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -35,6 +35,7 @@ import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * Interface used to write a [[Dataset]] to external storage systems (e.g. 
file systems, @@ -260,12 +261,13 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider] val sessionOptions = DataSourceV2Utils.extractSessionConfigs( provider, session.sessionState.conf) - val checkFilesExistsOption = DataSourceOptions.CHECK_FILES_EXIST_KEY -> "false" + // TODO: remove this option. + val checkFilesExistsOption = "check_files_exist" -> "false" val options = sessionOptions ++ extraOptions + checkFilesExistsOption - val dsOptions = new DataSourceOptions(options.asJava) + val dsOptions = new CaseInsensitiveStringMap(options.asJava) provider.getTable(dsOptions) match { case table: SupportsBatchWrite => - lazy val relation = DataSourceV2Relation.create(table, options) + lazy val relation = DataSourceV2Relation.create(table, dsOptions) mode match { case SaveMode.Append => runCommand(df.sparkSession, "save") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala index e22d6a6d399a5..7c72495548e3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallbackOrcDataSourceV2.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import scala.collection.JavaConverters._ + import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule @@ -33,10 +35,15 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable */ class FallbackOrcDataSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoTable(d @DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) => + case i @ InsertIntoTable(d @ DataSourceV2Relation(table: OrcTable, _, _), _, _, _, _) => val v1FileFormat = new OrcFileFormat - val relation = HadoopFsRelation(table.fileIndex, table.fileIndex.partitionSchema, - table.schema(), None, v1FileFormat, d.options)(sparkSession) + val relation = HadoopFsRelation( + table.fileIndex, + table.fileIndex.partitionSchema, + table.schema(), + None, + v1FileFormat, + d.options.asScala.toMap)(sparkSession) i.copy(table = LogicalRelation(relation)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala index 22a74e3ccaeee..aa2a5e9a06fbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/noop/NoopDataSource.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * This is no-op datasource. It does not do anything besides consuming its input. 
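In the DataFrameReader and DataFrameWriter hunks above, the merged option map (session configs, user options, and the TODO-marked check_files_exist flag) is now wrapped once in a CaseInsensitiveStringMap and handed to both TableProvider.getTable and DataSourceV2Relation.create. A standalone sketch of that merging step, with placeholder values standing in for what the real code computes:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    object MergedOptionsSketch {
      def main(args: Array[String]): Unit = {
        val sessionOptions = Map("compression" -> "snappy")   // stand-in for extracted session configs
        val extraOptions = Map("path" -> "/tmp/data")         // stand-in for user .option(...) calls
        // Later entries win; the internal flag mirrors the TODO-marked option in the patch.
        val finalOptions = sessionOptions ++ extraOptions + ("check_files_exist" -> "true")
        val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)

        assert(dsOptions.get("PATH") == "/tmp/data")          // lookups are case-insensitive
        assert(dsOptions.getBoolean("check_files_exist", false))
      }
    }
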
@@ -31,11 +32,11 @@ import org.apache.spark.sql.types.StructType */ class NoopDataSource extends TableProvider with DataSourceRegister { override def shortName(): String = "noop" - override def getTable(options: DataSourceOptions): Table = NoopTable + override def getTable(options: CaseInsensitiveStringMap): Table = NoopTable } private[noop] object NoopTable extends Table with SupportsBatchWrite with SupportsStreamingWrite { - override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = NoopWriteBuilder + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = NoopWriteBuilder override def name(): String = "noop-table" override def schema(): StructType = new StructType() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala index c8542bfe5e59b..2081af35ce2d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala @@ -17,10 +17,8 @@ package org.apache.spark.sql.execution.datasources.v2 -import scala.collection.JavaConverters._ - import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, SupportsBatchWrite, Table} +import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table} object DataSourceV2Implicits { implicit class TableHelper(table: Table) { @@ -42,8 +40,4 @@ object DataSourceV2Implicits { } } } - - implicit class OptionsHelper(options: Map[String, String]) { - def toDataSourceOptions: DataSourceOptions = new DataSourceOptions(options.asJava) - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 891694be46291..17407827d0564 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.{Statistics => V2Statistics, _} import org.apache.spark.sql.sources.v2.reader.streaming.{Offset, SparkDataStream} import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A logical plan representing a data source v2 table. 
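With the OptionsHelper implicit (toDataSourceOptions) deleted just above, call sites now pass the CaseInsensitiveStringMap itself through the plan and convert back with asScala.toMap only where a Scala map is required, as the Kafka and FallbackOrcDataSourceV2 hunks do. A small round-trip sketch of that conversion, independent of the planner classes; the option values are illustrative:

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    object OptionsRoundTripSketch {
      def main(args: Array[String]): Unit = {
        val scalaOptions = Map("Path" -> "/tmp/orc", "mergeSchema" -> "false")

        // Scala Map -> CaseInsensitiveStringMap: keys are lower-cased on the way in.
        val dsOptions = new CaseInsensitiveStringMap(scalaOptions.asJava)

        // CaseInsensitiveStringMap -> Scala Map: it is a java.util.Map, so asScala applies directly.
        // Note the keys come back lower-cased.
        val backToScala: Map[String, String] = dsOptions.asScala.toMap
        assert(backToScala == Map("path" -> "/tmp/orc", "mergeschema" -> "false"))
      }
    }
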
@@ -36,7 +37,7 @@ import org.apache.spark.sql.sources.v2.writer._ case class DataSourceV2Relation( table: Table, output: Seq[AttributeReference], - options: Map[String, String]) + options: CaseInsensitiveStringMap) extends LeafNode with MultiInstanceRelation with NamedRelation { import DataSourceV2Implicits._ @@ -48,7 +49,7 @@ case class DataSourceV2Relation( } def newScanBuilder(): ScanBuilder = { - table.asBatchReadable.newScanBuilder(options.toDataSourceOptions) + table.asBatchReadable.newScanBuilder(options) } override def computeStats(): Statistics = { @@ -96,7 +97,7 @@ case class StreamingDataSourceV2Relation( } object DataSourceV2Relation { - def create(table: Table, options: Map[String, String]): DataSourceV2Relation = { + def create(table: Table, options: CaseInsensitiveStringMap): DataSourceV2Relation = { val output = table.schema().toAttributes DataSourceV2Relation(table, output, options) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala index bf606267aa34d..424fbed6fc1e6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala @@ -148,8 +148,7 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil case AppendData(r: DataSourceV2Relation, query, _) => - AppendDataExec( - r.table.asBatchWritable, r.options.toDataSourceOptions, planLater(query)) :: Nil + AppendDataExec(r.table.asBatchWritable, r.options, planLater(query)) :: Nil case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, _) => // fail if any filter cannot be converted. correctness depends on removing all matching data. 
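The DataFrameReader hunk above serializes the user-supplied paths as a JSON array string under the single path key, and the getPaths helper added to FileDataSourceV2 in the next hunk parses it back, falling back to treating the value as one literal path when it is not valid JSON. A self-contained sketch of that round trip using only Jackson; the paths are illustrative:

    import java.io.IOException
    import com.fasterxml.jackson.databind.ObjectMapper

    object PathOptionRoundTripSketch {
      def main(args: Array[String]): Unit = {
        val objectMapper = new ObjectMapper()

        // Writer side (DataFrameReader above): multiple paths become one JSON-array string.
        val encoded = objectMapper.writeValueAsString(Array("path1", "/absolute/file2"))
        // encoded == ["path1","/absolute/file2"]

        // Reader side (getPaths below): try to parse JSON, fall back to the raw string
        // as a single path when parsing fails.
        def decode(pathStr: String): Seq[String] =
          try objectMapper.readValue(pathStr, classOf[Array[String]]).toSeq
          catch { case _: IOException => Seq(pathStr) }

        assert(decode(encoded) == Seq("path1", "/absolute/file2"))
        assert(decode("/plain/dir/*.orc") == Seq("/plain/dir/*.orc"))
      }
    }
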
@@ -159,11 +158,10 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper { }.toArray OverwriteByExpressionExec( - r.table.asBatchWritable, filters, r.options.toDataSourceOptions, planLater(query)) :: Nil + r.table.asBatchWritable, filters, r.options, planLater(query)) :: Nil case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _) => - OverwritePartitionsDynamicExec(r.table.asBatchWritable, - r.options.toDataSourceOptions, planLater(query)) :: Nil + OverwritePartitionsDynamicExec(r.table.asBatchWritable, r.options, planLater(query)) :: Nil case WriteToContinuousDataSource(writer, query) => WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala index 06c57066aa240..8c0973299e903 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala @@ -16,10 +16,15 @@ */ package org.apache.spark.sql.execution.datasources.v2 +import java.io.IOException + +import com.fasterxml.jackson.databind.ObjectMapper + import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.sources.DataSourceRegister import org.apache.spark.sql.sources.v2.TableProvider +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A base interface for data source v2 implementations of the built-in file-based data sources. @@ -35,4 +40,18 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister { def fallBackFileFormat: Class[_ <: FileFormat] lazy val sparkSession = SparkSession.active + + protected def getPaths(map: CaseInsensitiveStringMap): Seq[String] = { + val objectMapper = new ObjectMapper() + Option(map.get("path")).map { pathStr => + try { + val paths = objectMapper.readValue(pathStr, classOf[Array[String]]) + paths.toSeq + } catch { + case _: IOException => Seq(pathStr) + } + }.getOrElse { + throw new IllegalArgumentException("'path' must be given when reading files.") + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala index 21d3e5e29cfb5..08873a3b5a643 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala @@ -22,23 +22,27 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchRead, SupportsBatchWrite, Table} +import org.apache.spark.sql.sources.v2.{SupportsBatchRead, SupportsBatchWrite, Table} import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap abstract class FileTable( sparkSession: SparkSession, - options: DataSourceOptions, + options: CaseInsensitiveStringMap, + paths: Seq[String], userSpecifiedSchema: Option[StructType]) extends Table with SupportsBatchRead with SupportsBatchWrite { + lazy val fileIndex: PartitioningAwareFileIndex = { - val filePaths = options.paths() - val hadoopConf = - 
sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap) - val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(filePaths, hadoopConf, - checkEmptyGlobPath = true, checkFilesExist = options.checkFilesExist()) + val scalaMap = options.asScala.toMap + val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(scalaMap) + // This is an internal config so must be present. + val checkFilesExist = options.get("check_files_exist").toBoolean + val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(paths, hadoopConf, + checkEmptyGlobPath = true, checkFilesExist = checkFilesExist) val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) - new InMemoryFileIndex(sparkSession, rootPathsSpecified, - options.asMap().asScala.toMap, userSpecifiedSchema, fileStatusCache) + new InMemoryFileIndex( + sparkSession, rootPathsSpecified, scalaMap, userSpecifiedSchema, fileStatusCache) } lazy val dataSchema: StructType = userSpecifiedSchema.orElse { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala index 75c922424e8ef..e16ee4c460f39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileWriteBuilder.scala @@ -33,12 +33,12 @@ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, DataSource, OutputWriterFactory, WriteJobDescription} import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.writer.{BatchWrite, SupportsSaveMode, WriteBuilder} import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.SerializableConfiguration -abstract class FileWriteBuilder(options: DataSourceOptions) +abstract class FileWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String]) extends WriteBuilder with SupportsSaveMode { private var schema: StructType = _ private var queryId: String = _ @@ -61,18 +61,17 @@ abstract class FileWriteBuilder(options: DataSourceOptions) override def buildForBatch(): BatchWrite = { validateInputs() - val pathName = options.paths().head - val path = new Path(pathName) + val path = new Path(paths.head) val sparkSession = SparkSession.active - val optionsAsScala = options.asMap().asScala.toMap + val optionsAsScala = options.asScala.toMap val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(optionsAsScala) val job = getJobInstance(hadoopConf, path) val committer = FileCommitProtocol.instantiate( sparkSession.sessionState.conf.fileCommitProtocolClass, jobId = java.util.UUID.randomUUID().toString, - outputPath = pathName) + outputPath = paths.head) lazy val description = - createWriteJobDescription(sparkSession, hadoopConf, job, pathName, optionsAsScala) + createWriteJobDescription(sparkSession, hadoopConf, job, paths.head, optionsAsScala) val fs = path.getFileSystem(hadoopConf) mode match { @@ -127,7 +126,7 @@ abstract class FileWriteBuilder(options: DataSourceOptions) assert(schema != null, "Missing input data schema") assert(queryId != null, "Missing query ID") assert(mode != null, "Missing save mode") - 
assert(options.paths().length == 1) + assert(paths.length == 1) DataSource.validateSchema(schema) schema.foreach { field => if (!supportsDataType(field.dataType)) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index d7cb2457433b0..51606abdb563a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -31,8 +31,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.sources.{AlwaysTrue, Filter} -import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsBatchWrite} +import org.apache.spark.sql.sources.v2.SupportsBatchWrite import org.apache.spark.sql.sources.v2.writer.{BatchWrite, DataWriterFactory, SupportsDynamicOverwrite, SupportsOverwrite, SupportsSaveMode, SupportsTruncate, WriteBuilder, WriterCommitMessage} +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.{LongAccumulator, Utils} /** @@ -53,7 +54,7 @@ case class WriteToDataSourceV2(batchWrite: BatchWrite, query: LogicalPlan) */ case class AppendDataExec( table: SupportsBatchWrite, - writeOptions: DataSourceOptions, + writeOptions: CaseInsensitiveStringMap, query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { override protected def doExecute(): RDD[InternalRow] = { @@ -81,7 +82,7 @@ case class AppendDataExec( case class OverwriteByExpressionExec( table: SupportsBatchWrite, deleteWhere: Array[Filter], - writeOptions: DataSourceOptions, + writeOptions: CaseInsensitiveStringMap, query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { private def isTruncate(filters: Array[Filter]): Boolean = { @@ -118,7 +119,7 @@ case class OverwriteByExpressionExec( */ case class OverwritePartitionsDynamicExec( table: SupportsBatchWrite, - writeOptions: DataSourceOptions, + writeOptions: CaseInsensitiveStringMap, query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper { override protected def doExecute(): RDD[InternalRow] = { @@ -139,12 +140,9 @@ case class OverwritePartitionsDynamicExec( case class WriteToDataSourceV2Exec( batchWrite: BatchWrite, - query: SparkPlan - ) extends V2TableWriteExec { + query: SparkPlan) extends V2TableWriteExec { - import DataSourceV2Implicits._ - - def writeOptions: DataSourceOptions = Map.empty[String, String].toDataSourceOptions + def writeOptions: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty() override protected def doExecute(): RDD[InternalRow] = { doWrite(batchWrite) @@ -157,7 +155,7 @@ case class WriteToDataSourceV2Exec( trait BatchWriteHelper { def table: SupportsBatchWrite def query: SparkPlan - def writeOptions: DataSourceOptions + def writeOptions: CaseInsensitiveStringMap def newWriteBuilder(): WriteBuilder = { table.newWriteBuilder(writeOptions) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala index f279af49ba9cf..900c94e937ffc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcDataSourceV2.scala @@ -19,8 +19,9 @@ package org.apache.spark.sql.execution.datasources.v2.orc import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.v2._ -import org.apache.spark.sql.sources.v2.{DataSourceOptions, Table} +import org.apache.spark.sql.sources.v2.Table import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap class OrcDataSourceV2 extends FileDataSourceV2 { @@ -28,18 +29,20 @@ class OrcDataSourceV2 extends FileDataSourceV2 { override def shortName(): String = "orc" - private def getTableName(options: DataSourceOptions): String = { - shortName() + ":" + options.paths().mkString(";") + private def getTableName(paths: Seq[String]): String = { + shortName() + ":" + paths.mkString(";") } - override def getTable(options: DataSourceOptions): Table = { - val tableName = getTableName(options) - OrcTable(tableName, sparkSession, options, None) + override def getTable(options: CaseInsensitiveStringMap): Table = { + val paths = getPaths(options) + val tableName = getTableName(paths) + OrcTable(tableName, sparkSession, options, paths, None) } - override def getTable(options: DataSourceOptions, schema: StructType): Table = { - val tableName = getTableName(options) - OrcTable(tableName, sparkSession, options, Some(schema)) + override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { + val paths = getPaths(options) + val tableName = getTableName(paths) + OrcTable(tableName, sparkSession, options, paths, Some(schema)) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala index eb27bbd3abeaa..0b153416b7bb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcScanBuilder.scala @@ -26,18 +26,17 @@ import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex import org.apache.spark.sql.execution.datasources.orc.OrcFilters import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder import org.apache.spark.sql.sources.Filter -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader.Scan import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap case class OrcScanBuilder( sparkSession: SparkSession, fileIndex: PartitioningAwareFileIndex, schema: StructType, dataSchema: StructType, - options: DataSourceOptions) extends FileScanBuilder(schema) { - lazy val hadoopConf = - sparkSession.sessionState.newHadoopConfWithOptions(options.asMap().asScala.toMap) + options: CaseInsensitiveStringMap) extends FileScanBuilder(schema) { + lazy val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(options.asScala.toMap) override def build(): Scan = { OrcScan(sparkSession, hadoopConf, fileIndex, dataSchema, readSchema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala index 249df8b8622fb..aac38fb3fa1ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala @@ -21,22 +21,24 @@ import org.apache.hadoop.fs.FileStatus import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.orc.OrcUtils import org.apache.spark.sql.execution.datasources.v2.FileTable -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.writer.WriteBuilder import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap case class OrcTable( name: String, sparkSession: SparkSession, - options: DataSourceOptions, + options: CaseInsensitiveStringMap, + paths: Seq[String], userSpecifiedSchema: Option[StructType]) - extends FileTable(sparkSession, options, userSpecifiedSchema) { - override def newScanBuilder(options: DataSourceOptions): OrcScanBuilder = + extends FileTable(sparkSession, options, paths, userSpecifiedSchema) { + + override def newScanBuilder(options: CaseInsensitiveStringMap): OrcScanBuilder = new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options) override def inferSchema(files: Seq[FileStatus]): Option[StructType] = OrcUtils.readSchema(sparkSession, files) - override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = - new OrcWriteBuilder(options) + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = + new OrcWriteBuilder(options, paths) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala index 1aec4d872a64d..829ab5fbe1768 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcWriteBuilder.scala @@ -25,10 +25,12 @@ import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFac import org.apache.spark.sql.execution.datasources.orc.{OrcFileFormat, OrcOptions, OrcOutputWriter, OrcUtils} import org.apache.spark.sql.execution.datasources.v2.FileWriteBuilder import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap + +class OrcWriteBuilder(options: CaseInsensitiveStringMap, paths: Seq[String]) + extends FileWriteBuilder(options, paths) { -class OrcWriteBuilder(options: DataSourceOptions) extends FileWriteBuilder(options) { override def prepareWrite( sqlConf: SQLConf, job: Job, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index de7cbe25ceb3b..8cf26ddc3f00a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -95,9 +95,8 @@ class MicroBatchExecution( val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" nextSourceId += 1 logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]") - val dsOptions = new DataSourceOptions(options.asJava) // TODO: operator pushdown. 
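// A minimal sketch of the CaseInsensitiveStringMap behavior that newScanBuilder and friends
// now rely on: keys are matched case-insensitively and lookups return null (not Optional)
// when a key is absent. The option names and values below are made up for illustration.
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val opts = new CaseInsensitiveStringMap(Map("Host" -> "localhost", "PORT" -> "9999").asJava)
opts.get("host")                                       // "localhost"
opts.get("missing")                                    // null, hence the Option(...) wrappers
opts.getInt("port", -1)                                // 9999
opts.containsKey("HOST")                               // true
CaseInsensitiveStringMap.empty().containsKey("host")   // false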
- val scan = table.newScanBuilder(dsOptions).build() + val scan = table.newScanBuilder(options).build() val stream = scan.toMicroBatchStream(metadataPath) StreamingDataSourceV2Relation(output, scan, stream) }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index bba640eea7e5b..d7c07bf2c364a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -40,10 +40,11 @@ import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.command.StreamingExplainCommand import org.apache.spark.sql.execution.datasources.v2.StreamWriterCommitProgress import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite} +import org.apache.spark.sql.sources.v2.SupportsStreamingWrite import org.apache.spark.sql.sources.v2.writer.SupportsTruncate import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite import org.apache.spark.sql.streaming._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.{Clock, UninterruptibleThread, Utils} /** States for [[StreamExecution]]'s lifecycle. */ @@ -584,7 +585,7 @@ abstract class StreamExecution( table: SupportsStreamingWrite, options: Map[String, String], inputPlan: LogicalPlan): StreamingWrite = { - val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava)) + val writeBuilder = table.newWriteBuilder(new CaseInsensitiveStringMap(options.asJava)) .withQueryId(runId.toString) .withInputDataSchema(inputPlan.schema) outputMode match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala index 1b7aa548e6d21..0d7e9ba363d01 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.sources.v2.{Table, TableProvider} +import org.apache.spark.sql.util.CaseInsensitiveStringMap object StreamingRelation { def apply(dataSource: DataSource): StreamingRelation = { @@ -95,7 +96,7 @@ case class StreamingRelationV2( source: TableProvider, sourceName: String, table: Table, - extraOptions: Map[String, String], + extraOptions: CaseInsensitiveStringMap, output: Seq[Attribute], v1Relation: Option[StreamingRelation])(session: SparkSession) extends LeafNode with MultiInstanceRelation { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala index 923bd749b29b3..dbdfcf8085604 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/console.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.writer.{SupportsTruncate, WriteBuilder} import 
org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap case class ConsoleRelation(override val sqlContext: SQLContext, data: DataFrame) extends BaseRelation { @@ -34,7 +35,7 @@ class ConsoleSinkProvider extends TableProvider with DataSourceRegister with CreatableRelationProvider { - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { ConsoleTable } @@ -62,7 +63,7 @@ object ConsoleTable extends Table with SupportsStreamingWrite { override def schema(): StructType = StructType(Nil) - override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = { + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { new WriteBuilder with SupportsTruncate { private var inputSchema: StructType = _ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 26b5642010335..70fbad38d4788 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -22,7 +22,6 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import java.util.function.UnaryOperator -import scala.collection.JavaConverters._ import scala.collection.mutable.{Map => MutableMap} import org.apache.spark.SparkEnv @@ -33,7 +32,7 @@ import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _} import org.apache.spark.sql.sources.v2 -import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, SupportsStreamingWrite} +import org.apache.spark.sql.sources.v2.{SupportsContinuousRead, SupportsStreamingWrite} import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset} import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} import org.apache.spark.util.Clock @@ -71,9 +70,8 @@ class ContinuousExecution( val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId" nextSourceId += 1 logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]") - val dsOptions = new DataSourceOptions(options.asJava) // TODO: operator pushdown. 
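// A hedged sketch of the write-builder chain used in StreamExecution above, assuming `sink`
// is any SupportsStreamingWrite table (e.g. the console sink) and `schema` is the input
// plan's schema; "numRows" is only an example option.
import scala.collection.JavaConverters._
import org.apache.spark.sql.sources.v2.SupportsStreamingWrite
import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

def sketchWriteBuilder(sink: SupportsStreamingWrite, schema: StructType): WriteBuilder = {
  val writeOptions = new CaseInsensitiveStringMap(Map("numRows" -> "50").asJava)
  sink.newWriteBuilder(writeOptions)
    .withQueryId(java.util.UUID.randomUUID().toString)
    .withInputDataSchema(schema)
}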
- val scan = table.newScanBuilder(dsOptions).build() + val scan = table.newScanBuilder(options).build() val stream = scan.toContinuousStream(metadataPath) StreamingDataSourceV2Relation(output, scan, stream) }) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala index 48ff70f9c9d07..d55f71c7be830 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousRateStreamSource.scala @@ -23,17 +23,13 @@ import org.json4s.jackson.Serialization import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair} -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming._ case class RateStreamPartitionOffset( partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset -class RateStreamContinuousStream( - rowsPerSecond: Long, - numPartitions: Int, - options: DataSourceOptions) extends ContinuousStream { +class RateStreamContinuousStream(rowsPerSecond: Long, numPartitions: Int) extends ContinuousStream { implicit val defaultFormats: DefaultFormats = DefaultFormats val creationTime = System.currentTimeMillis() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala index e7bc71394061e..2263b42870a65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTextSocketSource.scala @@ -34,9 +34,9 @@ import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.execution.streaming.{Offset => _, _} import org.apache.spark.sql.execution.streaming.sources.TextSocketReader -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.RpcUtils @@ -49,7 +49,7 @@ import org.apache.spark.util.RpcUtils * buckets and serves the messages to the executors via a RPC endpoint. 
*/ class TextSocketContinuousStream( - host: String, port: Int, numPartitions: Int, options: DataSourceOptions) + host: String, port: Int, numPartitions: Int, options: CaseInsensitiveStringMap) extends ContinuousStream with Logging { implicit val defaultFormats: DefaultFormats = DefaultFormats diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index e71f81caeb974..df7990c6a652e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream, Offset => OffsetV2} import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap object MemoryStream { protected val currentBlockId = new AtomicInteger(0) @@ -73,7 +74,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Bas MemoryStreamTableProvider, "memory", new MemoryStreamTable(this), - Map.empty, + CaseInsensitiveStringMap.empty(), attributes, None)(sqlContext.sparkSession) } @@ -84,7 +85,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Bas // This class is used to indicate the memory stream data source. We don't actually use it, as // memory stream is for test only and we never look it up by name. object MemoryStreamTableProvider extends TableProvider { - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { throw new IllegalStateException("MemoryStreamTableProvider should not be used.") } } @@ -96,7 +97,7 @@ class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table override def schema(): StructType = stream.fullSchema() - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new MemoryStreamScanBuilder(stream) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala index f2ff30bcf1bef..dbe242784986d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWrite.scala @@ -20,13 +20,13 @@ package org.apache.spark.sql.execution.streaming.sources import org.apache.spark.internal.Logging import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** Common methods used to create writes for the the console sink */ -class ConsoleWrite(schema: StructType, options: DataSourceOptions) +class ConsoleWrite(schema: StructType, options: CaseInsensitiveStringMap) extends StreamingWrite with Logging { // Number of rows to display, by default 20 rows diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala index c0ae44a128ca1..44516bbb2a5a1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala @@ -22,10 +22,11 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.python.PythonForeachWriter -import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, Table} +import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, Table} import org.apache.spark.sql.sources.v2.writer.{DataWriter, SupportsTruncate, WriteBuilder, WriterCommitMessage} import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A write-only table for forwarding data into the specified [[ForeachWriter]]. @@ -44,7 +45,7 @@ case class ForeachWriterTable[T]( override def schema(): StructType = StructType(Nil) - override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = { + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { new WriteBuilder with SupportsTruncate { private var inputSchema: StructType = _ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala index a8feed34b96dc..e6cefd77640b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamMicroBatchStream.scala @@ -28,9 +28,9 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset} +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.{ManualClock, SystemClock} class RateStreamMicroBatchStream( @@ -38,7 +38,7 @@ class RateStreamMicroBatchStream( // The default values here are used in tests. 
rampUpTimeSeconds: Long = 0, numPartitions: Int = 1, - options: DataSourceOptions, + options: CaseInsensitiveStringMap, checkpointLocation: String) extends MicroBatchStream with Logging { import RateStreamProvider._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala index 3a0082536512d..3d8a90e99b85a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProvider.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder} import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A source that generates increment long values with timestamps. Each generated row has two @@ -43,14 +44,14 @@ import org.apache.spark.sql.types._ class RateStreamProvider extends TableProvider with DataSourceRegister { import RateStreamProvider._ - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { val rowsPerSecond = options.getLong(ROWS_PER_SECOND, 1) if (rowsPerSecond <= 0) { throw new IllegalArgumentException( s"Invalid value '$rowsPerSecond'. The option 'rowsPerSecond' must be positive") } - val rampUpTimeSeconds = Option(options.get(RAMP_UP_TIME).orElse(null)) + val rampUpTimeSeconds = Option(options.get(RAMP_UP_TIME)) .map(JavaUtils.timeStringAsSec) .getOrElse(0L) if (rampUpTimeSeconds < 0) { @@ -83,7 +84,7 @@ class RateStreamTable( override def schema(): StructType = RateStreamProvider.SCHEMA - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder { override def build(): Scan = new Scan { override def readSchema(): StructType = RateStreamProvider.SCHEMA @@ -93,7 +94,7 @@ class RateStreamTable( } override def toContinuousStream(checkpointLocation: String): ContinuousStream = { - new RateStreamContinuousStream(rowsPerSecond, numPartitions, options) + new RateStreamContinuousStream(rowsPerSecond, numPartitions) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala index 540131c8de8a1..9168d46493aef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketMicroBatchStream.scala @@ -29,7 +29,6 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.streaming.LongOffset -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset} import org.apache.spark.unsafe.types.UTF8String @@ -39,8 +38,7 @@ import org.apache.spark.unsafe.types.UTF8String * and debugging. 
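// A hedged sketch of the option parsing in RateStreamProvider.getTable above; the values are
// illustrative, and "rampUpTime" accepts Spark-style duration strings such as "5s".
import scala.collection.JavaConverters._
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val rateOpts = new CaseInsensitiveStringMap(
  Map("rowsPerSecond" -> "100", "rampUpTime" -> "5s").asJava)
val rowsPerSecond = rateOpts.getLong("rowsPerSecond", 1)   // 100
val rampUpSeconds = Option(rateOpts.get("rampUpTime"))     // get() returns null when absent,
  .map(JavaUtils.timeStringAsSec)                          // so Option(...) replaces the old
  .getOrElse(0L)                                           // Optional-based orElse(null); 5 here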
This MicroBatchReadSupport will *not* work in production applications due to * multiple reasons, including no support for fault recovery. */ -class TextSocketMicroBatchStream( - host: String, port: Int, numPartitions: Int, options: DataSourceOptions) +class TextSocketMicroBatchStream(host: String, port: Int, numPartitions: Int) extends MicroBatchStream with Logging { @GuardedBy("this") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala index 8ac5bfc307aa3..0adbf1d9b3689 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketSourceProvider.scala @@ -30,20 +30,21 @@ import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder} import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream} import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap class TextSocketSourceProvider extends TableProvider with DataSourceRegister with Logging { - private def checkParameters(params: DataSourceOptions): Unit = { + private def checkParameters(params: CaseInsensitiveStringMap): Unit = { logWarning("The socket source should not be used for production applications! " + "It does not support recovery.") - if (!params.get("host").isPresent) { + if (!params.containsKey("host")) { throw new AnalysisException("Set a host to read from with option(\"host\", ...).") } - if (!params.get("port").isPresent) { + if (!params.containsKey("port")) { throw new AnalysisException("Set a port to read from with option(\"port\", ...).") } Try { - params.get("includeTimestamp").orElse("false").toBoolean + params.getBoolean("includeTimestamp", false) } match { case Success(_) => case Failure(_) => @@ -51,10 +52,10 @@ class TextSocketSourceProvider extends TableProvider with DataSourceRegister wit } } - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { checkParameters(options) new TextSocketTable( - options.get("host").get, + options.get("host"), options.getInt("port", -1), options.getInt("numPartitions", SparkSession.active.sparkContext.defaultParallelism), options.getBoolean("includeTimestamp", false)) @@ -77,12 +78,12 @@ class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimest } } - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder { override def build(): Scan = new Scan { override def readSchema(): StructType = schema() override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = { - new TextSocketMicroBatchStream(host, port, numPartitions, options) + new TextSocketMicroBatchStream(host, port, numPartitions) } override def toContinuousStream(checkpointLocation: String): ContinuousStream = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala index 397c5ff0dcb6a..22adceba930fb 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala @@ -31,10 +31,11 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics} import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.execution.streaming.{MemorySinkBase, Sink} -import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite} +import org.apache.spark.sql.sources.v2.SupportsStreamingWrite import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite} import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit @@ -46,7 +47,7 @@ class MemorySinkV2 extends SupportsStreamingWrite with MemorySinkBase with Loggi override def schema(): StructType = StructType(Nil) - override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = { + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { new WriteBuilder with SupportsTruncate { private var needTruncate: Boolean = false private var inputSchema: StructType = _ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index 96b3a86f5df4d..01f29cdeddc2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRel import org.apache.spark.sql.sources.StreamSourceProvider import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * Interface used to load a streaming `Dataset` from external storage systems (e.g. 
file systems, @@ -175,7 +176,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo val sessionOptions = DataSourceV2Utils.extractSessionConfigs( source = provider, conf = sparkSession.sessionState.conf) val options = sessionOptions ++ extraOptions - val dsOptions = new DataSourceOptions(options.asJava) + val dsOptions = new CaseInsensitiveStringMap(options.asJava) val table = userSpecifiedSchema match { case Some(schema) => provider.getTable(dsOptions, schema) case _ => provider.getTable(dsOptions) @@ -185,7 +186,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo Dataset.ofRows( sparkSession, StreamingRelationV2( - provider, source, table, options, table.schema.toAttributes, v1Relation)( + provider, source, table, dsOptions, table.schema.toAttributes, v1Relation)( sparkSession)) // fallback to v1 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 984199488fa7b..33d032eb78c2b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -31,7 +31,8 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.execution.streaming.sources._ -import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsStreamingWrite, TableProvider} +import org.apache.spark.sql.sources.v2.{SupportsStreamingWrite, TableProvider} +import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * Interface used to write a streaming `Dataset` to external storage systems (e.g. 
file systems, @@ -313,7 +314,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { val sessionOptions = DataSourceV2Utils.extractSessionConfigs( source = provider, conf = df.sparkSession.sessionState.conf) val options = sessionOptions ++ extraOptions - val dsOptions = new DataSourceOptions(options.asJava) + val dsOptions = new CaseInsensitiveStringMap(options.asJava) provider.getTable(dsOptions) match { case s: SupportsStreamingWrite => s case _ => createV1Sink() diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java index 2612b6185fd4c..255a9f887878b 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaAdvancedDataSourceV2.java @@ -24,19 +24,19 @@ import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; import org.apache.spark.sql.sources.Filter; import org.apache.spark.sql.sources.GreaterThan; -import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.Table; import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; public class JavaAdvancedDataSourceV2 implements TableProvider { @Override - public Table getTable(DataSourceOptions options) { + public Table getTable(CaseInsensitiveStringMap options) { return new JavaSimpleBatchTable() { @Override - public ScanBuilder newScanBuilder(DataSourceOptions options) { + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { return new AdvancedScanBuilder(); } }; diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java index d72ab5338aa8c..699859cfaebe1 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaColumnarDataSourceV2.java @@ -21,11 +21,11 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector; -import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.Table; import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.apache.spark.sql.vectorized.ColumnVector; import org.apache.spark.sql.vectorized.ColumnarBatch; @@ -49,10 +49,10 @@ public PartitionReaderFactory createReaderFactory() { } @Override - public Table getTable(DataSourceOptions options) { + public Table getTable(CaseInsensitiveStringMap options) { return new JavaSimpleBatchTable() { @Override - public ScanBuilder newScanBuilder(DataSourceOptions options) { + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { return new MyScanBuilder(); } }; diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java index a513bfb26ef1c..dfbea927e477b 100644 --- 
a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaPartitionAwareDataSource.java @@ -22,13 +22,13 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.Table; import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.sources.v2.reader.partitioning.ClusteredDistribution; import org.apache.spark.sql.sources.v2.reader.partitioning.Distribution; import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; public class JavaPartitionAwareDataSource implements TableProvider { @@ -54,10 +54,10 @@ public Partitioning outputPartitioning() { } @Override - public Table getTable(DataSourceOptions options) { + public Table getTable(CaseInsensitiveStringMap options) { return new JavaSimpleBatchTable() { @Override - public ScanBuilder newScanBuilder(DataSourceOptions options) { + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { return new MyScanBuilder(); } }; diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaReportStatisticsDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaReportStatisticsDataSource.java index bbc8492ec4e16..f3755e18b58d5 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaReportStatisticsDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaReportStatisticsDataSource.java @@ -19,13 +19,13 @@ import java.util.OptionalLong; -import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.Table; import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.reader.InputPartition; import org.apache.spark.sql.sources.v2.reader.ScanBuilder; import org.apache.spark.sql.sources.v2.reader.Statistics; import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; public class JavaReportStatisticsDataSource implements TableProvider { class MyScanBuilder extends JavaSimpleScanBuilder implements SupportsReportStatistics { @@ -54,10 +54,10 @@ public InputPartition[] planInputPartitions() { } @Override - public Table getTable(DataSourceOptions options) { + public Table getTable(CaseInsensitiveStringMap options) { return new JavaSimpleBatchTable() { @Override - public ScanBuilder newScanBuilder(DataSourceOptions options) { + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { return new MyScanBuilder(); } }; diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java index 815d57ba94139..3800a94f88898 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSchemaRequiredDataSource.java @@ -17,11 +17,11 @@ package test.org.apache.spark.sql.sources.v2; -import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.Table; import org.apache.spark.sql.sources.v2.TableProvider; import 
org.apache.spark.sql.sources.v2.reader.*; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; public class JavaSchemaRequiredDataSource implements TableProvider { @@ -45,7 +45,7 @@ public InputPartition[] planInputPartitions() { } @Override - public Table getTable(DataSourceOptions options, StructType schema) { + public Table getTable(CaseInsensitiveStringMap options, StructType schema) { return new JavaSimpleBatchTable() { @Override @@ -54,14 +54,14 @@ public StructType schema() { } @Override - public ScanBuilder newScanBuilder(DataSourceOptions options) { + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { return new MyScanBuilder(schema); } }; } @Override - public Table getTable(DataSourceOptions options) { + public Table getTable(CaseInsensitiveStringMap options) { throw new IllegalArgumentException("requires a user-supplied schema"); } } diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java index 852c4546df885..7474f36c97f75 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/v2/JavaSimpleDataSourceV2.java @@ -17,10 +17,10 @@ package test.org.apache.spark.sql.sources.v2; -import org.apache.spark.sql.sources.v2.DataSourceOptions; import org.apache.spark.sql.sources.v2.Table; import org.apache.spark.sql.sources.v2.TableProvider; import org.apache.spark.sql.sources.v2.reader.*; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; public class JavaSimpleDataSourceV2 implements TableProvider { @@ -36,10 +36,10 @@ public InputPartition[] planInputPartitions() { } @Override - public Table getTable(DataSourceOptions options) { + public Table getTable(CaseInsensitiveStringMap options) { return new JavaSimpleBatchTable() { @Override - public ScanBuilder newScanBuilder(DataSourceOptions options) { + public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) { return new MyScanBuilder(); } }; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala index cccd8e9ee8bd1..034454d21d7ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilterSuite.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsR import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ @@ -58,7 +57,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { case PhysicalOperation(_, filters, DataSourceV2Relation(orcTable: OrcTable, _, options)) => assert(filters.nonEmpty, "No filter is analyzed from the given query") - val scanBuilder = orcTable.newScanBuilder(new DataSourceOptions(options.asJava)) + val scanBuilder = orcTable.newScanBuilder(options) scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray) val pushedFilters = scanBuilder.pushedFilters() 
assert(pushedFilters.nonEmpty, "No filter is pushed down") @@ -102,7 +101,7 @@ class OrcFilterSuite extends OrcTest with SharedSQLContext { case PhysicalOperation(_, filters, DataSourceV2Relation(orcTable: OrcTable, _, options)) => assert(filters.nonEmpty, "No filter is analyzed from the given query") - val scanBuilder = orcTable.newScanBuilder(new DataSourceOptions(options.asJava)) + val scanBuilder = orcTable.newScanBuilder(options) scanBuilder.pushFilters(filters.flatMap(DataSourceStrategy.translateFilter).toArray) val pushedFilters = scanBuilder.pushedFilters() if (noneSupported) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala index d0418f893143e..c04f6e3f255cb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RateStreamProviderSuite.scala @@ -29,9 +29,9 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relati import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.functions._ -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader.streaming.Offset import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.ManualClock class RateStreamProviderSuite extends StreamTest { @@ -135,7 +135,7 @@ class RateStreamProviderSuite extends StreamTest { withTempDir { temp => val stream = new RateStreamMicroBatchStream( rowsPerSecond = 100, - options = new DataSourceOptions(Map("useManualClock" -> "true").asJava), + options = new CaseInsensitiveStringMap(Map("useManualClock" -> "true").asJava), checkpointLocation = temp.getCanonicalPath) stream.clock.asInstanceOf[ManualClock].advance(100000) val startOffset = stream.initialOffset() @@ -154,7 +154,7 @@ class RateStreamProviderSuite extends StreamTest { withTempDir { temp => val stream = new RateStreamMicroBatchStream( rowsPerSecond = 20, - options = DataSourceOptions.empty(), + options = CaseInsensitiveStringMap.empty(), checkpointLocation = temp.getCanonicalPath) val partitions = stream.planInputPartitions(LongOffset(0L), LongOffset(1L)) val readerFactory = stream.createReaderFactory() @@ -173,7 +173,7 @@ class RateStreamProviderSuite extends StreamTest { val stream = new RateStreamMicroBatchStream( rowsPerSecond = 33, numPartitions = 11, - options = DataSourceOptions.empty(), + options = CaseInsensitiveStringMap.empty(), checkpointLocation = temp.getCanonicalPath) val partitions = stream.planInputPartitions(LongOffset(0L), LongOffset(1L)) val readerFactory = stream.createReaderFactory() @@ -309,8 +309,7 @@ class RateStreamProviderSuite extends StreamTest { } test("continuous data") { - val stream = new RateStreamContinuousStream( - rowsPerSecond = 20, numPartitions = 2, options = DataSourceOptions.empty()) + val stream = new RateStreamContinuousStream(rowsPerSecond = 20, numPartitions = 2) val partitions = stream.planInputPartitions(stream.initialOffset) val readerFactory = stream.createContinuousReaderFactory() assert(partitions.size == 2) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala index e1769fb0b2881..a5ba4f9633e7b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/TextSocketStreamSuite.scala @@ -35,11 +35,11 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relati import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.continuous._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.sources.v2.DataSourceOptions import org.apache.spark.sql.sources.v2.reader.streaming.Offset import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ +import org.apache.spark.sql.util.CaseInsensitiveStringMap class TextSocketStreamSuite extends StreamTest with SharedSQLContext with BeforeAndAfterEach { @@ -176,13 +176,13 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before test("params not given") { val provider = new TextSocketSourceProvider intercept[AnalysisException] { - provider.getTable(new DataSourceOptions(Map.empty[String, String].asJava)) + provider.getTable(CaseInsensitiveStringMap.empty()) } intercept[AnalysisException] { - provider.getTable(new DataSourceOptions(Map("host" -> "localhost").asJava)) + provider.getTable(new CaseInsensitiveStringMap(Map("host" -> "localhost").asJava)) } intercept[AnalysisException] { - provider.getTable(new DataSourceOptions(Map("port" -> "1234").asJava)) + provider.getTable(new CaseInsensitiveStringMap(Map("port" -> "1234").asJava)) } } @@ -190,7 +190,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before val provider = new TextSocketSourceProvider val params = Map("host" -> "localhost", "port" -> "1234", "includeTimestamp" -> "fasle") intercept[AnalysisException] { - provider.getTable(new DataSourceOptions(params.asJava)) + provider.getTable(new CaseInsensitiveStringMap(params.asJava)) } } @@ -201,7 +201,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before StructField("area", StringType) :: Nil) val params = Map("host" -> "localhost", "port" -> "1234") val exception = intercept[UnsupportedOperationException] { - provider.getTable(new DataSourceOptions(params.asJava), userSpecifiedSchema) + provider.getTable(new CaseInsensitiveStringMap(params.asJava), userSpecifiedSchema) } assert(exception.getMessage.contains( "socket source does not support user-specified schema")) @@ -299,7 +299,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before host = "localhost", port = serverThread.port, numPartitions = 2, - options = DataSourceOptions.empty()) + options = CaseInsensitiveStringMap.empty()) val partitions = stream.planInputPartitions(stream.initialOffset()) assert(partitions.length == 2) @@ -351,7 +351,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before host = "localhost", port = serverThread.port, numPartitions = 2, - options = DataSourceOptions.empty()) + options = CaseInsensitiveStringMap.empty()) stream.startOffset = TextSocketOffset(List(5, 5)) assertThrows[IllegalStateException] { @@ -367,7 +367,7 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before host = "localhost", port = serverThread.port, numPartitions = 2, - options = 
new DataSourceOptions(Map("includeTimestamp" -> "true").asJava)) + options = new CaseInsensitiveStringMap(Map("includeTimestamp" -> "true").asJava)) val partitions = stream.planInputPartitions(stream.initialOffset()) assert(partitions.size == 2) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala index e184bf57fa7d2..705559d099bec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch class DataSourceV2Suite extends QueryTest with SharedSQLContext { @@ -349,7 +350,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext { val options = df.queryExecution.optimizedPlan.collectFirst { case d: DataSourceV2Relation => d.options }.get - assert(options(optionName) === "false") + assert(options.get(optionName) === "false") } } @@ -437,8 +438,8 @@ class SimpleSinglePartitionSource extends TableProvider { } } - override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable { - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new MyScanBuilder() } } @@ -454,8 +455,8 @@ class SimpleDataSourceV2 extends TableProvider { } } - override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable { - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new MyScanBuilder() } } @@ -463,8 +464,8 @@ class SimpleDataSourceV2 extends TableProvider { class AdvancedDataSourceV2 extends TableProvider { - override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable { - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new AdvancedScanBuilder() } } @@ -559,16 +560,16 @@ class SchemaRequiredDataSource extends TableProvider { override def readSchema(): StructType = schema } - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { throw new IllegalArgumentException("requires a user-supplied schema") } - override def getTable(options: DataSourceOptions, schema: StructType): Table = { + override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = { val userGivenSchema = schema new SimpleBatchTable { override def schema(): StructType = userGivenSchema - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new MyScanBuilder(userGivenSchema) } } @@ -588,8 
+589,8 @@ class ColumnarDataSourceV2 extends TableProvider { } } - override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable { - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new MyScanBuilder() } } @@ -659,8 +660,8 @@ class PartitionAwareDataSource extends TableProvider { override def outputPartitioning(): Partitioning = new MyPartitioning } - override def getTable(options: DataSourceOptions): Table = new SimpleBatchTable { - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + override def getTable(options: CaseInsensitiveStringMap): Table = new SimpleBatchTable { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new MyScanBuilder() } } @@ -699,7 +700,7 @@ class SchemaReadAttemptException(m: String) extends RuntimeException(m) class SimpleWriteOnlyDataSource extends SimpleWritableDataSource { - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { new MyTable(options) { override def schema(): StructType = { throw new SchemaReadAttemptException("schema should not be read.") @@ -725,9 +726,9 @@ class ReportStatisticsDataSource extends TableProvider { } } - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { new SimpleBatchTable { - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new MyScanBuilder } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala index fd19a48497fe6..f9f9db35ac2dd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/FileDataSourceV2FallBackSuite.scala @@ -18,13 +18,14 @@ package org.apache.spark.sql.sources.v2 import org.apache.spark.sql.{AnalysisException, QueryTest} import org.apache.spark.sql.execution.datasources.FileFormat -import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetTest} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2 import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.reader.ScanBuilder import org.apache.spark.sql.sources.v2.writer.WriteBuilder import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 { @@ -32,7 +33,7 @@ class DummyReadOnlyFileDataSourceV2 extends FileDataSourceV2 { override def shortName(): String = "parquet" - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { new DummyReadOnlyFileTable } } @@ -42,7 +43,7 @@ class DummyReadOnlyFileTable extends Table with SupportsBatchRead { override def schema(): StructType = StructType(Nil) - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + override def newScanBuilder(options: 
CaseInsensitiveStringMap): ScanBuilder = { throw new AnalysisException("Dummy file reader") } } @@ -53,7 +54,7 @@ class DummyWriteOnlyFileDataSourceV2 extends FileDataSourceV2 { override def shortName(): String = "parquet" - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { new DummyWriteOnlyFileTable } } @@ -63,7 +64,7 @@ class DummyWriteOnlyFileTable extends Table with SupportsBatchWrite { override def schema(): StructType = StructType(Nil) - override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = throw new AnalysisException("Dummy file writer") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala index c56a54598cd4c..160354520e432 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala @@ -25,12 +25,12 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkContext -import org.apache.spark.internal.config.SPECULATION_ENABLED import org.apache.spark.sql.SaveMode import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.writer._ import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.SerializableConfiguration /** @@ -141,22 +141,24 @@ class SimpleWritableDataSource extends TableProvider with SessionConfigSupport { } } - class MyTable(options: DataSourceOptions) extends SimpleBatchTable with SupportsBatchWrite { - private val path = options.get("path").get() + class MyTable(options: CaseInsensitiveStringMap) + extends SimpleBatchTable with SupportsBatchWrite { + + private val path = options.get("path") private val conf = SparkContext.getActive.get.hadoopConfiguration override def schema(): StructType = tableSchema - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = { + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = { new MyScanBuilder(new Path(path).toUri.toString, conf) } - override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = { + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { new MyWriteBuilder(path) } } - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { new MyTable(options) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 97b694eafc613..a98053e34b887 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.sources.v2.reader.streaming._ import org.apache.spark.sql.sources.v2.writer.WriteBuilder import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger} import org.apache.spark.sql.types.StructType +import 
org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.util.Utils class FakeDataStream extends MicroBatchStream with ContinuousStream { @@ -62,19 +63,19 @@ class FakeScanBuilder extends ScanBuilder with Scan { trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead { override def name(): String = "fake" override def schema(): StructType = StructType(Seq()) - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder } trait FakeContinuousReadTable extends Table with SupportsContinuousRead { override def name(): String = "fake" override def schema(): StructType = StructType(Seq()) - override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder + override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new FakeScanBuilder } trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite { override def name(): String = "fake" override def schema(): StructType = StructType(Seq()) - override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = { + override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = { throw new IllegalStateException("fake sink - cannot actually write") } } @@ -87,7 +88,7 @@ class FakeReadMicroBatchOnly override def keyPrefix: String = shortName() - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { LastReadOptions.options = options new FakeMicroBatchReadTable {} } @@ -101,7 +102,7 @@ class FakeReadContinuousOnly override def keyPrefix: String = shortName() - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { LastReadOptions.options = options new FakeContinuousReadTable {} } @@ -110,7 +111,7 @@ class FakeReadContinuousOnly class FakeReadBothModes extends DataSourceRegister with TableProvider { override def shortName(): String = "fake-read-microbatch-continuous" - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { new Table with FakeMicroBatchReadTable with FakeContinuousReadTable {} } } @@ -118,7 +119,7 @@ class FakeReadBothModes extends DataSourceRegister with TableProvider { class FakeReadNeitherMode extends DataSourceRegister with TableProvider { override def shortName(): String = "fake-read-neither-mode" - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { new Table { override def name(): String = "fake" override def schema(): StructType = StructType(Nil) @@ -134,7 +135,7 @@ class FakeWriteOnly override def keyPrefix: String = shortName() - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { LastWriteOptions.options = options new Table with FakeStreamingWriteTable { override def name(): String = "fake" @@ -145,7 +146,7 @@ class FakeWriteOnly class FakeNoWrite extends DataSourceRegister with TableProvider { override def shortName(): String = "fake-write-neither-mode" - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { new Table { override def name(): String = "fake" override def schema(): StructType = StructType(Nil) @@ -172,7 +173,7 @@ class 
FakeWriteSupportProviderV1Fallback extends DataSourceRegister override def shortName(): String = "fake-write-v1-fallback" - override def getTable(options: DataSourceOptions): Table = { + override def getTable(options: CaseInsensitiveStringMap): Table = { new Table with FakeStreamingWriteTable { override def name(): String = "fake" override def schema(): StructType = StructType(Nil) @@ -181,7 +182,7 @@ class FakeWriteSupportProviderV1Fallback extends DataSourceRegister } object LastReadOptions { - var options: DataSourceOptions = _ + var options: CaseInsensitiveStringMap = _ def clear(): Unit = { options = null @@ -189,7 +190,7 @@ object LastReadOptions { } object LastWriteOptions { - var options: DataSourceOptions = _ + var options: CaseInsensitiveStringMap = _ def clear(): Unit = { options = null @@ -306,8 +307,8 @@ class StreamingDataSourceV2Suite extends StreamTest { testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ => eventually(timeout(streamingTimeout)) { // Write options should not be set. - assert(LastWriteOptions.options.getBoolean(readOptionName, false) == false) - assert(LastReadOptions.options.getBoolean(readOptionName, false)) + assert(!LastWriteOptions.options.containsKey(readOptionName)) + assert(LastReadOptions.options.get(readOptionName) == "true") } } } @@ -317,8 +318,8 @@ class StreamingDataSourceV2Suite extends StreamTest { testPositiveCaseWithQuery(readSource, writeSource, trigger) { _ => eventually(timeout(streamingTimeout)) { // Read options should not be set. - assert(LastReadOptions.options.getBoolean(writeOptionName, false) == false) - assert(LastWriteOptions.options.getBoolean(writeOptionName, false)) + assert(!LastReadOptions.options.containsKey(writeOptionName)) + assert(LastWriteOptions.options.get(writeOptionName) == "true") } } } @@ -337,10 +338,10 @@ class StreamingDataSourceV2Suite extends StreamTest { for ((read, write, trigger) <- cases) { testQuietly(s"stream with read format $read, write format $write, trigger $trigger") { val sourceTable = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor() - .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty()) + .newInstance().asInstanceOf[TableProvider].getTable(CaseInsensitiveStringMap.empty()) val sinkTable = DataSource.lookupDataSource(write, spark.sqlContext.conf).getConstructor() - .newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty()) + .newInstance().asInstanceOf[TableProvider].getTable(CaseInsensitiveStringMap.empty()) (sourceTable, sinkTable, trigger) match { // Valid microbatch queries.
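
Reviewer note, not part of the patch: the test churn above boils down to a small API surface on org.apache.spark.sql.util.CaseInsensitiveStringMap. The sketch below only uses calls that appear in the hunks above (empty(), the Java-map constructor, get, containsKey) and assumes that get returns null for a missing key and that lookups ignore key case, which is what the containsKey assertions and the class name suggest. The object and option names are made up for illustration.

    import scala.collection.JavaConverters._

    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    object CaseInsensitiveStringMapSketch {
      def main(args: Array[String]): Unit = {
        // Construction mirrors the updated tests: either an empty map or a
        // wrapped java.util.Map of string options.
        val empty = CaseInsensitiveStringMap.empty()
        val opts = new CaseInsensitiveStringMap(
          Map("Host" -> "localhost", "PORT" -> "1234").asJava)

        // Assumed: keys are matched case-insensitively.
        assert(opts.get("host") == "localhost")
        assert(opts.containsKey("port"))

        // A missing option is simply an absent key; the streaming suite above
        // switched from getBoolean(key, false) to containsKey for this reason.
        assert(!empty.containsKey("includeTimestamp"))
      }
    }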
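
The provider-side changes are equally mechanical: every getTable, newScanBuilder and newWriteBuilder override trades DataSourceOptions for CaseInsensitiveStringMap. For out-of-tree test doubles, a minimal read-side stub against the v2 interfaces as they stand at this commit could look like the sketch below; the class is hypothetical and follows the LastReadOptions pattern used in StreamingDataSourceV2Suite above.

    import org.apache.spark.sql.sources.v2.{SupportsBatchRead, Table, TableProvider}
    import org.apache.spark.sql.sources.v2.reader.ScanBuilder
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // Hypothetical provider that records the options it receives, in the
    // spirit of the LastReadOptions object above.
    class RecordingReadProvider extends TableProvider {
      override def getTable(options: CaseInsensitiveStringMap): Table = {
        RecordingReadProvider.lastOptions = options
        new Table with SupportsBatchRead {
          override def name(): String = "recording"
          override def schema(): StructType = StructType(Nil)
          override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder =
            throw new UnsupportedOperationException("sketch only, no real scan")
        }
      }
    }

    object RecordingReadProvider {
      var lastOptions: CaseInsensitiveStringMap = _
    }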
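
SchemaRequiredDataSource and the TextSocketSourceProvider tests also exercise the second getTable overload, which takes a user-specified schema alongside the options. A sketch of the pair of overloads follows; the class is hypothetical, and the userGivenSchema indirection is copied from SchemaRequiredDataSource above so the anonymous table's schema() method does not shadow the parameter.

    import org.apache.spark.sql.sources.v2.{Table, TableProvider}
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // Hypothetical provider that only works with a user-supplied schema.
    class SchemaOnlyProvider extends TableProvider {
      override def getTable(options: CaseInsensitiveStringMap): Table =
        throw new IllegalArgumentException("requires a user-supplied schema")

      override def getTable(options: CaseInsensitiveStringMap, schema: StructType): Table = {
        val userGivenSchema = schema
        new Table {
          override def name(): String = "schema-only"
          override def schema(): StructType = userGivenSchema
        }
      }
    }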