Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-8327][CORE] Introduce the ConfigEntry to make the config definition more flexible #8328

Merged
merged 1 commit into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.shuffle

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.vectorized.BlockOutputStream
import org.apache.gluten.vectorized.CHStreamReader

Expand Down Expand Up @@ -59,14 +59,14 @@ private class CHCelebornColumnarBatchSerializerInstance(
with Logging {

private lazy val conf = SparkEnv.get.conf
private lazy val gluten_conf = GlutenConfig.getConf
private lazy val gluten_conf = GlutenConfig.get
private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
private lazy val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
private lazy val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
GlutenConfig.get.columnarShuffleCodecBackend.orNull)

override def deserializeStream(in: InputStream): DeserializationStream = {
new DeserializationStream {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.shuffle

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.clickhouse.CHBackendSettings
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.ColumnarNativeIterator
import org.apache.gluten.memory.CHThreadGroup
import org.apache.gluten.vectorized._
Expand Down Expand Up @@ -80,10 +80,10 @@ class CHCelebornColumnarShuffleWriter[K, V](
nativeBufferSize,
capitalizedCompressionCodec,
compressionLevel,
GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
GlutenConfig.get.chColumnarShuffleSpillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher,
GlutenConfig.getConf.chColumnarForceMemorySortShuffle
GlutenConfig.get.chColumnarForceMemorySortShuffle
|| ShuffleMode.SORT.name.equalsIgnoreCase(shuffleWriterType)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.spark.sql.delta

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.clickhouse.CHConf
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.SparkException
import org.apache.spark.sql.Dataset
Expand Down Expand Up @@ -68,7 +68,7 @@ class ClickhouseOptimisticTransaction(
writeOptions: Option[DeltaOptions],
isOptimize: Boolean,
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
val nativeWrite = GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
val nativeWrite = GlutenConfig.get.enableNativeWriter.getOrElse(false)
if (writingMergeTree) {
// TODO: update FallbackByBackendSettings for mergetree always return true
val onePipeline = nativeWrite && CHConf.get.enableOnePipelineMergeTreeWrite
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution.datasources

import org.apache.gluten.GlutenConfig
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, GlutenPlan, SortExecTransformer}
import org.apache.spark.SparkConf
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.gluten.execution.iceberg

import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, IcebergScanTransformer}
import org.apache.gluten.GlutenConfig
import org.apache.gluten.config.GlutenConfig
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row

Expand Down Expand Up @@ -62,7 +62,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
spark.sql(
s"""
Expand All @@ -84,7 +84,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite

// Partition key of integer type.
withSQLConf(
GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
GlutenConfig.GLUTEN_ENABLED.key -> "false"
) {
// Gluten does not support write iceberg table.
spark.sql(
Expand Down Expand Up @@ -145,7 +145,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
spark.sql(
s"""
Expand All @@ -167,7 +167,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite

// Partition key of integer type.
withSQLConf(
GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
GlutenConfig.GLUTEN_ENABLED.key -> "false"
) {
// Gluten does not support write iceberg table.
spark.sql(
Expand Down Expand Up @@ -228,7 +228,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
val rightTable = "p_int_tb"
withTable(leftTable, rightTable) {
// Partition key of string type.
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
// Gluten does not support write iceberg table.
spark.sql(
s"""
Expand All @@ -250,7 +250,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite

// Partition key of integer type.
withSQLConf(
GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
GlutenConfig.GLUTEN_ENABLED.key -> "false"
) {
// Gluten does not support write iceberg table.
spark.sql(
Expand Down Expand Up @@ -350,7 +350,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite

test("iceberg read mor table - delete and update") {
withTable("iceberg_mor_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
spark.sql(
"""
|create table iceberg_mor_tb (
Expand Down Expand Up @@ -403,7 +403,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
// TODO: support merge-on-read mode
ignore("iceberg read mor table - delete and update with merge-on-read mode") {
withTable("iceberg_mor_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
spark.sql(
"""
|create table iceberg_mor_tb (
Expand Down Expand Up @@ -458,7 +458,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite

test("iceberg read mor table - merge into") {
withTable("iceberg_mor_tb", "merge_into_source_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
spark.sql(
"""
|create table iceberg_mor_tb (
Expand Down Expand Up @@ -531,7 +531,7 @@ class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite
// TODO: support merge-on-read mode
ignore("iceberg read mor table - merge into with merge-on-read mode") {
withTable("iceberg_mor_tb", "merge_into_source_tb") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
spark.sql(
"""
|create table iceberg_mor_tb (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.gluten.vectorized;

import org.apache.gluten.GlutenConfig;
import org.apache.gluten.backendsapi.BackendsApiManager;
import org.apache.gluten.config.GlutenConfig;
import org.apache.gluten.execution.ColumnarNativeIterator;
import org.apache.gluten.memory.CHThreadGroup;
import org.apache.gluten.utils.ConfigUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenBuildInfo._
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi._
import org.apache.gluten.columnarbatch.CHBatch
import org.apache.gluten.component.Component.BuildInfo
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.WriteFilesExecTransformer
import org.apache.gluten.expression.WindowFunctionsBuilder
import org.apache.gluten.extension.ValidationResult
Expand Down Expand Up @@ -282,11 +282,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}

override def supportSortExec(): Boolean = {
GlutenConfig.getConf.enableColumnarSort
GlutenConfig.get.enableColumnarSort
}

override def supportSortMergeJoinExec(): Boolean = {
GlutenConfig.getConf.enableColumnarSortMergeJoin
GlutenConfig.get.enableColumnarSortMergeJoin
}

override def supportWindowExec(windowFunctions: Seq[NamedExpression]): Boolean = {
Expand Down Expand Up @@ -391,7 +391,7 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}

override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
GlutenConfig.get.enableNativeWriter.getOrElse(false)
}

override def supportCartesianProductExec(): Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.SQLConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenNumaBindingInfo
import org.apache.gluten.backendsapi.IteratorApi
import org.apache.gluten.config.GlutenNumaBindingInfo
import org.apache.gluten.execution._
import org.apache.gluten.expression.ConverterUtils
import org.apache.gluten.logging.LogLevelUtil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.ListenerApi
import org.apache.gluten.columnarbatch.CHBatch
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.CHBroadcastBuildSideCache
import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.expression.UDFMappings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.RuleApi
import org.apache.gluten.columnarbatch.CHBatch
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.exception.{GlutenException, GlutenNotSupportException}
import org.apache.gluten.execution._
import org.apache.gluten.expression._
Expand Down Expand Up @@ -452,12 +452,12 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
val readBatchNumRows = metrics("avgReadBatchNumRows")
val numOutputRows = metrics("numOutputRows")
val dataSize = metrics("dataSize")
if (GlutenConfig.getConf.isUseCelebornShuffleManager) {
if (GlutenConfig.get.isUseCelebornShuffleManager) {
val clazz = ClassUtils.getClass("org.apache.spark.shuffle.CHCelebornColumnarBatchSerializer")
val constructor =
clazz.getConstructor(classOf[SQLMetric], classOf[SQLMetric], classOf[SQLMetric])
constructor.newInstance(readBatchNumRows, numOutputRows, dataSize).asInstanceOf[Serializer]
} else if (GlutenConfig.getConf.isUseUniffleShuffleManager) {
} else if (GlutenConfig.get.isUseUniffleShuffleManager) {
throw new UnsupportedOperationException("temporarily uniffle not support ch ")
} else {
new CHColumnarBatchSerializer(readBatchNumRows, numOutputRows, dataSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.ValidatorApi
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.expression.ExpressionConverter
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.substrait.SubstraitContext
Expand Down Expand Up @@ -95,7 +95,7 @@ class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logg
}
case rangePartitoning: RangePartitioning =>
if (
GlutenConfig.getConf.enableColumnarSort &&
GlutenConfig.get.enableColumnarSort &&
RangePartitionerBoundsGenerator.supportedOrderings(rangePartitoning, child)
) {
None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension

import org.apache.gluten.GlutenConfig
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
Expand All @@ -39,8 +39,8 @@ class CommonSubexpressionEliminateRule(spark: SparkSession) extends Rule[Logical
override def apply(plan: LogicalPlan): LogicalPlan = {
val newPlan =
if (
plan.resolved && GlutenConfig.getConf.enableGluten
&& GlutenConfig.getConf.enableCommonSubexpressionEliminate && !plan.fastEquals(lastPlan)
plan.resolved && GlutenConfig.get.enableGluten
&& GlutenConfig.get.enableCommonSubexpressionEliminate && !plan.fastEquals(lastPlan)
) {
lastPlan = plan
visitPlan(plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension

import org.apache.gluten.GlutenConfig
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, CountDistinct}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand All @@ -32,9 +32,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.AGGREGATE_EXPRESSION
*/
object CountDistinctWithoutExpand extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = {
if (
GlutenConfig.getConf.enableGluten && GlutenConfig.getConf.enableCountDistinctWithoutExpand
) {
if (GlutenConfig.get.enableGluten && GlutenConfig.get.enableCountDistinctWithoutExpand) {
plan.transformAllExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) {
case ae: AggregateExpression
if ae.isDistinct && ae.aggregateFunction.isInstanceOf[Count] &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension

import org.apache.gluten.GlutenConfig
import org.apache.gluten.config.GlutenConfig

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
Expand All @@ -36,7 +36,7 @@ object ExtendedGeneratorNestedColumnAliasing {
def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
case pj @ Project(projectList, f @ Filter(condition, g: Generate))
if canPruneGenerator(g.generator) &&
GlutenConfig.getConf.enableExtendedColumnPruning &&
GlutenConfig.get.enableExtendedColumnPruning &&
(SQLConf.get.nestedPruningOnExpressions || SQLConf.get.nestedSchemaPruningEnabled) =>
val attrToExtractValues =
getAttributeToExtractValues(projectList ++ g.generator.children :+ condition, Seq.empty)
Expand Down
Loading
Loading