[SPARK-27106][SQL] merge CaseInsensitiveStringMap and DataSourceOptions #24025

Closed · wants to merge 8 commits
Changes from 6 commits
@@ -37,8 +37,7 @@ import org.apache.spark.sql.sources.v2.reader.streaming._
* @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
* read by per-task consumers generated later.
* @param kafkaParams String params for per-task Kafka consumers.
* @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
* are not Kafka consumer params.
* @param sourceOptions Params which are not Kafka consumer params.
* @param metadataPath Path to a directory this reader can use for writing metadata.
* @param initialOffsets The Kafka offsets to start reading data at.
* @param failOnDataLoss Flag indicating whether reading should fail in data loss
@@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
import org.apache.spark.sql.execution.streaming.sources.RateControlMicroBatchStream
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.UninterruptibleThread

/**
@@ -57,7 +57,7 @@ import org.apache.spark.util.UninterruptibleThread
private[kafka010] class KafkaMicroBatchStream(
kafkaOffsetReader: KafkaOffsetReader,
executorKafkaParams: ju.Map[String, Object],
options: DataSourceOptions,
options: CaseInsensitiveStringMap,
metadataPath: String,
startingOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean) extends RateControlMicroBatchStream with Logging {
@@ -66,8 +66,7 @@ private[kafka010] class KafkaMicroBatchStream(
"kafkaConsumer.pollTimeoutMs",
SparkEnv.get.conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)

private val maxOffsetsPerTrigger =
Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
private val maxOffsetsPerTrigger = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)

private val rangeCalculator = KafkaOffsetRangeCalculator(options)

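For illustration, a minimal sketch (not taken from the diff; option values invented) of the lookup pattern after this change. CaseInsensitiveStringMap.get follows java.util.Map semantics and returns a possibly-null String, so Option(...) wraps the result directly, whereas DataSourceOptions.get returned an Optional that had to be unwrapped with .orElse(null).

// Illustrative sketch only, not part of the PR.
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val options = new CaseInsensitiveStringMap(Map("maxOffsetsPerTrigger" -> "500").asJava)
// get() returns null for a missing key, so Option(...) recovers the old Optional-style handling.
val maxOffsets: Option[Long] = Option(options.get("maxOffsetsPerTrigger")).map(_.toLong)
// Keys are lower-cased on both put and lookup, so any casing of the key resolves.
assert(options.get("MAXOFFSETSPERTRIGGER") == "500")
assert(maxOffsets.contains(500L))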
@@ -19,7 +19,7 @@ package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.util.CaseInsensitiveStringMap


/**
@@ -91,8 +91,8 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int

private[kafka010] object KafkaOffsetRangeCalculator {

def apply(options: DataSourceOptions): KafkaOffsetRangeCalculator = {
val optionalValue = Option(options.get("minPartitions").orElse(null)).map(_.toInt)
def apply(options: CaseInsensitiveStringMap): KafkaOffsetRangeCalculator = {
val optionalValue = Option(options.get("minPartitions")).map(_.toInt)
new KafkaOffsetRangeCalculator(optionalValue)
}
}
@@ -38,6 +38,7 @@ import org.apache.spark.sql.sources.v2.writer.WriteBuilder
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
* The provider class for all Kafka readers and writers. It is designed such that it throws
@@ -103,8 +104,8 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
failOnDataLoss(caseInsensitiveParams))
}

override def getTable(options: DataSourceOptions): KafkaTable = {
new KafkaTable(strategy(options.asMap().asScala.toMap))
override def getTable(options: CaseInsensitiveStringMap): KafkaTable = {
new KafkaTable(strategy(options.asScala.toMap))
}

/**
@@ -358,11 +359,11 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister

override def schema(): StructType = KafkaOffsetReader.kafkaSchema

override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = new ScanBuilder {
override def build(): Scan = new KafkaScan(options)
}

override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
override def newWriteBuilder(options: CaseInsensitiveStringMap): WriteBuilder = {
new WriteBuilder {
private var inputSchema: StructType = _

@@ -375,20 +376,20 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
import scala.collection.JavaConverters._

assert(inputSchema != null)
val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
val topic = Option(options.get(TOPIC_OPTION_KEY)).map(_.trim)
val producerParams = kafkaParamsForProducer(options.asScala.toMap)
new KafkaStreamingWrite(topic, producerParams, inputSchema)
}
}
}
}

class KafkaScan(options: DataSourceOptions) extends Scan {
class KafkaScan(options: CaseInsensitiveStringMap) extends Scan {

override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema

override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
val parameters = options.asMap().asScala.toMap
val parameters = options.asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
@@ -417,7 +418,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}

override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
val parameters = options.asMap().asScala.toMap
val parameters = options.asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
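A related simplification visible in this file: since CaseInsensitiveStringMap implements java.util.Map<String, String>, the standard JavaConverters apply to it directly, so the intermediate asMap() call that DataSourceOptions required goes away. A hedged sketch with made-up option values:

// Illustrative sketch only, not part of the PR.
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val options = new CaseInsensitiveStringMap(
  Map("kafka.bootstrap.servers" -> "host:9092", "subscribe" -> "topic-1").asJava)
// asScala works on any java.util.Map; the keys come back already lower-cased.
val parameters: Map[String, String] = options.asScala.toMap
assert(parameters.contains("kafka.bootstrap.servers"))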
@@ -41,10 +41,10 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.util.CaseInsensitiveStringMap

abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with KafkaTest {

@@ -1118,7 +1118,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
"kafka.bootstrap.servers" -> testUtils.brokerAddress,
"subscribe" -> topic
) ++ Option(minPartitions).map { p => "minPartitions" -> p}
val dsOptions = new DataSourceOptions(options.asJava)
val dsOptions = new CaseInsensitiveStringMap(options.asJava)
val table = provider.getTable(dsOptions)
val stream = table.newScanBuilder(dsOptions).build().toMicroBatchStream(dir.getAbsolutePath)
val inputPartitions = stream.planInputPartitions(
@@ -22,21 +22,21 @@ import scala.collection.JavaConverters._
import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {

def testWithMinPartitions(name: String, minPartition: Int)
(f: KafkaOffsetRangeCalculator => Unit): Unit = {
val options = new DataSourceOptions(Map("minPartitions" -> minPartition.toString).asJava)
val options = new CaseInsensitiveStringMap(Map("minPartitions" -> minPartition.toString).asJava)
test(s"with minPartition = $minPartition: $name") {
f(KafkaOffsetRangeCalculator(options))
}
}


test("with no minPartition: N TopicPartitions to N offset ranges") {
val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty())
assert(
calc.getRanges(
fromOffsets = Map(tp1 -> 1),
@@ -64,7 +64,7 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
}

test("with no minPartition: empty ranges ignored") {
val calc = KafkaOffsetRangeCalculator(DataSourceOptions.empty())
val calc = KafkaOffsetRangeCalculator(CaseInsensitiveStringMap.empty())
assert(
calc.getRanges(
fromOffsets = Map(tp1 -> 1, tp2 -> 1),
@@ -31,19 +31,20 @@
* This is used to pass options to v2 implementations to ensure consistent case insensitivity.
* <p>
* Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
* keys converted to lower case.
* keys converted to lower case. This map doesn't allow null keys.
*/
@Experimental
public class CaseInsensitiveStringMap implements Map<String, String> {

public static CaseInsensitiveStringMap empty() {
return new CaseInsensitiveStringMap();
return new CaseInsensitiveStringMap(new HashMap<>(0));
}

private final Map<String, String> delegate;

private CaseInsensitiveStringMap() {
this.delegate = new HashMap<>();
public CaseInsensitiveStringMap(Map<String, String> originalMap) {
this.delegate = new HashMap<>(originalMap.size());
putAll(originalMap);
}

@Override
@@ -56,9 +57,13 @@ public boolean isEmpty() {
return delegate.isEmpty();
}

private String toLowerCase(Object key) {
return key.toString().toLowerCase(Locale.ROOT);
}

@Override
public boolean containsKey(Object key) {
return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
return delegate.containsKey(toLowerCase(key));
}

@Override
@@ -68,17 +73,17 @@ public boolean containsValue(Object value) {

@Override
public String get(Object key) {
return delegate.get(key.toString().toLowerCase(Locale.ROOT));
return delegate.get(toLowerCase(key));
}

@Override
public String put(String key, String value) {
return delegate.put(key.toLowerCase(Locale.ROOT), value);
return delegate.put(toLowerCase(key), value);
}

@Override
public String remove(Object key) {
return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
return delegate.remove(toLowerCase(key));
}

@Override
@@ -107,4 +112,49 @@ public Collection<String> values() {
public Set<Map.Entry<String, String>> entrySet() {
return delegate.entrySet();
}

/**
* Returns the boolean value to which the specified key is mapped,
* or defaultValue if there is no mapping for the key. The key match is case-insensitive.
*/
public boolean getBoolean(String key, boolean defaultValue) {
Comment from the PR author: These 4 methods are from DataSourceOptions, which are pretty general and useful.

String value = get(key);
// We can't use `Boolean.parseBoolean` here, as it returns false for invalid strings.
if (value == null) {
return defaultValue;
} else if (value.equalsIgnoreCase("true")) {
return true;
} else if (value.equalsIgnoreCase("false")) {
return false;
} else {
throw new IllegalArgumentException(value + " is not a boolean string.");
}
}

/**
* Returns the integer value to which the specified key is mapped,
* or defaultValue if there is no mapping for the key. The key match is case-insensitive.
*/
public int getInt(String key, int defaultValue) {
String value = get(key);
return value == null ? defaultValue : Integer.parseInt(value);
}

/**
* Returns the long value to which the specified key is mapped,
* or defaultValue if there is no mapping for the key. The key match is case-insensitive.
*/
public long getLong(String key, long defaultValue) {
String value = get(key);
return value == null ? defaultValue : Long.parseLong(value);
}

/**
* Returns the double value to which the specified key is mapped,
* or defaultValue if there is no mapping for the key. The key match is case-insensitive.
*/
public double getDouble(String key, double defaultValue) {
String value = get(key);
return value == null ? defaultValue : Double.parseDouble(value);
}
}
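For reference, a small usage sketch of the four typed getters added above (values are made up for illustration). The key match is case-insensitive, and getBoolean rejects anything other than "true"/"false" instead of silently returning false:

// Illustrative sketch only, not part of the PR.
import scala.collection.JavaConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val opts = new CaseInsensitiveStringMap(
  Map("failOnDataLoss" -> "TRUE", "minPartitions" -> "8").asJava)

assert(opts.getBoolean("failondataloss", false))         // "TRUE" parses case-insensitively
assert(opts.getInt("minPartitions", 1) == 8)
assert(opts.getLong("maxOffsetsPerTrigger", -1L) == -1L) // missing key falls back to the default
// A value like "yes" would make getBoolean throw IllegalArgumentException rather than
// returning false, which is why Boolean.parseBoolean is not used.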

This file was deleted.
