[SPARK-33389][SQL] Make internal classes of SparkSession always using active SQLConf
luluorta committed Nov 10, 2020
1 parent 036c11b commit 90181f9
Showing 56 changed files with 385 additions and 424 deletions.
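The diff applies a single pattern across these files: instead of capturing a SQLConf passed through a constructor, internal classes now read the active conf via SQLConf.get, usually by mixing in the new HasConf trait added below. A hedged before/after sketch of that shape (the class names here are illustrative, not taken from the diff):

import org.apache.spark.sql.internal.SQLConf

// Before: the conf was fixed at construction time, so a long-lived object
// could keep serving a stale conf from another session.
class OldStyleComponent(conf: SQLConf) {
  def ansi: Boolean = conf.ansiEnabled
}

// After: the conf is looked up from the active session scope on every access,
// which is what the HasConf trait provides.
class NewStyleComponent {
  def conf: SQLConf = SQLConf.get
  def ansi: Boolean = conf.ansiEnabled
}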
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst

import org.apache.spark.sql.internal.SQLConf

/**
* Trait for shared SQLConf.
*/
trait HasConf {

/**
* The active config object within the current scope.
* See [[SQLConf.get]] for more information.
*/
def conf: SQLConf = SQLConf.get
}
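Any class can opt into the same behavior by mixing the trait in; a minimal usage sketch (the object name is made up for illustration):

import org.apache.spark.sql.catalyst.HasConf

// Hypothetical helper: each call reads whichever SQLConf is active in the
// current scope (per SparkSession / per query execution).
object ActiveConfProbe extends HasConf {
  def caseSensitive: Boolean = conf.caseSensitiveAnalysis
  def sessionTimeZone: String = conf.sessionLocalTimeZone
}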
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.trees.TreeNodeRef
import org.apache.spark.sql.catalyst.util.toPrettySQL
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnChange, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
import org.apache.spark.sql.connector.catalog.TableChange.{First => _, _}
import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform, Transform}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
@@ -58,15 +58,14 @@ import org.apache.spark.util.Utils
*/
object SimpleAnalyzer extends Analyzer(
new CatalogManager(
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true),
FakeV2SessionCatalog,
new SessionCatalog(
new InMemoryCatalog,
EmptyFunctionRegistry,
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) {
EmptyFunctionRegistry) {
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
}),
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
})) {
override def resolver: Resolver = caseSensitiveResolution
}

object FakeV2SessionCatalog extends TableCatalog {
private def fail() = throw new UnsupportedOperationException
@@ -130,10 +129,8 @@ object AnalysisContext {
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
* [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
*/
class Analyzer(
override val catalogManager: CatalogManager,
conf: SQLConf)
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog {
class Analyzer(override val catalogManager: CatalogManager)
extends RuleExecutor[LogicalPlan] with CheckAnalysis with LookupCatalog with HasConf {

private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog

@@ -144,10 +141,8 @@ class Analyzer(
override def isView(nameParts: Seq[String]): Boolean = v1SessionCatalog.isView(nameParts)

// Only for tests.
def this(catalog: SessionCatalog, conf: SQLConf) = {
this(
new CatalogManager(conf, FakeV2SessionCatalog, catalog),
conf)
def this(catalog: SessionCatalog) = {
this(new CatalogManager(FakeV2SessionCatalog, catalog))
}

def executeAndCheck(plan: LogicalPlan, tracker: QueryPlanningTracker): LogicalPlan = {
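With the conf parameter gone, the test-only constructor chain becomes conf-free as well; a hedged sketch of how a test might build an analyzer after this change (names as in the diff, wiring illustrative):

import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}

// The analyzer no longer receives a SQLConf; it resolves against the conf
// that is active when the plan is analyzed.
val catalog = new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin)
val analyzer = new Analyzer(catalog)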
@@ -16,10 +16,10 @@
*/
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.HasConf
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, ListQuery, TimeZoneAwareExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType

/**
@@ -47,10 +47,7 @@ object ResolveTimeZone extends Rule[LogicalPlan] {
* Mix-in trait for constructing valid [[Cast]] expressions.
*/
trait CastSupport {
/**
* Configuration used to create a valid cast expression.
*/
def conf: SQLConf
self: HasConf =>

/**
* Create a Cast expression with the session local time zone.
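CastSupport now delegates its conf to the mixer via a self-type instead of declaring an abstract member. Because Rule also picks up HasConf in this commit, existing rules still satisfy the requirement. An illustrative sketch (rule name and body are made up; the cast(child, dataType) helper is assumed from the surrounding, partially shown trait):

import org.apache.spark.sql.catalyst.analysis.CastSupport
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.StringType

// Rule mixes in HasConf, so the `self: HasConf =>` requirement of CastSupport
// is met without declaring a conf member here.
object ExampleCastRule extends Rule[LogicalPlan] with CastSupport {
  override def apply(plan: LogicalPlan): LogicalPlan = {
    // Assumed CastSupport helper: builds a Cast using the session time zone
    // taken from the active conf.
    logDebug(s"example cast: ${cast(Literal(1), StringType)}")
    plan
  }
}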
@@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf

/**
* This file defines view types and analysis rules related to views.
@@ -54,8 +53,6 @@ import org.apache.spark.sql.internal.SQLConf
* completely resolved during the batch of Resolution.
*/
object EliminateView extends Rule[LogicalPlan] with CastSupport {
override def conf: SQLConf = SQLConf.get

override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// The child has the different output attributes with the View operator. Adds a Project over
// the child of the view.
@@ -61,34 +61,38 @@ class SessionCatalog(
externalCatalogBuilder: () => ExternalCatalog,
globalTempViewManagerBuilder: () => GlobalTempViewManager,
functionRegistry: FunctionRegistry,
conf: SQLConf,
hadoopConf: Configuration,
parser: ParserInterface,
functionResourceLoader: FunctionResourceLoader) extends Logging {
functionResourceLoader: FunctionResourceLoader,
cacheSize: Int = SQLConf.get.tableRelationCacheSize,
cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends Logging with HasConf {
import SessionCatalog._
import CatalogTypes.TablePartitionSpec

// For testing only.
def this(
externalCatalog: ExternalCatalog,
functionRegistry: FunctionRegistry,
conf: SQLConf) = {
staticConf: SQLConf) = {
this(
() => externalCatalog,
() => new GlobalTempViewManager(conf.getConf(GLOBAL_TEMP_DATABASE)),
() => new GlobalTempViewManager(staticConf.getConf(GLOBAL_TEMP_DATABASE)),
functionRegistry,
conf,
new Configuration(),
new CatalystSqlParser(conf),
DummyFunctionResourceLoader)
new CatalystSqlParser(),
DummyFunctionResourceLoader,
staticConf.tableRelationCacheSize,
staticConf.metadataCacheTTL)
}

// For testing only.
def this(externalCatalog: ExternalCatalog, functionRegistry: FunctionRegistry) = {
this(externalCatalog, functionRegistry, SQLConf.get)
}

// For testing only.
def this(externalCatalog: ExternalCatalog) = {
this(
externalCatalog,
new SimpleFunctionRegistry,
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))
this(externalCatalog, new SimpleFunctionRegistry)
}

lazy val externalCatalog = externalCatalogBuilder()
@@ -136,9 +140,6 @@ class SessionCatalog(
}

private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
val cacheSize = conf.tableRelationCacheSize
val cacheTTL = conf.metadataCacheTTL

var builder = CacheBuilder.newBuilder()
.maximumSize(cacheSize)

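The SessionCatalog change follows the same pattern, with one wrinkle: the relation cache is built only once, so its size and TTL are snapshotted as constructor parameters (defaulting to the active conf) rather than read lazily. A hedged sketch of the conf-free, test-only construction shown above:

import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}

// Uses the defaults: cacheSize and cacheTTL come from SQLConf.get at
// construction time; everything else reads the active conf via HasConf.
val testCatalog = new SessionCatalog(new InMemoryCatalog)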
@@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, HasConf, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -177,8 +177,7 @@ case class CatalogTablePartition(
case class BucketSpec(
numBuckets: Int,
bucketColumnNames: Seq[String],
sortColumnNames: Seq[String]) {
def conf: SQLConf = SQLConf.get
sortColumnNames: Seq[String]) extends HasConf {

if (numBuckets <= 0 || numBuckets > conf.bucketingMaxBuckets) {
throw new AnalysisException(
@@ -376,9 +376,8 @@ object SimpleTestOptimizer extends SimpleTestOptimizer

class SimpleTestOptimizer extends Optimizer(
new CatalogManager(
new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true),
FakeV2SessionCatalog,
new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, new SQLConf())))
new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry)))

/**
* Remove redundant aliases from a query plan. A redundant alias is an alias that does not change
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf

/**
* Collapse plans consisting empty local relations generated by [[PruneFilters]].
@@ -47,8 +46,6 @@ object PropagateEmptyRelation extends Rule[LogicalPlan] with PredicateHelper wit
private def nullValueProjectList(plan: LogicalPlan): Seq[NamedExpression] =
plan.output.map{ a => Alias(cast(Literal(null), a.dataType), a.name)(a.exprId) }

override def conf: SQLConf = SQLConf.get

def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case p: Union if p.children.exists(isEmptyLocalRelation) =>
val newChildren = p.children.filterNot(isEmptyLocalRelation)
@@ -19,18 +19,16 @@ package org.apache.spark.sql.catalyst.optimizer

import scala.annotation.tailrec

import org.apache.spark.sql.catalyst.HasConf
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf

/**
* Encapsulates star-schema detection logic.
*/
object StarSchemaDetection extends PredicateHelper {

private def conf = SQLConf.get
object StarSchemaDetection extends PredicateHelper with HasConf {

/**
* Star schema consists of one or more fact tables referencing a number of dimension
@@ -28,7 +28,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, HasConf, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
@@ -51,11 +51,9 @@ import org.apache.spark.util.random.RandomSampler
* The AstBuilder converts an ANTLR4 ParseTree into a catalyst Expression, LogicalPlan or
* TableIdentifier.
*/
class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging {
class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging with HasConf {
import ParserUtils._

def this() = this(new SQLConf())

protected def typedVisit[T](ctx: ParseTree): T = {
ctx.accept(this).asInstanceOf[T]
}
@@ -23,7 +23,7 @@ import org.antlr.v4.runtime.tree.TerminalNodeImpl

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, HasConf, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.Origin
@@ -33,7 +33,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
/**
* Base SQL parsing infrastructure.
*/
abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Logging {
abstract class AbstractSqlParser extends ParserInterface with Logging with HasConf {

/** Creates/Resolves DataType for a given SQL string. */
override def parseDataType(sqlText: String): DataType = parse(sqlText) { parser =>
@@ -138,14 +138,12 @@ abstract class AbstractSqlParser(conf: SQLConf) extends ParserInterface with Log
/**
* Concrete SQL parser for Catalyst-only SQL statements.
*/
class CatalystSqlParser(conf: SQLConf) extends AbstractSqlParser(conf) {
val astBuilder = new AstBuilder(conf)
class CatalystSqlParser extends AbstractSqlParser {
val astBuilder = new AstBuilder
}

/** For test-only. */
object CatalystSqlParser extends AbstractSqlParser(SQLConf.get) {
val astBuilder = new AstBuilder(SQLConf.get)
}
object CatalystSqlParser extends CatalystSqlParser

/**
* This string stream provides the lexer with upper case characters only. This greatly simplifies
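Since the parser no longer captures a SQLConf, its behavior tracks whatever conf is active at call time, and the companion object is now just a plain instance of the class. A small usage sketch (the identifier string is arbitrary):

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// Parsing reads the active conf (SQLConf.get) when invoked, rather than a
// conf frozen at parser construction.
val parser = new CatalystSqlParser()
val parts = parser.parseMultipartIdentifier("db.tbl.col")  // Seq("db", "tbl", "col")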
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans
import scala.collection.mutable

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.HasConf
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode, TreeNodeTag}
import org.apache.spark.sql.internal.SQLConf
@@ -35,15 +36,9 @@ import org.apache.spark.sql.types.{DataType, StructType}
* The tree traverse APIs like `transform`, `foreach`, `collect`, etc. that are
* inherited from `TreeNode`, do not traverse into query plans inside subqueries.
*/
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] {
abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanType] with HasConf {
self: PlanType =>

/**
* The active config object within the current scope.
* See [[SQLConf.get]] for more information.
*/
def conf: SQLConf = SQLConf.get

def output: Seq[Attribute]

/**
@@ -18,10 +18,10 @@
package org.apache.spark.sql.catalyst.rules

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.HasConf
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.internal.SQLConf

abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
abstract class Rule[TreeType <: TreeNode[_]] extends HasConf with Logging {

/** Name for this rule, automatically inferred based on class name. */
val ruleName: String = {
@@ -30,6 +30,4 @@ abstract class Rule[TreeType <: TreeNode[_]] extends Logging {
}

def apply(plan: TreeType): TreeType

def conf: SQLConf = SQLConf.get
}
@@ -20,6 +20,7 @@ package org.apache.spark.sql.connector.catalog
import scala.collection.mutable

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.HasConf
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.internal.SQLConf
@@ -37,9 +38,8 @@ import org.apache.spark.sql.internal.SQLConf
// need to track current database at all.
private[sql]
class CatalogManager(
conf: SQLConf,
defaultSessionCatalog: CatalogPlugin,
val v1SessionCatalog: SessionCatalog) extends Logging {
val v1SessionCatalog: SessionCatalog) extends Logging with HasConf {
import CatalogManager.SESSION_CATALOG_NAME
import CatalogV2Util._

@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.connector.expressions.{BucketTransform, IdentityTransform, LogicalExpressions, Transform}
import org.apache.spark.sql.internal.SQLConf

/**
* Conversion helpers for working with v2 [[CatalogPlugin]].
@@ -143,7 +142,7 @@ private[sql] object CatalogV2Implicits {
}
}

private lazy val catalystSqlParser = new CatalystSqlParser(SQLConf.get)
private lazy val catalystSqlParser = new CatalystSqlParser()

def parseColumnPath(name: String): Seq[String] = {
catalystSqlParser.parseMultipartIdentifier(name)