Skip to content

Commit

Permalink
[GLUTEN-8327][CORE] Introduce the ConfigEntry to make the config defi…
Browse files Browse the repository at this point in the history
…nition more flexible (apache#8328)
  • Loading branch information
yikf authored Dec 30, 2024
1 parent eaf6548 commit 3c53f9f
Show file tree
Hide file tree
Showing 222 changed files with 1,138 additions and 633 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.shuffle

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.vectorized.BlockOutputStream
import org.apache.gluten.vectorized.CHStreamReader

Expand Down Expand Up @@ -59,14 +59,14 @@ private class CHCelebornColumnarBatchSerializerInstance(
with Logging {

private lazy val conf = SparkEnv.get.conf
private lazy val gluten_conf = GlutenConfig.getConf
private lazy val gluten_conf = GlutenConfig.get
private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
private lazy val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
private lazy val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
GlutenConfig.get.columnarShuffleCodecBackend.orNull)

override def deserializeStream(in: InputStream): DeserializationStream = {
new DeserializationStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.shuffle

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.ColumnarNativeIterator
import org.apache.gluten.memory.CHThreadGroup
import org.apache.gluten.vectorized._
Expand Down Expand Up @@ -80,10 +80,10 @@ class CHCelebornColumnarShuffleWriter[K, V](
nativeBufferSize,
capitalizedCompressionCodec,
compressionLevel,
GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
GlutenConfig.get.chColumnarShuffleSpillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher,
GlutenConfig.getConf.chColumnarForceMemorySortShuffle
GlutenConfig.get.chColumnarForceMemorySortShuffle
|| ShuffleMode.SORT.name.equalsIgnoreCase(shuffleWriterType)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.delta

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.SparkException
import org.apache.spark.sql.Dataset
Expand Down Expand Up @@ -68,7 +68,7 @@ class ClickhouseOptimisticTransaction(
writeOptions: Option[DeltaOptions],
isOptimize: Boolean,
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
val nativeWrite = GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
val nativeWrite = GlutenConfig.get.enableNativeWriter.getOrElse(false)
if (writingMergeTree) {
// TODO: update FallbackByBackendSettings for mergetree always return true
val onePipeline = nativeWrite && CHConf.get.enableOnePipelineMergeTreeWrite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources

import org.apache.gluten.GlutenConfig
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, GlutenPlan, SortExecTransformer}
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.gluten.execution.iceberg

import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, IcebergScanTransformer}
import org.apache.gluten.GlutenConfig
import org.apache.gluten.config.GlutenConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row

Expand Down Expand Up @@ -62,7 +62,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
spark.sql(
s"""
Expand All @@ -84,7 +84,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite

// Partition key of integer type.
withSQLConf(
GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
GlutenConfig.GLUTEN_ENABLED.key -> "false"
) {
// Gluten does not support write iceberg table.
spark.sql(
Expand Down Expand Up @@ -145,7 +145,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
spark.sql(
s"""
Expand All @@ -167,7 +167,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite

// Partition key of integer type.
withSQLConf(
GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
GlutenConfig.GLUTEN_ENABLED.key -> "false"
) {
// Gluten does not support write iceberg table.
spark.sql(
Expand Down Expand Up @@ -228,7 +228,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
spark.sql(
s"""
Expand All @@ -250,7 +250,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite

// Partition key of integer type.
withSQLConf(
GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
GlutenConfig.GLUTEN_ENABLED.key -> "false"
) {
// Gluten does not support write iceberg table.
spark.sql(
Expand Down Expand Up @@ -350,7 +350,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite

test("iceberg read mor table - delete and update") {
withTable("iceberg_mor_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
spark.sql(
"""
|create table iceberg_mor_tb (
Expand Down Expand Up @@ -403,7 +403,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
// TODO: support merge-on-read mode
ignore("iceberg read mor table - delete and update with merge-on-read mode") {
withTable("iceberg_mor_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
spark.sql(
"""
|create table iceberg_mor_tb (
Expand Down Expand Up @@ -458,7 +458,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite

test("iceberg read mor table - merge into") {
withTable("iceberg_mor_tb", "merge_into_source_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
spark.sql(
"""
|create table iceberg_mor_tb (
Expand Down Expand Up @@ -531,7 +531,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
// TODO: support merge-on-read mode
ignore("iceberg read mor table - merge into with merge-on-read mode") {
withTable("iceberg_mor_tb", "merge_into_source_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
spark.sql(
"""
|create table iceberg_mor_tb (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.gluten.vectorized;

import org.apache.gluten.GlutenConfig;
import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.config.GlutenConfig;
import org.apache.gluten.execution.ColumnarNativeIterator;
import org.apache.gluten.memory.CHThreadGroup;
import org.apache.gluten.utils.ConfigUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenBuildInfo._
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi._
import org.apache.gluten.columnarbatch.CHBatch
import org.apache.gluten.component.Component.BuildInfo
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
Expand Down Expand Up @@ -282,11 +282,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}

override def supportSortExec(): Boolean = {
GlutenConfig.getConf.enableColumnarSort
GlutenConfig.get.enableColumnarSort
}

override def supportSortMergeJoinExec(): Boolean = {
GlutenConfig.getConf.enableColumnarSortMergeJoin
GlutenConfig.get.enableColumnarSortMergeJoin
}

override def supportWindowExec(windowFunctions: Seq[NamedExpression]): Boolean = {
Expand Down Expand Up @@ -391,7 +391,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}

override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
GlutenConfig.get.enableNativeWriter.getOrElse(false)
}

override def supportCartesianProductExec(): Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.SQLConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenNumaBindingInfo
import org.apache.gluten.backendsapi.IteratorApi
import org.apache.gluten.config.GlutenNumaBindingInfo
import org.apache.gluten.execution._
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.logging.LogLevelUtil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.ListenerApi
import org.apache.gluten.columnarbatch.CHBatch
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.CHBroadcastBuildSideCache
import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.expression.UDFMappings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.columnarbatch.CHBatch
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.exception.{GlutenException, GlutenNotSupportException}
import org.apache.gluten.execution._
import org.apache.gluten.expression._
Expand Down Expand Up @@ -452,12 +452,12 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
val readBatchNumRows = metrics("avgReadBatchNumRows")
val numOutputRows = metrics("numOutputRows")
val dataSize = metrics("dataSize")
if (GlutenConfig.getConf.isUseCelebornShuffleManager) {
if (GlutenConfig.get.isUseCelebornShuffleManager) {
val clazz = ClassUtils.getClass("org.apache.spark.shuffle.CHCelebornColumnarBatchSerializer")
val constructor =
clazz.getConstructor(classOf[SQLMetric], classOf[SQLMetric], classOf[SQLMetric])
constructor.newInstance(readBatchNumRows, numOutputRows, dataSize).asInstanceOf[Serializer]
} else if (GlutenConfig.getConf.isUseUniffleShuffleManager) {
} else if (GlutenConfig.get.isUseUniffleShuffleManager) {
throw new UnsupportedOperationException("temporarily uniffle not support ch ")
} else {
new CHColumnarBatchSerializer(readBatchNumRows, numOutputRows, dataSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.ValidatorApi
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.expression.ExpressionConverter
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.substrait.SubstraitContext
Expand Down Expand Up @@ -95,7 +95,7 @@ class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logg
}
case rangePartitoning: RangePartitioning =>
if (
GlutenConfig.getConf.enableColumnarSort &&
GlutenConfig.get.enableColumnarSort &&
RangePartitionerBoundsGenerator.supportedOrderings(rangePartitoning, child)
) {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension

import org.apache.gluten.GlutenConfig
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
Expand All @@ -39,8 +39,8 @@ class CommonSubexpressionEliminateRule(spark: SparkSession) extends Rule[Logical
override def apply(plan: LogicalPlan): LogicalPlan = {
val newPlan =
if (
plan.resolved && GlutenConfig.getConf.enableGluten
&& GlutenConfig.getConf.enableCommonSubexpressionEliminate && !plan.fastEquals(lastPlan)
plan.resolved && GlutenConfig.get.enableGluten
&& GlutenConfig.get.enableCommonSubexpressionEliminate && !plan.fastEquals(lastPlan)
) {
lastPlan = plan
visitPlan(plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension

import org.apache.gluten.GlutenConfig
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, CountDistinct}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand All @@ -32,9 +32,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.AGGREGATE_EXPRESSION
*/
object CountDistinctWithoutExpand extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
if (
GlutenConfig.getConf.enableGluten && GlutenConfig.getConf.enableCountDistinctWithoutExpand
) {
if (GlutenConfig.get.enableGluten && GlutenConfig.get.enableCountDistinctWithoutExpand) {
plan.transformAllExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) {
case ae: AggregateExpression
if ae.isDistinct && ae.aggregateFunction.isInstanceOf[Count] &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension

import org.apache.gluten.GlutenConfig
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
Expand All @@ -36,7 +36,7 @@ object ExtendedGeneratorNestedColumnAliasing {
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
case pj @ Project(projectList, f @ Filter(condition, g: Generate))
if canPruneGenerator(g.generator) &&
GlutenConfig.getConf.enableExtendedColumnPruning &&
GlutenConfig.get.enableExtendedColumnPruning &&
(SQLConf.get.nestedPruningOnExpressions || SQLConf.get.nestedSchemaPruningEnabled) =>
val attrToExtractValues =
getAttributeToExtractValues(projectList ++ g.generator.children :+ condition, Seq.empty)
Expand Down
Loading

0 comments on commit 3c53f9f

Please sign in to comment.