Skip to content

Commit

Permalink
SPARK-45959. added new tests. Handled flattening of Project when done…
Browse files Browse the repository at this point in the history
… using dataFrame.select instead of withColumn api
  • Loading branch information
ashahid committed Dec 15, 2023
1 parent 73f869a commit 7455648
Showing 1 changed file with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.internal

import scala.collection.mutable
import scala.util.{Failure, Success, Try}

import org.apache.spark.sql.{Dataset, RuntimeConfig}
Expand All @@ -27,7 +28,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.types.MetadataBuilder



private[sql] object EasilyFlattenable {
object OpType extends Enumeration {
type OpType = Value
Expand Down Expand Up @@ -140,7 +140,14 @@ private[sql] object EasilyFlattenable {
}
remappedNewProjListResult match {
case Success(remappedNewProjList) =>
Option(p.copy(projectList = remappedNewProjList))
val newProj = p.copy(projectList = remappedNewProjList)
if (conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
val dsIds = p.getTagValue(Dataset.DATASET_ID_TAG).map(_.clone()).getOrElse (
new mutable.HashSet[Long])

newProj.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
}
Option(newProj)

case Failure(_) => None
}
Expand Down Expand Up @@ -187,8 +194,13 @@ private[sql] object EasilyFlattenable {
}
remappedNewProjListResult match {
case Success(remappedNewProjList) =>

val newProj = p.copy(projectList = remappedNewProjList)
if (conf.get(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED)) {
val dsIds = p.getTagValue(Dataset.DATASET_ID_TAG).map(_.clone()).getOrElse(
new mutable.HashSet[Long])

newProj.setTagValue(Dataset.DATASET_ID_TAG, dsIds)
}

Option(newProj)

Expand Down

0 comments on commit 7455648

Please sign in to comment.