Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SQL] Fix logging warn -> debug #1800

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
Batch("Resolution", fixedPoint,
ResolveReferences ::
ResolveRelations ::
ResolveSortReferences ::
NewRelationInstances ::
ImplicitGenerate ::
StarExpansion ::
Expand Down Expand Up @@ -113,13 +114,58 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
q transformExpressions {
case u @ UnresolvedAttribute(name) =>
// Leave unchanged if resolution fails. Hopefully will be resolved next round.
val result = q.resolve(name).getOrElse(u)
val result = q.resolveChildren(name).getOrElse(u)
logDebug(s"Resolving $u to $result")
result
}
}
}

/**
* In many dialects of SQL is it valid to sort by attributes that are not present in the SELECT
* clause. This rule detects such queries and adds the required attributes to the original
* projection, so that they will be available during sorting. Another projection is added to
* remove these attributes after sorting.
*/
object ResolveSortReferences extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
case s @ Sort(ordering, p @ Project(projectList, child)) if !s.resolved && p.resolved =>
val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
val resolved = unresolved.flatMap(child.resolveChildren)
val requiredAttributes = resolved.collect { case a: Attribute => a }.toSet

val missingInProject = requiredAttributes -- p.output
if (missingInProject.nonEmpty) {
// Add missing attributes and then project them away after the sort.
Project(projectList,
Sort(ordering,
Project(projectList ++ missingInProject, child)))
} else {
s // Nothing we can do here. Return original plan.
}
case s @ Sort(ordering, a @ Aggregate(grouping, aggs, child)) if !s.resolved && a.resolved =>
val unresolved = ordering.flatMap(_.collect { case UnresolvedAttribute(name) => name })
// A small hack to create an object that will allow us to resolve any references that
// refer to named expressions that are present in the grouping expressions.
val groupingRelation = LocalRelation(
grouping.collect { case ne: NamedExpression => ne.toAttribute }
)

logDebug(s"Grouping expressions: $groupingRelation")
val resolved = unresolved.flatMap(groupingRelation.resolve).toSet
val missingInAggs = resolved -- a.outputSet
logDebug(s"Resolved: $resolved Missing in aggs: $missingInAggs")
if (missingInAggs.nonEmpty) {
// Add missing grouping exprs and then project them away after the sort.
Project(a.output,
Sort(ordering,
Aggregate(grouping, aggs ++ missingInAggs, child)))
} else {
s // Nothing we can do here. Return original plan.
}
}
}

/**
* Replaces [[UnresolvedFunction]]s with concrete [[catalyst.expressions.Expression Expressions]].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,32 +72,45 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
def childrenResolved: Boolean = !children.exists(!_.resolved)

/**
* Optionally resolves the given string to a [[NamedExpression]]. The attribute is expressed as
* Optionally resolves the given string to a [[NamedExpression]] using the input from all child
* nodes of this LogicalPlan. The attribute is expressed as
* as string in the following form: `[scope].AttributeName.[nested].[fields]...`.
*/
def resolve(name: String): Option[NamedExpression] = {
def resolveChildren(name: String): Option[NamedExpression] =
resolve(name, children.flatMap(_.output))

/**
* Optionally resolves the given string to a [[NamedExpression]] based on the output of this
* LogicalPlan. The attribute is expressed as string in the following form:
* `[scope].AttributeName.[nested].[fields]...`.
*/
def resolve(name: String): Option[NamedExpression] =
resolve(name, output)

/** Performs attribute resolution given a name and a sequence of possible attributes. */
protected def resolve(name: String, input: Seq[Attribute]): Option[NamedExpression] = {
val parts = name.split("\\.")
// Collect all attributes that are output by this nodes children where either the first part
// matches the name or where the first part matches the scope and the second part matches the
// name. Return these matches along with any remaining parts, which represent dotted access to
// struct fields.
val options = children.flatMap(_.output).flatMap { option =>
val options = input.flatMap { option =>
// If the first part of the desired name matches a qualifier for this possible match, drop it.
val remainingParts =
if (option.qualifiers.contains(parts.head) && parts.size > 1) parts.drop(1) else parts
if (option.name == remainingParts.head) (option, remainingParts.tail.toList) :: Nil else Nil
}

options.distinct match {
case (a, Nil) :: Nil => Some(a) // One match, no nested fields, use it.
case Seq((a, Nil)) => Some(a) // One match, no nested fields, use it.
// One match, but we also need to extract the requested nested field.
case (a, nestedFields) :: Nil =>
case Seq((a, nestedFields)) =>
a.dataType match {
case StructType(fields) =>
Some(Alias(nestedFields.foldLeft(a: Expression)(GetField), nestedFields.last)())
case _ => None // Don't know how to resolve these field references
}
case Nil => None // No matches.
case Seq() => None // No matches.
case ambiguousReferences =>
throw new TreeNodeException(
this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.execution

import scala.reflect.ClassTag

import org.apache.spark.sql.{SQLConf, QueryTest}
import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._

/**
* A collection of hive query tests where we generate the answers ourselves instead of depending on
* Hive to generate them (in contrast to HiveQuerySuite). Often this is because the query is
* valid, but Hive currently cannot execute it.
*/
class SQLQuerySuite extends QueryTest {
test("ordering not in select") {
checkAnswer(
sql("SELECT key FROM src ORDER BY value"),
sql("SELECT key FROM (SELECT key, value FROM src ORDER BY value) a").collect().toSeq)
}

test("ordering not in agg") {
checkAnswer(
sql("SELECT key FROM src GROUP BY key, value ORDER BY value"),
sql("""
SELECT key
FROM (
SELECT key, value
FROM src
GROUP BY key, value
ORDER BY value) a""").collect().toSeq)
}
}