diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 2b6940d055e1f..2c4169d1b921d 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2995,7 +2995,7 @@ the effect of the change is not well-defined. For all of them:

 - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) are allowed, but the semantics of the change depends on the code.

-- *Changes in projection / filter / map-like operations**: Some cases are allowed. For example:
+- *Changes in projection / filter / map-like operations*: Some cases are allowed. For example:

   - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`.

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3a2e7368279f7..96d3f5c046fd3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2138,9 +2138,9 @@ class SQLConf extends Serializable with Logging {
   def continuousStreamingExecutorPollIntervalMs: Long =
     getConf(CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS)

-  def userV1SourceReaderList: String = getConf(USE_V1_SOURCE_READER_LIST)
+  def useV1SourceReaderList: String = getConf(USE_V1_SOURCE_READER_LIST)

-  def userV1SourceWriterList: String = getConf(USE_V1_SOURCE_WRITER_LIST)
+  def useV1SourceWriterList: String = getConf(USE_V1_SOURCE_WRITER_LIST)

   def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 3cd48baebf2e3..8460c7902e7d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -195,7 +195,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }

     val useV1Sources =
-      sparkSession.sessionState.conf.userV1SourceReaderList.toLowerCase(Locale.ROOT).split(",")
+      sparkSession.sessionState.conf.useV1SourceReaderList.toLowerCase(Locale.ROOT).split(",")
     val lookupCls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
     val cls = lookupCls.newInstance() match {
       case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) ||
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3b8415121c275..18653b2bf5420 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -248,7 +248,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     val session = df.sparkSession

     val useV1Sources =
-      session.sessionState.conf.userV1SourceWriterList.toLowerCase(Locale.ROOT).split(",")
+      session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",")
     val lookupCls = DataSource.lookupDataSource(source, session.sessionState.conf)
     val cls = lookupCls.newInstance() match {
       case f: FileDataSourceV2 if useV1Sources.contains(f.shortName()) ||
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index f503ff03b971c..f16138e8d790f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -58,7 +58,7 @@ case class DataSourceResolution(conf: SQLConf) extends Rule[LogicalPlan] with Ca

   object V1WriteProvider {
     private val v1WriteOverrideSet =
-      conf.userV1SourceWriterList.toLowerCase(Locale.ROOT).split(",").toSet
+      conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",").toSet

     def unapply(provider: String): Option[String] = {
       if (v1WriteOverrideSet.contains(provider.toLowerCase(Locale.ROOT))) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 52b8e4732e808..6ff3c94c5c4d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -294,7 +294,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
   /** Execute arbitrary code */
   object Execute {
     def apply(name: String)(func: StreamExecution => Any): AssertOnQuery =
-      AssertOnQuery(query => { func(query); true }, "name")
+      AssertOnQuery(query => { func(query); true }, name)

     def apply(func: StreamExecution => Any): AssertOnQuery = apply("Execute")(func)
   }
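Note on the `StreamTest.scala` hunk: the old code passed the string literal `"name"` instead of the `name` parameter, so every `Execute(...)` action carried the same meaningless label in test failure output. The `userV1Source*List` → `useV1Source*List` changes, by contrast, only rename the Scala accessors; the underlying conf entries (`USE_V1_SOURCE_READER_LIST` / `USE_V1_SOURCE_WRITER_LIST`) are untouched, which is why the patch simply updates every call site. A minimal, self-contained sketch of the test-helper bug, using hypothetical stand-in types rather than Spark's actual `StreamExecution`/`AssertOnQuery` classes:

```scala
// Stand-in for Spark's AssertOnQuery test action, which carries a
// descriptive message shown when a streaming test fails. StreamExecution
// is modeled here as a plain String for brevity.
case class AssertOnQuery(condition: String => Boolean, message: String)

object Execute {
  // Before the patch: the *literal* "name" was passed, so the
  // caller-supplied label was silently dropped.
  def buggy(name: String)(func: String => Any): AssertOnQuery =
    AssertOnQuery(q => { func(q); true }, "name")

  // After the patch: the `name` parameter is propagated into the message.
  def apply(name: String)(func: String => Any): AssertOnQuery =
    AssertOnQuery(q => { func(q); true }, name)
}

object Demo extends App {
  println(Execute.buggy("wait for watermark")(_ => ()).message) // prints: name
  println(Execute("wait for watermark")(_ => ()).message)       // prints: wait for watermark
}
```

The fixed version makes failure reports show the label the test author wrote, which is the entire point of the `name` parameter on `Execute.apply`.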