Fix build failures
Co-authored-by: Thang Long VU <long.vu@databricks.com>
vkorukanti and longvu-db committed Sep 9, 2024
1 parent 15da4aa commit 4dc759c
Showing 44 changed files with 180 additions and 20 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/spark_master_test.yaml
@@ -51,7 +51,8 @@ jobs:
- name: Run Spark Master tests
# when changing TEST_PARALLELISM_COUNT make sure to also change it in spark_test.yaml
run: |
TEST_PARALLELISM_COUNT=4 SHARD_ID=${{matrix.shard}} build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
# Temporarily change to run Spark Connect tests
TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/test
TEST_PARALLELISM_COUNT=4 build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean connectServer/assembly connectClient/test
TEST_PARALLELISM_COUNT=4 SHARD_ID=${{matrix.shard}} build/sbt -DsparkVersion=master "++ ${{ matrix.scala }}" clean spark/test
if: steps.git-diff.outputs.diff
2 changes: 1 addition & 1 deletion build.sbt
@@ -349,7 +349,7 @@ lazy val connectClient = (project in file("spark-connect/client"))
}
}.taskValue,
(Test / resourceGenerators) += Def.task {
val src = url("https://repository.apache.org/content/groups/public/org/apache/spark/spark-connect_2.13/4.0.0-preview1/spark-connect_2.13-4.0.0-preview1.jar")
val src = url("https://repository.apache.org/content/groups/snapshots/org/apache/spark/spark-connect_2.13/4.0.0-SNAPSHOT/spark-connect_2.13-4.0.0-20240830.001644-352.jar")
val dest = (Test / resourceManaged).value / "spark-connect.jar"
if (!dest.exists()) {
src #> dest !;
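The download line uses the `scala.sys.process` DSL (presumably imported elsewhere in build.sbt): `#>` pipes the URL's contents into the destination file and `!` runs the resulting process, so the snapshot jar is fetched only on the first run and reused from `resourceManaged` afterwards. A minimal standalone sketch of the same download-if-missing pattern, with a hypothetical URL and path:

import java.io.File
import java.net.URL
import scala.sys.process._

object FetchJarSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical artifact; the real build points at the Apache snapshots repo.
    val src = new URL("https://example.org/artifacts/spark-connect.jar")
    val dest = new File("target/spark-connect.jar")
    if (!dest.exists()) { // skip the download if a previous run already fetched it
      dest.getParentFile.mkdirs()
      (src #> dest).! // pipe the URL's bytes into the file and run the process
    }
  }
}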
@@ -25,6 +25,7 @@ import org.apache.spark.annotation.Evolving
import org.apache.spark.sql.{functions, Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.PrimitiveBooleanEncoder
import org.apache.spark.sql.connect.delta.ImplicitProtoConversions._
import org.apache.spark.sql.internal.ColumnNodeToProtoConverter.toExpr

/**
* Main class for programmatically interacting with Delta tables.
@@ -137,7 +138,7 @@ class DeltaTable private[tables](
val delete = proto.DeleteFromTable
.newBuilder()
.setTarget(df.plan.getRoot)
condition.foreach(c => delete.setCondition(c.expr))
condition.foreach(c => delete.setCondition(toExpr(c)))
val relation = proto.DeltaRelation.newBuilder().setDeleteFromTable(delete).build()
val extension = com.google.protobuf.Any.pack(relation)
val sparkRelation = spark_proto.Relation.newBuilder().setExtension(extension).build()
@@ -188,15 +189,15 @@
val assignments = set.toSeq.map { case (field, value) =>
proto.Assignment
.newBuilder()
.setField(functions.expr(field).expr)
.setValue(value.expr)
.setField(toExpr(functions.expr(field)))
.setValue(toExpr(value))
.build()
}
val update = proto.UpdateTable
.newBuilder()
.setTarget(df.plan.getRoot)
.addAllAssignments(assignments.asJava)
condition.foreach(c => update.setCondition(c.expr))
condition.foreach(c => update.setCondition(toExpr(c)))
val relation = proto.DeltaRelation.newBuilder().setUpdateTable(update).build()
val extension = com.google.protobuf.Any.pack(relation)
val sparkRelation = spark_proto.Relation.newBuilder().setExtension(extension).build()
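Every Connect-client call site in this file now follows the same pattern: the user-facing Column is lowered to a spark.connect protobuf expression with toExpr before being packed into the DeltaRelation extension. A hedged sketch of just that conversion step, assuming the Spark 4.0 Connect client API imported above (the column and object name are illustrative):

import org.apache.spark.sql.{functions, Column}
import org.apache.spark.sql.internal.ColumnNodeToProtoConverter.toExpr

object ToExprSketch {
  // Lower a client-side predicate to its proto form, the same shape that
  // DeleteFromTable.setCondition / UpdateTable.setCondition accept above.
  def conditionProto(c: Column) = toExpr(c)

  def main(args: Array[String]): Unit = {
    val cond: Column = functions.col("id") > 10
    println(conditionProto(cond)) // prints the proto Expression's text form
  }
}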
35 changes: 35 additions & 0 deletions spark/src/main/scala-spark-3.5/shims/ColumnExtShim.scala
@@ -0,0 +1,35 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.Expression

/**
* This shim is introduced due to breaking `Column` API changes in Spark master with
* apache/spark#47785, which removed the following two APIs (both are still available
* in Spark 3.5 and do not need to be shimmed here):
* - `Column.expr`
* - `Column.apply(Expression)`
*/
object ColumnImplicitsShim {
/**
* Convert a [[Column]] to an [[Expression]]. Call sites sometimes hit conflicting
* implicit conversions on `Column`, so this method can be used explicitly instead.
*/
def expression(column: Column): Expression = {
column.expr
}
}
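On the 3.5 build this object is intentionally thin: Column.expr and Column.apply(Expression) still exist natively, so importing ColumnImplicitsShim._ changes nothing there and only the explicit expression helper is added. A small sketch of a call site compiled against Spark 3.5 (names are illustrative):

import org.apache.spark.sql.{functions, Column}
import org.apache.spark.sql.ColumnImplicitsShim
import org.apache.spark.sql.catalyst.expressions.Expression

object Spark35CallSiteSketch {
  // On 3.5 this forwards to the native Column.expr; the point is that the
  // same spelling also compiles against the master shim.
  def asExpression(c: Column): Expression = ColumnImplicitsShim.expression(c)

  def main(args: Array[String]): Unit = {
    val filter = functions.col("value").isNotNull
    println(asExpression(filter)) // e.g. isnotnull('value)
  }
}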
53 changes: 53 additions & 0 deletions spark/src/main/scala-spark-master/shims/ColumnExtShim.scala
@@ -0,0 +1,53 @@
/*
* Copyright (2023) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.internal.ExpressionUtils

/**
* This shim is introduced due to breaking `Column` API changes in Spark master with
* apache/spark#47785, which removed the following two APIs:
* - `Column.expr` - Replaced with `ExpressionUtils.expression(column)`
* - `Column.apply(Expression)` - Replaced with `ExpressionUtils.column(e)`
*/
object ColumnImplicitsShim {

/**
* Extend [[Column]] with an `expr` method that returns the underlying [[Expression]].
* This avoids changing every `Column.expr` call site to `ExpressionUtils.expression(column)`.
*
* @param column The column to get the expression from.
*/
implicit class ColumnExprExt(val column: Column) extends AnyVal {
def expr: Expression = ExpressionUtils.expression(column)
}

/**
* Provide an implicit constructor to create a [[Column]] from an [[Expression]].
*/
implicit class ColumnConstructorExt(val c: Column.type) extends AnyVal {
def apply(e: Expression): Column = ExpressionUtils.column(e)
}

/**
* Implicitly convert a [[Column]] to an [[Expression]]. Sometimes the `expr` extension
* above conflicts with other implicit conversions, so this method can be used explicitly.
*/
def expression(column: Column): Expression = {
ExpressionUtils.expression(column)
}
}
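Because both source trees define this object under the same package and name, shared sources import ColumnImplicitsShim._ once and compile against whichever shim the -DsparkVersion build selects. A sketch of what the import buys on master; the Alias call mirrors the DeltaSink change below:

import org.apache.spark.sql.Column
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression}

object MasterCallSiteSketch {
  // Compiles via the ColumnExprExt implicit class, since Spark master no
  // longer defines Column.expr as a native member.
  def underlying(c: Column): Expression = c.expr

  // Compiles via ColumnConstructorExt; on 3.5 the native
  // Column.apply(Expression) wins and the implicit is never consulted.
  def renamed(c: Column, name: String): Column = Column(Alias(c.expr, name)())
}

Scala only consults the implicit classes when no native member matches, which is why the same import can sit in shared sources without changing behavior on 3.5.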
@@ -28,6 +28,7 @@ import org.apache.spark.sql.delta.util.AnalysisHelper
import org.apache.spark.annotation._
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, NamedExpression}
1 change: 1 addition & 0 deletions spark/src/main/scala/io/delta/tables/DeltaTable.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.annotation._
import org.apache.spark.sql._
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.types.StructType

@@ -26,6 +26,7 @@ import org.apache.spark.sql.delta.commands.{DeltaGenerateCommand, DescribeDeltaD
import org.apache.spark.sql.delta.util.AnalysisHelper
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{functions, Column, DataFrame}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
@@ -40,6 +40,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskType}
import org.apache.spark.TaskContext
import org.apache.spark.internal.MDC
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, Literal}
@@ -26,7 +26,10 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf, DeltaStreamUtils}

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.Column
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{DataFrame, Dataset, Encoder}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions.EqualNullSafe
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
@@ -573,7 +573,13 @@ class DeltaAnalysis(session: SparkSession)
val v1TableName = child.identifier.asTableIdentifier
namespace.foreach { ns =>
if (v1TableName.database.exists(!resolver(_, ns.head))) {
// Temporary Comment: Spark Master
throw QueryCompilationErrors.showColumnsWithConflictNamespacesError(
Seq(ns.head), Seq(v1TableName.database.get))
// Temporary Comment: Spark 3.5
/*
throw QueryCompilationErrors.showColumnsWithConflictDatabasesError(ns, v1TableName)
*/
}
}
ShowDeltaTableColumnsCommand(child)
@@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

import org.apache.spark.sql._
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.catalyst.{FileSourceOptions, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStatistics, CatalogTable}
@@ -27,7 +27,9 @@ import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.AnalysisHelper

import org.apache.spark.sql.{AnalysisException, Column, Dataset, SparkSession}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{Column, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -23,6 +23,7 @@ import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{Column, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
@@ -51,7 +51,10 @@ import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.SparkException
import org.apache.spark.internal.{MDC, MessageWithContext}
import org.apache.spark.sql.{AnalysisException, Column, DataFrame, SparkSession}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.Column
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.internal.Logging
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{Column, Encoder, SparkSession}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -28,6 +28,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import com.fasterxml.jackson.databind.annotation.JsonDeserialize

import org.apache.spark.SparkContext
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkContext
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -32,6 +32,7 @@ import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import org.apache.spark.sql._
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, Literal}
@@ -40,7 +40,9 @@ import org.apache.spark.sql.delta.stats.StatisticsCollection
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.MDC
import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{Column, Row, SparkSession}
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.expressions._
@@ -34,6 +34,7 @@ import org.apache.spark.sql.util.ScalaExtensions.OptionExt

import org.apache.spark.internal.MDC
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Literal}
@@ -26,6 +26,7 @@ import org.apache.spark.sql.delta.commands.merge.MergeOutputGeneration.{SOURCE_R
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
import org.apache.spark.sql.delta.util.SetAccumulator

import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{Column, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Literal, Or}
import org.apache.spark.sql.catalyst.plans.logical.DeltaMergeIntoClause
@@ -22,6 +22,7 @@ import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
import org.apache.spark.sql.delta.commands.MergeIntoCommandBase

import org.apache.spark.sql._
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.catalyst.expressions.{Alias, CaseWhen, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical._

@@ -23,6 +23,7 @@ import org.apache.spark.sql.delta.commands.MergeIntoCommandBase
import org.apache.spark.sql.delta.commands.cdc.CDCReader

import org.apache.spark.sql._
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkEnv
import org.apache.spark.internal.MDC
import org.apache.spark.sql._
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta
import org.apache.spark.sql.delta.OptimizablePartitionExpression._

import org.apache.spark.sql.Column
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{Cast, DateFormatClass, DayOfMonth, Expression, Hour, IsNull, Literal, Month, Or, Substring, TruncDate, TruncTimestamp, UnixTimestamp, Year}
import org.apache.spark.sql.catalyst.util.quoteIfNeeded
@@ -33,6 +33,7 @@ import org.apache.spark.sql.util.ScalaExtensions._

import org.apache.spark.internal.MDC
import org.apache.spark.sql._
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

@@ -20,6 +20,7 @@ package org.apache.spark.sql.delta.skipping
import org.apache.spark.sql.delta.expressions.{HilbertByteArrayIndex, HilbertLongIndex, InterleaveBits, RangePartitionId}

import org.apache.spark.SparkException
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.types.StringType
@@ -29,6 +29,8 @@ import org.apache.hadoop.fs.Path
// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.internal.MDC
import org.apache.spark.sql._
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.catalyst.analysis.TableOutputResolver
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.types.DataTypeUtils
@@ -205,7 +207,7 @@ case class DeltaSink(
allowStructEvolution = canMergeSchema,
columnName = columnName
)
new Column(Alias(castExpr, columnName)())
Column(Alias(castExpr, columnName)())
}

data.queryExecution match {
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.stats
import org.apache.spark.sql.delta.stats.DeltaStatistics.{MAX, MIN}

import org.apache.spark.sql.Column
import org.apache.spark.sql.ColumnImplicitsShim._

/**
* A trait that defines interfaces for a data skipping predicate builder.
@@ -31,6 +31,7 @@ import org.apache.spark.sql.delta.stats.DeltaStatistics._
import org.apache.spark.sql.delta.util.StateCache
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{DataFrame, _}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._