[SEDONA-429][SEDONA-430] Support specifying GeoParquet spec version number and CRS (apache#1162)

* Support geoparquet.version and geoparquet.crs option for Spark 3.0~3.3

* Add tests for geoparquet.version and geoparquet.crs options for Spark 3.0~3.3

* Add documentation for geoparquet.version and geoparquet.crs options

* Apply this patch on Spark 3.4 and Spark 3.5

* Remove `Configuration.getPropsWithPrefix` to be compatible with lower versions of Hadoop

* Add notes about crs metadata in GeoParquet files

* Allow omitting CRS by setting geoparquet.crs to "" (empty string)

* Set default crs metadata to null

* Apply to Spark 3.4 and Spark 3.5

* Explain the behavior of geoparquet.crs option
Kontinuation authored and jiayuasu committed Jan 8, 2024
1 parent 25793c9 commit 06c30cb
Showing 8 changed files with 1,003 additions and 73 deletions.
35 changes: 35 additions & 0 deletions docs/tutorial/sql.md
@@ -653,6 +653,41 @@ Since v`1.3.0`, Sedona natively supports writing GeoParquet file. GeoParquet can
df.write.format("geoparquet").save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
```

Since v`1.5.1`, Sedona supports writing GeoParquet files with a custom GeoParquet spec version and CRS.
The default GeoParquet spec version is `1.0.0` and the default CRS is `null`. You can specify the GeoParquet spec version and CRS as follows:

```scala
val projjson = "{...}" // PROJJSON string for all geometry columns
df.write.format("geoparquet")
.option("geoparquet.version", "1.0.0")
.option("geoparquet.crs", projjson)
.save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
```

If you write multiple geometry columns to the GeoParquet file, you can specify the CRS for each column individually.
For example, if `g0` and `g1` are two geometry columns in the DataFrame `df`, you can specify their CRSs as follows:

```scala
val projjson_g0 = "{...}" // PROJJSON string for g0
val projjson_g1 = "{...}" // PROJJSON string for g1
df.write.format("geoparquet")
.option("geoparquet.version", "1.0.0")
.option("geoparquet.crs.g0", projjson_g0)
.option("geoparquet.crs.g1", projjson_g1)
.save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
```
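
Judging from the writer changes in this commit, a per-column `geoparquet.crs.<column_name>` option takes precedence over the file-level `geoparquet.crs` option, so you can combine a default CRS with per-column overrides. A sketch reusing the variables above:

```scala
// g0 falls back to the file-level default CRS; g1 gets its own PROJJSON CRS.
df.write.format("geoparquet")
  .option("geoparquet.crs", projjson_g0)
  .option("geoparquet.crs.g1", projjson_g1)
  .save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
```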

The value of `geoparquet.crs` and `geoparquet.crs.<column_name>` can be one of the following:

* `"null"`: Explicitly sets the `crs` field to `null`. This is the default behavior.
* `""` (empty string): Omits the `crs` field. This implies that the CRS is [OGC:CRS84](https://www.opengis.net/def/crs/OGC/1.3/CRS84) for CRS-aware implementations (see the example after this list).
* `"{...}"` (PROJJSON string): Sets the `crs` field to the PROJJSON object representing the Coordinate Reference System (CRS) of the geometry. You can find the PROJJSON string of a specific CRS at https://epsg.io/ (click the JSON option at the bottom of the page). You can also customize the PROJJSON string as needed.
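
For example, a minimal sketch (reusing `df` and `geoparquetoutputlocation` from the examples above) that omits the `crs` field entirely:

```scala
// Write a GeoParquet file whose column metadata carries no crs field at all;
// CRS-aware readers will then assume OGC:CRS84.
df.write.format("geoparquet")
  .option("geoparquet.crs", "")
  .save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
```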

Please note that Sedona currently cannot convert between a PROJJSON string and a geometry's CRS. The GeoParquet reader ignores the PROJJSON metadata, so you have to set the CRS via [`ST_SetSRID`](../api/sql/Function.md#st_setsrid) after reading the file.
The GeoParquet writer does not use the SRID field of a geometry, so you always have to set the `geoparquet.crs` option manually when writing the file if you want a meaningful `crs` field.
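
For example, a sketch of assigning the SRID manually after reading, assuming a geometry column named `geometry` and data known to be in EPSG:4326:

```scala
import org.apache.spark.sql.functions.expr

val gdf = spark.read.format("geoparquet")
  .load(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
// The reader ignores the crs metadata, so set the SRID explicitly.
val gdfWithSrid = gdf.withColumn("geometry", expr("ST_SetSRID(geometry, 4326)"))
```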

For the same reason, the Sedona GeoParquet reader and writer do NOT check the axis order (lon/lat or lat/lon) and assume it is handled by the user when writing or reading the files. You can always use [`ST_FlipCoordinates`](../api/sql/Function.md#st_flipcoordinates) to swap the axis order of your geometries.
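
For example, a sketch (again assuming a geometry column named `geometry`) that flips lat/lon input to lon/lat before writing:

```scala
// Hypothetical input whose coordinates are stored as lat/lon: swap the axis
// order to lon/lat before writing the GeoParquet file.
val flipped = df.withColumn("geometry", expr("ST_FlipCoordinates(geometry)"))
flipped.write.format("geoparquet")
  .save(geoparquetoutputlocation + "/GeoParquet_Flipped_File.parquet")
```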

## Sort then Save GeoParquet

To maximize the performance of Sedona GeoParquet filter pushdown, we suggest that you sort the data by their geohash values (see [ST_GeoHash](../../api/sql/Function/#st_geohash)) and then save as a GeoParquet file. An example is as follows:
@@ -14,12 +14,7 @@
 package org.apache.spark.sql.execution.datasources.parquet
 
 import org.json4s.jackson.JsonMethods.parse
-
-/**
- * A case class that holds CRS metadata for geometry columns. This class is left empty since CRS
- * metadata was not implemented yet.
- */
-case class CRSMetaData()
+import org.json4s.JValue

/**
* A case class that holds the metadata of geometry column in GeoParquet metadata
@@ -31,7 +26,7 @@ case class GeometryFieldMetaData(
    encoding: String,
    geometryTypes: Seq[String],
    bbox: Seq[Double],
-    crs: Option[CRSMetaData] = None)
+    crs: Option[JValue] = None)

/**
* A case class that holds the metadata of GeoParquet file
@@ -45,10 +40,20 @@ case class GeoParquetMetaData(
    columns: Map[String, GeometryFieldMetaData])

object GeoParquetMetaData {
-  // We're conforming to version 1.0.0-beta.1 of the GeoParquet specification, please refer to
-  // https://github.com/opengeospatial/geoparquet/blob/v1.0.0-beta.1/format-specs/geoparquet.md
-  // for more details.
-  val VERSION = "1.0.0-beta.1"
+  // We're conforming to version 1.0.0 of the GeoParquet specification, please refer to
+  // https://geoparquet.org/releases/v1.0.0/ for more details.
+  val VERSION = "1.0.0"
+
+  /**
+   * Configuration key for overriding the version field in GeoParquet file metadata.
+   */
+  val GEOPARQUET_VERSION_KEY = "geoparquet.version"
+
+  /**
+   * Configuration key for setting the CRS of the geometries in GeoParquet column metadata. This is applied to
+   * all geometry columns in the file.
+   */
+  val GEOPARQUET_CRS_KEY = "geoparquet.crs"

  def parseKeyValueMetaData(keyValueMetaData: java.util.Map[String, String]): Option[GeoParquetMetaData] = {
    Option(keyValueMetaData.get("geo")).map { geo =>
@@ -30,14 +30,15 @@ import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION}
import org.apache.spark.sql.execution.datasources.parquet.GeoParquetWriteSupport.GeometryColumnInfo
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types._
-import org.json4s.DefaultFormats
-import org.json4s.Extraction
+import org.json4s.{DefaultFormats, Extraction, JValue}
 import org.json4s.jackson.compactJson
+import org.json4s.jackson.JsonMethods.parse
import org.locationtech.jts.geom.Geometry
import org.locationtech.jts.io.WKBWriter

@@ -106,6 +107,10 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
   // fields in nested structures.
   private val geometryColumnInfoMap: mutable.Map[Int, GeometryColumnInfo] = mutable.Map.empty
 
+  private var geoParquetVersion: Option[String] = None
+  private var defaultGeoParquetCrs: Option[JValue] = None
+  private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] = mutable.Map.empty
+
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
     this.schema = StructType.fromString(schemaString)
@@ -129,6 +134,25 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
       throw new RuntimeException("No geometry column found in the schema")
     }
 
+    geoParquetVersion = configuration.get(GEOPARQUET_VERSION_KEY) match {
+      case null => Some(VERSION)
+      case version: String => Some(version)
+    }
+    defaultGeoParquetCrs = configuration.get(GEOPARQUET_CRS_KEY) match {
+      case null =>
+        // If no CRS is specified, we write null to the crs metadata field. This is for compatibility with
+        // geopandas 0.10.0 and earlier versions, which requires crs field to be present.
+        Some(org.json4s.JNull)
+      case "" => None
+      case crs: String => Some(parse(crs))
+    }
+    geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
+      Option(configuration.get(GEOPARQUET_CRS_KEY + "." + name)).foreach {
+        case "" => geoParquetColumnCrsMap.put(name, None)
+        case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs)))
+      }
+    }
+
     val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
     val sparkSqlParquetRowMetadata = GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
     val metadata = Map(
@@ -173,10 +197,11 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
       val bbox = if (geometryTypes.nonEmpty) {
         Seq(columnInfo.bbox.minX, columnInfo.bbox.minY, columnInfo.bbox.maxX, columnInfo.bbox.maxY)
       } else Seq(0.0, 0.0, 0.0, 0.0)
-      columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox)
+      val crs = geoParquetColumnCrsMap.getOrElse(columnName, defaultGeoParquetCrs)
+      columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
     }.toMap
-    val geoParquetMetadata = GeoParquetMetaData(Some(GeoParquetMetaData.VERSION), primaryColumn, columns)
-    implicit val formats: org.json4s.Formats = DefaultFormats.preservingEmptyValues
+    val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns)
+    implicit val formats: org.json4s.Formats = DefaultFormats
     val geoParquetMetadataJson = compactJson(Extraction.decompose(geoParquetMetadata).underscoreKeys)
     metadata.put("geo", geoParquetMetadataJson)
   }
