Spark 3.2 #587

Merged · 35 commits · Jan 19, 2023

Changes shown from 23 of 35 commits.

Commits
30e7004
bumped dev version
metasim Mar 14, 2022
0b4309e
CI fix.
metasim Apr 7, 2022
80e6992
Dependency updates.
metasim Apr 7, 2022
d0e5bd5
Spark 3.1.3
metasim May 4, 2022
ff00b41
bumped dev version
metasim Mar 14, 2022
7f5e078
withNewChildrenInternal
echeipesh Jun 30, 2022
160f351
Try Aggregator implementation
echeipesh Jun 30, 2022
eb8ccb9
more explicit
echeipesh Jun 30, 2022
a92ee4e
Fix UDF style Aggregates
echeipesh Jul 6, 2022
8da8bd7
Bring in the Kryo setup
echeipesh Dec 5, 2022
ef2f4ee
Register functions directly
echeipesh Dec 5, 2022
d12ace5
Bump versions
echeipesh Dec 5, 2022
aab5486
Landsat PDS is gone :(
echeipesh Dec 9, 2022
a3ac4cf
Fix Resample and ResampleNearest
echeipesh Dec 10, 2022
0214fa2
fix masking functions
echeipesh Dec 11, 2022
285e03d
Fix test: 6900 bit at position 4 is 1 -- expect NODATA after mask
echeipesh Dec 11, 2022
725c9d5
TileRasterizerAggregate expects column in rf_raster_proj order
echeipesh Dec 11, 2022
91468b4
Use spark-testing-base - core tests green
echeipesh Dec 11, 2022
72a76a0
Update StacApiDataSourceTest.scala
echeipesh Dec 14, 2022
ae3acc4
disable GeoTrellisDataSourceSpec
echeipesh Dec 14, 2022
460971a
Shade caffeine
echeipesh Dec 14, 2022
b937a70
Merge branch 'develop' into spark-3.2
echeipesh Dec 14, 2022
43e8d3d
boop
echeipesh Dec 14, 2022
d1cfb99
Expressions constructors toSeq conversion
pomadchin Jan 3, 2023
b28a10b
Downgrade scaffeine to 4.1.0 for JDK 8 support in caffeine 2.9
echeipesh Jan 3, 2023
15b420c
pyspark version 3.2.1
echeipesh Jan 3, 2023
05b4c44
why exclude log4j? tests need it
echeipesh Jan 3, 2023
ec3c5f4
GitHub actions build.
metasim Sep 27, 2021
3a7b90f
Fix formatting
echeipesh Jan 3, 2023
4f24ad5
Fix Expressions arity issue
pomadchin Jan 3, 2023
9be3cb6
Add .jvmopts
pomadchin Jan 3, 2023
61081b7
Fix: Mask operations preserve the target tile cell type
echeipesh Jan 4, 2023
b14adaa
Pin GitHub Actions to ubuntu-20.04
echeipesh Jan 13, 2023
df552b8
Implement withNewChildrenInternal directly
echeipesh Jan 13, 2023
c5cf70c
Remove python build from CI
echeipesh Jan 13, 2023
2 changes: 1 addition & 1 deletion build.sbt
@@ -78,7 +78,7 @@ lazy val core = project
ExclusionRule(organization = "com.github.mpilquist")
),
scaffeine,
scalatest,
sparktestingbase excludeAll ExclusionRule("org.scala-lang.modules", "scala-xml_2.12"),
`scala-logging`
),
libraryDependencies ++= {
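The change above swaps plain `scalatest` for holdenk's `spark-testing-base` (see the "Use spark-testing-base - core tests green" commit), which manages a shared `SparkSession` across a suite. A minimal sketch of a suite on that base, with the suite name and data purely illustrative:

```scala
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.scalatest.funsuite.AnyFunSuite

// Sketch: DataFrameSuiteBase provides a shared `spark` session, so suites
// don't build and tear down their own contexts.
class ExampleSuite extends AnyFunSuite with DataFrameSuiteBase {
  test("shared session is usable") {
    // `spark` is not a stable identifier, so rebind it before importing.
    val s = spark
    import s.implicits._
    val df = Seq(1, 2, 3).toDF("x")
    assert(df.count() === 3)
  }
}
```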
@@ -32,8 +32,7 @@ class RasterRefIT extends TestEnvironment {
describe("practical subregion reads") {
it("should construct a natural color composite") {
import spark.implicits._
def scene(idx: Int) = URI.create(s"https://landsat-pds.s3.us-west-2.amazonaws.com" +
s"/c1/L8/176/039/LC08_L1TP_176039_20190703_20190718_01_T1/LC08_L1TP_176039_20190703_20190718_01_T1_B$idx.TIF")
def scene(idx: Int) = TestData.remoteCOGSingleBand(idx)

val redScene = RFRasterSource(scene(4))
// [west, south, east, north]
119 changes: 14 additions & 105 deletions core/src/main/scala/org/apache/spark/sql/rf/VersionShims.scala
@@ -1,21 +1,17 @@
package org.apache.spark.sql.rf

import java.lang.reflect.Constructor

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, FunctionRegistryBase}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.{FUNC_ALIAS, FunctionBuilder}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.objects.{Invoke, InvokeLike}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExpressionDescription, ExpressionInfo}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.DataType

import scala.reflect._
import scala.util.{Failure, Success, Try}

/**
* Collection of Spark version compatibility adapters.
@@ -27,18 +23,6 @@ object VersionShims {
val lrClazz = classOf[LogicalRelation]
val ctor = lrClazz.getConstructors.head.asInstanceOf[Constructor[LogicalRelation]]
ctor.getParameterTypes.length match {
// In Spark 2.1.0 the signature looks like this:
//
// case class LogicalRelation(
// relation: BaseRelation,
// expectedOutputAttributes: Option[Seq[Attribute]] = None,
// catalogTable: Option[CatalogTable] = None)
// extends LeafNode with MultiInstanceRelation
// In Spark 2.2.0 it's like this:
// case class LogicalRelation(
// relation: BaseRelation,
// output: Seq[AttributeReference],
// catalogTable: Option[CatalogTable])
case 3 =>
val arg2: Seq[AttributeReference] = lr.output
val arg3: Option[CatalogTable] = lr.catalogTable
@@ -49,14 +33,6 @@ object VersionShims {
ctor.newInstance(base, arg2, arg3)
}

// In Spark 2.3.0 this signature is this:
//
// case class LogicalRelation(
// relation: BaseRelation,
// output: Seq[AttributeReference],
// catalogTable: Option[CatalogTable],
// override val isStreaming: Boolean)
// extends LeafNode with MultiInstanceRelation {
case 4 =>
val arg2: Seq[AttributeReference] = lr.output
val arg3: Option[CatalogTable] = lr.catalogTable
@@ -75,25 +51,8 @@ object VersionShims {
val ctor = classOf[Invoke].getConstructors.head
val TRUE = Boolean.box(true)
ctor.getParameterTypes.length match {
// In Spark 2.1.0 the signature looks like this:
//
// case class Invoke(
// targetObject: Expression,
// functionName: String,
// dataType: DataType,
// arguments: Seq[Expression] = Nil,
// propagateNull: Boolean = true) extends InvokeLike
case 5 =>
ctor.newInstance(targetObject, functionName, dataType, Nil, TRUE).asInstanceOf[InvokeLike]
// In spark 2.2.0 the signature looks like this:
//
// case class Invoke(
// targetObject: Expression,
// functionName: String,
// dataType: DataType,
// arguments: Seq[Expression] = Nil,
// propagateNull: Boolean = true,
// returnNullable : Boolean = true) extends InvokeLike
case 6 =>
ctor.newInstance(targetObject, functionName, dataType, Nil, TRUE, TRUE).asInstanceOf[InvokeLike]

@@ -125,68 +84,18 @@ object VersionShims {
}
}

// Much of the code herein is copied from org.apache.spark.sql.catalyst.analysis.FunctionRegistry
def registerExpression[T <: Expression: ClassTag](name: String): Unit = {
val clazz = classTag[T].runtimeClass

def expressionInfo: ExpressionInfo = {
val df = clazz.getAnnotation(classOf[ExpressionDescription])
if (df != null) {
if (df.extended().isEmpty) {
new ExpressionInfo(clazz.getCanonicalName, null, name, df.usage(), df.arguments(), df.examples(), df.note(), df.group(), df.since(), df.deprecated())
} else {
// This exists for the backward compatibility with old `ExpressionDescription`s defining
// the extended description in `extended()`.
new ExpressionInfo(clazz.getCanonicalName, null, name, df.usage(), df.extended())
}
} else {
new ExpressionInfo(clazz.getCanonicalName, name)
}
def registerExpression[T <: Expression : ClassTag](
name: String,
setAlias: Boolean = false,
since: Option[String] = None
): (String, (ExpressionInfo, FunctionBuilder)) = {
val (expressionInfo, builder) = FunctionRegistryBase.build[T](name, since)
val newBuilder = (expressions: Seq[Expression]) => {
val expr = builder(expressions)
if (setAlias) expr.setTagValue(FUNC_ALIAS, name)
expr
}
def findBuilder: FunctionBuilder = {
val constructors = clazz.getConstructors
// See if we can find a constructor that accepts Seq[Expression]
val varargCtor = constructors.find(_.getParameterTypes.toSeq == Seq(classOf[Seq[_]]))
val builder = (expressions: Seq[Expression]) => {
if (varargCtor.isDefined) {
// If there is an apply method that accepts Seq[Expression], use that one.
Try(varargCtor.get.newInstance(expressions).asInstanceOf[Expression]) match {
case Success(e) => e
case Failure(e) =>
// the exception is an invocation exception. To get a meaningful message, we need the
// cause.
throw new AnalysisException(e.getCause.getMessage)
}
} else {
// Otherwise, find a constructor method that matches the number of arguments, and use that.
val params = Seq.fill(expressions.size)(classOf[Expression])
val f = constructors.find(_.getParameterTypes.toSeq == params).getOrElse {
val validParametersCount = constructors
.filter(_.getParameterTypes.forall(_ == classOf[Expression]))
.map(_.getParameterCount).distinct.sorted
val expectedNumberOfParameters = if (validParametersCount.length == 1) {
validParametersCount.head.toString
} else {
validParametersCount.init.mkString("one of ", ", ", " and ") +
validParametersCount.last
}
throw new AnalysisException(s"Invalid number of arguments for function ${clazz.getSimpleName}. " +
s"Expected: $expectedNumberOfParameters; Found: ${params.length}")
}
Try(f.newInstance(expressions : _*).asInstanceOf[Expression]) match {
case Success(e) => e
case Failure(e) =>
// the exception is an invocation exception. To get a meaningful message, we need the
// cause.
throw new AnalysisException(e.getCause.getMessage)
}
}
}

builder
}

registry.registerFunction(FunctionIdentifier(name), expressionInfo, findBuilder)
(name, (expressionInfo, newBuilder))
}
}
}
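The rewritten `registerExpression` no longer registers functions in place: it delegates constructor reflection to Spark 3.2's `FunctionRegistryBase.build` and hands back a `(name, (ExpressionInfo, FunctionBuilder))` pair for the caller to install (see the "Register functions directly" commit). A sketch of a call site, assuming the method is reachable via `VersionShims` (its enclosing wrapper is elided in this diff) and using Spark's built-in `Abs` purely as a stand-in expression:

```scala
// Placed under org.apache.spark.sql so the private[sql] sessionState member
// is accessible, which is the same reason VersionShims lives in this package.
package org.apache.spark.sql.rf

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.Abs

object RegisterExample {
  def install(spark: SparkSession): Unit = {
    // Build the (name, (info, builder)) pair, then register it explicitly.
    val (name, (info, builder)) = VersionShims.registerExpression[Abs]("my_abs")
    spark.sessionState.functionRegistry
      .registerFunction(FunctionIdentifier(name), info, builder)
  }
}
```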
@@ -25,13 +25,14 @@ import com.typesafe.scalalogging.Logger
import geotrellis.raster.Tile
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.BinaryExpression
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression}
import org.apache.spark.sql.types.DataType
import org.locationtech.rasterframes.expressions.DynamicExtractors._
import org.slf4j.LoggerFactory

/** Operation combining two tiles or a tile and a scalar into a new tile. */
trait BinaryRasterFunction extends BinaryExpression with RasterResult {
trait BinaryRasterFunction extends BinaryExpression with RasterResult { self: HasBinaryExpressionCopy =>
override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression = copy(newLeft, newRight)

@transient protected lazy val logger = Logger(LoggerFactory.getLogger(getClass.getName))

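Spark 3.2 made `withNewChildrenInternal` abstract on `TreeNode`, so every Catalyst expression must rebuild itself when the planner replaces its children. The `self: HasBinaryExpressionCopy =>` self-type lets this shared trait call the concrete case class's generated `copy`. That trait's definition is not shown in this diff; a minimal sketch of its assumed shape:

```scala
import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, Expression}

// Assumed shape of the copy hook; the real HasBinaryExpressionCopy may differ.
// A two-argument case class's synthetic copy(left, right) satisfies it for free.
trait HasBinaryExpressionCopy {
  def copy(left: Expression, right: Expression): Expression
}

// Shared mixin: rebuilds the node without naming the concrete subclass.
trait ExampleBinaryOp extends BinaryExpression { self: HasBinaryExpressionCopy =>
  override protected def withNewChildrenInternal(
      newLeft: Expression, newRight: Expression): Expression =
    copy(newLeft, newRight)
}
```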
@@ -26,6 +26,7 @@ import geotrellis.raster.{CellGrid, Neighborhood, Raster, TargetCell, Tile}
import geotrellis.vector.Extent
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.jts.JTSTypes
import org.apache.spark.sql.rf.{RasterSourceUDT, TileUDT}
import org.apache.spark.sql.types._
@@ -106,6 +107,24 @@ object DynamicExtractors {
(row: InternalRow) => row.as[ProjectedRasterTile]
}

lazy val intArrayExtractor: PartialFunction[DataType, ArrayData => Array[Int]] = {
case ArrayType(t, true) =>
throw new IllegalArgumentException(s"Can't turn array of $t to array<int>")
case ArrayType(DoubleType, false) =>
unsafe => unsafe.toDoubleArray.map(_.toInt)
case ArrayType(FloatType, false) =>
unsafe => unsafe.toFloatArray.map(_.toInt)
case ArrayType(IntegerType, false) =>
unsafe => unsafe.toIntArray
case ArrayType(ShortType, false) =>
unsafe => unsafe.toShortArray.map(_.toInt)
case ArrayType(ByteType, false) =>
unsafe => unsafe.toByteArray.map(_.toInt)
case ArrayType(BooleanType, false) =>
unsafe => unsafe.toBooleanArray().map(x => if (x) 1 else 0)

}

lazy val crsExtractor: PartialFunction[DataType, Any => CRS] = {
val base: PartialFunction[DataType, Any => CRS] = {
case _: StringType => (v: Any) => LazyCRS(v.asInstanceOf[UTF8String].toString)
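`intArrayExtractor` gives the masking expressions a single code path over any non-nullable numeric or boolean array column by converting its elements to `Array[Int]`; note the first case deliberately rejects nullable arrays. A sketch of resolving the extractor once per data type and applying it per row, with the literal array purely illustrative:

```scala
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types.{ArrayType, IntegerType}
import org.locationtech.rasterframes.expressions.DynamicExtractors

// Resolve the conversion from the column's data type once...
val dt = ArrayType(IntegerType, containsNull = false)
val toInts: ArrayData => Array[Int] = DynamicExtractors.intArrayExtractor(dt)

// ...then apply it to each row's catalyst ArrayData value.
val ints: Array[Int] = toInts(ArrayData.toArrayData(Array(1, 2, 3)))
```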
@@ -26,15 +26,16 @@ import geotrellis.raster.CellGrid
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.UnaryExpression
import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}

/**
* Implements boilerplate for subtype expressions processing TileUDT, RasterSourceUDT, and RasterRefs
* as Grid types.
*
* @since 11/4/18
*/
trait OnCellGridExpression extends UnaryExpression {
trait OnCellGridExpression extends UnaryExpression { self: HasUnaryExpressionCopy =>
override protected def withNewChildInternal(newChild: Expression): Expression = copy(newChild)

private lazy val fromRow: InternalRow => CellGrid[Int] = {
if (child.resolved) gridExtractor(child.dataType)
@@ -25,7 +25,7 @@ import org.locationtech.rasterframes.expressions.DynamicExtractors._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.UnaryExpression
import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.locationtech.rasterframes.model.TileContext

/**
@@ -34,7 +34,8 @@ import org.locationtech.rasterframes.model.TileContext
*
* @since 11/3/18
*/
trait OnTileContextExpression extends UnaryExpression {
trait OnTileContextExpression extends UnaryExpression { self: HasUnaryExpressionCopy =>
override protected def withNewChildInternal(newChild: Expression): Expression = copy(newChild)

override def checkInputDataTypes(): TypeCheckResult = {
if (!projectedRasterLikeExtractor.isDefinedAt(child.dataType)) {
@@ -39,7 +39,10 @@ import org.locationtech.geomesa.spark.jts.udf.SpatialRelationFunctions._
*
* @since 12/28/17
*/
abstract class SpatialRelation extends BinaryExpression with CodegenFallback {
abstract class SpatialRelation extends BinaryExpression with CodegenFallback { this: HasBinaryExpressionCopy =>

override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression =
copy(left = newLeft, right = newRight)

def extractGeometry(expr: Expression, input: Any): Geometry = {
input match {
@@ -72,8 +75,11 @@ object SpatialRelation {
type RelationPredicate = (Geometry, Geometry) => java.lang.Boolean

case class Intersects(left: Expression, right: Expression) extends SpatialRelation {
override def nodeName = "intersects"
override def nodeName: String = "intersects"
val relation = ST_Intersects

override protected def withNewChildrenInternal(newLeft: Expression, newRight: Expression): Expression =
copy(left = newLeft, right = newRight)
}
case class Contains(left: Expression, right: Expression) extends SpatialRelation {
override def nodeName = "contains"
@@ -140,6 +140,14 @@ case class TileAssembler(

def serialize(buffer: TileBuffer): Array[Byte] = buffer.serialize()
def deserialize(storageFormat: Array[Byte]): TileBuffer = new TileBuffer(storageFormat)

override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy(
colIndex = newChildren(0),
rowIndex = newChildren(1),
cellValue = newChildren(2),
tileCols = newChildren(3),
tileRows = newChildren(4)
)
}

object TileAssembler {
@@ -33,15 +33,17 @@ import org.locationtech.rasterframes.encoders.syntax._
import scala.reflect.runtime.universe._

/** Mixin providing boilerplate for DeclarativeAggregates over tile-conforming columns. */
trait UnaryRasterAggregate extends DeclarativeAggregate {
trait UnaryRasterAggregate extends DeclarativeAggregate { self: HasUnaryExpressionCopy =>
def child: Expression

def nullable: Boolean = child.nullable

def children = Seq(child)
def children: Seq[Expression] = Seq(child)

protected def tileOpAsExpression[R: TypeTag](name: String, op: Tile => R): Expression => ScalaUDF =
udfiexpr[R, Any](name, (dataType: DataType) => (a: Any) => if(a == null) null.asInstanceOf[R] else op(UnaryRasterAggregate.extractTileFromAny(dataType, a)))

override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy(newChildren(0))
}

object UnaryRasterAggregate {
@@ -25,11 +25,13 @@ import org.locationtech.rasterframes.expressions.DynamicExtractors._
import geotrellis.raster.Tile
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.{TypeCheckFailure, TypeCheckSuccess}
import org.apache.spark.sql.catalyst.expressions.UnaryExpression
import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.locationtech.rasterframes.model.TileContext

/** Boilerplate for expressions operating on a single Tile-like value. */
trait UnaryRasterFunction extends UnaryExpression {
trait UnaryRasterFunction extends UnaryExpression { self: HasUnaryExpressionCopy =>
override protected def withNewChildInternal(newChild: Expression): Expression = copy(newChild)

override def checkInputDataTypes(): TypeCheckResult = {
if (!tileExtractor.isDefinedAt(child.dataType)) {
TypeCheckFailure(s"Input type '${child.dataType}' does not conform to a raster type.")
@@ -23,12 +23,13 @@ package org.locationtech.rasterframes.expressions

import com.typesafe.scalalogging.Logger
import geotrellis.raster.Tile
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.types.DataType
import org.locationtech.rasterframes.model.TileContext
import org.slf4j.LoggerFactory

/** Operation on a tile returning a tile. */
trait UnaryRasterOp extends UnaryRasterFunction with RasterResult {
trait UnaryRasterOp extends UnaryRasterFunction with RasterResult { this: HasUnaryExpressionCopy =>
@transient protected lazy val logger = Logger(LoggerFactory.getLogger(getClass.getName))

def dataType: DataType = child.dataType
@@ -37,5 +38,7 @@ trait UnaryRasterOp extends UnaryRasterFunction with RasterResult {
toInternalRow(op(tile), ctx)

protected def op(child: Tile): Tile

override protected def withNewChildInternal(newChild: Expression): Expression = copy(newChild)
}

@@ -97,6 +97,7 @@ case class GetCRS(child: Expression) extends UnaryExpression with CodegenFallback
}
}

override protected def withNewChildInternal(newChild: Expression): Expression = copy(newChild)
}

object GetCRS {
@@ -55,6 +55,8 @@ case class GetCellType(child: Expression) extends OnCellGridExpression with CodegenFallback

/** Implemented by subtypes to process incoming ProjectedRasterLike entity. */
def eval(cg: CellGrid[Int]): Any = resultConverter(cg.cellType)

override protected def withNewChildInternal(newChild: Expression): Expression = copy(newChild)
}

object GetCellType {
@@ -57,6 +57,8 @@ case class GetEnvelope(child: Expression) extends UnaryExpression with CodegenFallback
}

def dataType: DataType = envelopeEncoder.schema

override protected def withNewChildInternal(newChild: Expression): Expression = copy(newChild)
}

object GetEnvelope {