
[SEDONA-429][SEDONA-430] Support specifying GeoParquet spec version number and CRS #1162

Merged
merged 10 commits into from
Jan 2, 2024
31 changes: 31 additions & 0 deletions docs/tutorial/sql.md
@@ -656,6 +656,37 @@ Since v`1.3.0`, Sedona natively supports writing GeoParquet files. GeoParquet can
df.write.format("geoparquet").save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
```

Since v`1.5.1`, Sedona supports writing GeoParquet files with a custom GeoParquet spec version and CRS.
The default GeoParquet spec version is `1.0.0` and the default CRS is `null`. You can specify the spec version and CRS as follows:

```scala
val projjson = "{...}" // PROJJSON string for all geometry columns
df.write.format("geoparquet")
.option("geoparquet.version", "1.0.0")
.option("geoparquet.crs", projjson)
.save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
```

If multiple geometry columns are written to the GeoParquet file, you can specify a CRS for each of them.
For example, if `g0` and `g1` are two geometry columns in the DataFrame `df`, you can configure their CRS as follows:

```scala
val projjson_g0 = "{...}" // PROJJSON string for g0
val projjson_g1 = "{...}" // PROJJSON string for g1
df.write.format("geoparquet")
.option("geoparquet.version", "1.0.0")
.option("geoparquet.crs.g0", projjson_g0)
.option("geoparquet.crs.g1", projjson_g1)
.save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
```
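The `geoparquet.crs` and `geoparquet.crs.<column_name>` options also accept an empty string, which omits the `crs` field from the column metadata entirely; leaving the option unset writes `crs: null` instead (the default, kept for compatibility with geopandas 0.10.0 and earlier). A minimal sketch:

```scala
// Setting geoparquet.crs to "" omits the crs field from the GeoParquet
// column metadata altogether. Omitting the option writes crs: null instead.
df.write.format("geoparquet")
  .option("geoparquet.crs", "")
  .save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
```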

You can find the PROJJSON string of a specific CRS at https://epsg.io/ (click the JSON option at the bottom of the page). You can also customize the PROJJSON string as needed.

Please note that Sedona currently cannot convert a PROJJSON string to or from a CRS. Its GeoParquet reader ignores the PROJJSON metadata, so you have to set the CRS via [`ST_SetSRID`](../api/sql/Function.md#st_setsrid) after reading the file.
Its GeoParquet writer does not use the SRID field of a geometry, so you always have to set the `geoparquet.crs` option manually when writing the file if you want a meaningful CRS field.

For the same reason, the Sedona GeoParquet reader and writer do NOT check the axis order (lon/lat or lat/lon) and assume users handle it themselves when writing and reading files. You can always use [`ST_FlipCoordinates`](../api/sql/Function.md#st_flipcoordinates) to swap the axis order of your geometries.
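As a sketch of this workaround (assuming a Spark session named `sedona`, the output path from the examples above, and EPSG:4326 purely as an illustrative CRS choice):

```scala
import org.apache.spark.sql.functions.expr

// Read the file back; the reader ignores the projjson metadata, so the
// geometries come back without a meaningful SRID.
val df = sedona.read.format("geoparquet")
  .load(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")

// Re-attach the CRS manually (EPSG:4326 here is only an example) and,
// if the file was written in lat/lon axis order, swap it back to lon/lat.
val restored = df
  .withColumn("geometry", expr("ST_SetSRID(geometry, 4326)"))
  .withColumn("geometry", expr("ST_FlipCoordinates(geometry)"))
```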

## Sort then Save GeoParquet

To maximize the performance of Sedona GeoParquet filter pushdown, we suggest that you sort the data by their geohash values (see [ST_GeoHash](../../api/sql/Function/#st_geohash)) and then save as a GeoParquet file. An example is as follows:
@@ -14,12 +14,7 @@
package org.apache.spark.sql.execution.datasources.parquet
package org.apache.spark.sql.execution.datasources.parquet

@jiayuasu (Member) commented on Dec 27, 2023:
Can we add a new data source to only read the metadata of a Parquet file? This is crucial for entry-level users exploring an unknown Parquet file, including GeoParquet. In our GeoParquet case, this will help users know the projjson value, since we are not able to properly parse it into a known EPSG code.

I understand that a Spark DataFrame only allows the schema to be the metadata which cannot be used to hold such information.

So I suggest that we add a new data source namely geoparquet.metadata, which loads these metadata using ParquetFileReader. One good example is from DuckDB: https://duckdb.org/docs/data/parquet/metadata.html

This can be addressed in a separate PR.
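A minimal sketch of what such a `geoparquet.metadata` source could do under the hood: open the Parquet footer with `ParquetFileReader` (from parquet-hadoop) and pull out the key-value metadata, including the `"geo"` entry that carries the GeoParquet JSON with the projjson CRS. The data source wiring itself is omitted; this only shows the footer read.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Open the footer of a (Geo)Parquet file and return the "geo" key-value
// metadata entry, if present. Non-GeoParquet files simply return None.
def readGeoMetadata(path: String): Option[String] = {
  val inputFile = HadoopInputFile.fromPath(new Path(path), new Configuration())
  val reader = ParquetFileReader.open(inputFile)
  try {
    val keyValueMetaData = reader.getFooter.getFileMetaData.getKeyValueMetaData
    Option(keyValueMetaData.get("geo"))
  } finally reader.close()
}
```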

@jiayuasu (Member) commented on Dec 27, 2023:

Created a JIRA ticket for this: https://issues.apache.org/jira/browse/SEDONA-455

Let's address this in a separate PR.

import org.json4s.jackson.JsonMethods.parse

/**
* A case class that holds CRS metadata for geometry columns. This class is left empty since CRS
* metadata was not implemented yet.
*/
case class CRSMetaData()
import org.json4s.JValue

/**
* A case class that holds the metadata of geometry column in GeoParquet metadata
@@ -31,7 +26,7 @@ case class GeometryFieldMetaData(
encoding: String,
geometryTypes: Seq[String],
bbox: Seq[Double],
crs: Option[CRSMetaData] = None)
crs: Option[JValue] = None)

/**
* A case class that holds the metadata of GeoParquet file
@@ -45,10 +40,20 @@ case class GeoParquetMetaData(
columns: Map[String, GeometryFieldMetaData])

object GeoParquetMetaData {
// We're conforming to version 1.0.0-beta.1 of the GeoParquet specification, please refer to
// https://github.com/opengeospatial/geoparquet/blob/v1.0.0-beta.1/format-specs/geoparquet.md
// for more details.
val VERSION = "1.0.0-beta.1"
// We're conforming to version 1.0.0 of the GeoParquet specification, please refer to
// https://geoparquet.org/releases/v1.0.0/ for more details.
val VERSION = "1.0.0"

/**
* Configuration key for overriding the version field in GeoParquet file metadata.
*/
val GEOPARQUET_VERSION_KEY = "geoparquet.version"

/**
* Configuration key for setting the CRS of the geometries in GeoParquet column metadata. This is applied to
* all geometry columns in the file, unless overridden per column via `geoparquet.crs.<column_name>`.
*/
val GEOPARQUET_CRS_KEY = "geoparquet.crs"

def parseKeyValueMetaData(keyValueMetaData: java.util.Map[String, String]): Option[GeoParquetMetaData] = {
Option(keyValueMetaData.get("geo")).map { geo =>
@@ -30,14 +30,15 @@ import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION}
import org.apache.spark.sql.execution.datasources.parquet.GeoParquetWriteSupport.GeometryColumnInfo
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
import org.apache.spark.sql.types._
import org.json4s.DefaultFormats
import org.json4s.Extraction
import org.json4s.{DefaultFormats, Extraction, JValue}
import org.json4s.jackson.compactJson
import org.json4s.jackson.JsonMethods.parse
import org.locationtech.jts.geom.Geometry
import org.locationtech.jts.io.WKBWriter

@@ -106,6 +107,10 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
// fields in nested structures.
private val geometryColumnInfoMap: mutable.Map[Int, GeometryColumnInfo] = mutable.Map.empty

private var geoParquetVersion: Option[String] = None
private var defaultGeoParquetCrs: Option[JValue] = None
private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] = mutable.Map.empty

override def init(configuration: Configuration): WriteContext = {
val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
this.schema = StructType.fromString(schemaString)
@@ -129,6 +134,25 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
throw new RuntimeException("No geometry column found in the schema")
}

geoParquetVersion = configuration.get(GEOPARQUET_VERSION_KEY) match {
case null => Some(VERSION)
case version: String => Some(version)
}
defaultGeoParquetCrs = configuration.get(GEOPARQUET_CRS_KEY) match {
case null =>
// If no CRS is specified, we write null to the crs metadata field. This is for compatibility with
// geopandas 0.10.0 and earlier versions, which require the crs field to be present.
Some(org.json4s.JNull)
case "" => None
case crs: String => Some(parse(crs))
}
geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
Option(configuration.get(GEOPARQUET_CRS_KEY + "." + name)).foreach {
case "" => geoParquetColumnCrsMap.put(name, None)
case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs)))
}
}

val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
val sparkSqlParquetRowMetadata = GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema)
val metadata = Map(
@@ -173,10 +197,11 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
val bbox = if (geometryTypes.nonEmpty) {
Seq(columnInfo.bbox.minX, columnInfo.bbox.minY, columnInfo.bbox.maxX, columnInfo.bbox.maxY)
} else Seq(0.0, 0.0, 0.0, 0.0)
columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox)
val crs = geoParquetColumnCrsMap.getOrElse(columnName, defaultGeoParquetCrs)
columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
}.toMap
val geoParquetMetadata = GeoParquetMetaData(Some(GeoParquetMetaData.VERSION), primaryColumn, columns)
implicit val formats: org.json4s.Formats = DefaultFormats.preservingEmptyValues
val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns)
implicit val formats: org.json4s.Formats = DefaultFormats
val geoParquetMetadataJson = compactJson(Extraction.decompose(geoParquetMetadata).underscoreKeys)
metadata.put("geo", geoParquetMetadataJson)
}