diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index 80ea7334a4..fe952362bf 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -653,6 +653,41 @@ Since v`1.3.0`, Sedona natively supports writing GeoParquet file. GeoParquet can
 df.write.format("geoparquet").save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
 ```
+Since v`1.5.1`, Sedona supports writing GeoParquet files with a custom GeoParquet spec version and CRS.
+The default GeoParquet spec version is `1.0.0` and the default CRS is `null`. You can specify the spec version and CRS as follows:
+
+```scala
+val projjson = "{...}" // PROJJSON string for all geometry columns
+df.write.format("geoparquet")
+  .option("geoparquet.version", "1.0.0")
+  .option("geoparquet.crs", projjson)
+  .save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
+```
+
+If multiple geometry columns are written to the GeoParquet file, you can specify the CRS for each column separately.
+For example, if `g0` and `g1` are two geometry columns in the DataFrame `df`, you can specify their CRSs as follows:
+
+```scala
+val projjson_g0 = "{...}" // PROJJSON string for g0
+val projjson_g1 = "{...}" // PROJJSON string for g1
+df.write.format("geoparquet")
+  .option("geoparquet.version", "1.0.0")
+  .option("geoparquet.crs.g0", projjson_g0)
+  .option("geoparquet.crs.g1", projjson_g1)
+  .save(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
+```
+
+The value of `geoparquet.crs` and `geoparquet.crs.<column_name>` can be one of the following:
+
+* `"null"`: Explicitly set the `crs` field to `null`. This is the default behavior.
+* `""` (empty string): Omit the `crs` field. This implies that the CRS is [OGC:CRS84](https://www.opengis.net/def/crs/OGC/1.3/CRS84) for CRS-aware implementations.
+* `"{...}"` (PROJJSON string): Set the `crs` field to the PROJJSON object representing the Coordinate Reference System (CRS) of the geometry. You can find the PROJJSON string of a specific CRS at https://epsg.io/ (click the JSON option at the bottom of the page), or customize the PROJJSON string as needed.
+
+Please note that Sedona currently cannot derive a PROJJSON string from the CRS of a geometry, or vice versa. The GeoParquet reader ignores the PROJJSON metadata, so you have to set the CRS via [`ST_SetSRID`](../api/sql/Function.md#st_setsrid) after reading the file.
+The GeoParquet writer does not use the SRID field of a geometry, so you always have to set the `geoparquet.crs` option manually when writing the file if you want to write a meaningful CRS field.
+
+For the same reason, the Sedona GeoParquet reader and writer do NOT check the axis order (lon/lat or lat/lon) and assume it is handled by the user when writing and reading the files. You can always use [`ST_FlipCoordinates`](../api/sql/Function.md#st_flipcoordinates) to swap the axis order of your geometries.
+
 ## Sort then Save GeoParquet
 
 To maximize the performance of Sedona GeoParquet filter pushdown, we suggest that you sort the data by their geohash values (see [ST_GeoHash](../../api/sql/Function/#st_geohash)) and then save as a GeoParquet file.
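As a complement to the CRS notes above, here is a minimal sketch of reading a GeoParquet file back and re-attaching a CRS. It assumes a Spark session named `sparkSession` with Sedona registered, a geometry column named `geometry`, and data that is actually in EPSG:4326; adjust these assumptions to your data.

```scala
import org.apache.spark.sql.functions.expr

// The GeoParquet reader ignores PROJJSON metadata, so re-attach the SRID manually after reading.
val gdf = sparkSession.read.format("geoparquet")
  .load(geoparquetoutputlocation + "/GeoParquet_File_Name.parquet")
  .withColumn("geometry", expr("ST_SetSRID(geometry, 4326)")) // 4326 is an assumption about the data

// If the file was written with lat/lon axis order, swap it back to lon/lat as needed:
// gdf.withColumn("geometry", expr("ST_FlipCoordinates(geometry)"))
```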
An example is as follows: diff --git a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala index a3ea9ec42c..a3f23b9c4b 100644 --- a/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala +++ b/spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetMetaData.scala @@ -14,12 +14,7 @@ package org.apache.spark.sql.execution.datasources.parquet import org.json4s.jackson.JsonMethods.parse - -/** - * A case class that holds CRS metadata for geometry columns. This class is left empty since CRS - * metadata was not implemented yet. - */ -case class CRSMetaData() +import org.json4s.JValue /** * A case class that holds the metadata of geometry column in GeoParquet metadata @@ -31,7 +26,7 @@ case class GeometryFieldMetaData( encoding: String, geometryTypes: Seq[String], bbox: Seq[Double], - crs: Option[CRSMetaData] = None) + crs: Option[JValue] = None) /** * A case class that holds the metadata of GeoParquet file @@ -45,10 +40,20 @@ case class GeoParquetMetaData( columns: Map[String, GeometryFieldMetaData]) object GeoParquetMetaData { - // We're conforming to version 1.0.0-beta.1 of the GeoParquet specification, please refer to - // https://github.com/opengeospatial/geoparquet/blob/v1.0.0-beta.1/format-specs/geoparquet.md - // for more details. - val VERSION = "1.0.0-beta.1" + // We're conforming to version 1.0.0 of the GeoParquet specification, please refer to + // https://geoparquet.org/releases/v1.0.0/ for more details. + val VERSION = "1.0.0" + + /** + * Configuration key for overriding the version field in GeoParquet file metadata. + */ + val GEOPARQUET_VERSION_KEY = "geoparquet.version" + + /** + * Configuration key for setting the CRS of the geometries in GeoParquet column metadata. This is applied to + * all geometry columns in the file. 
+ */ + val GEOPARQUET_CRS_KEY = "geoparquet.crs" def parseKeyValueMetaData(keyValueMetaData: java.util.Map[String, String]): Option[GeoParquetMetaData] = { Option(keyValueMetaData.get("geo")).map { geo => diff --git a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala index 3ab3fd1c83..2577937a5b 100644 --- a/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala +++ b/spark/spark-3.0/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala @@ -30,14 +30,15 @@ import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION} import org.apache.spark.sql.execution.datasources.parquet.GeoParquetWriteSupport.GeometryColumnInfo import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT import org.apache.spark.sql.types._ -import org.json4s.DefaultFormats -import org.json4s.Extraction +import org.json4s.{DefaultFormats, Extraction, JValue} import org.json4s.jackson.compactJson +import org.json4s.jackson.JsonMethods.parse import org.locationtech.jts.geom.Geometry import org.locationtech.jts.io.WKBWriter @@ -106,6 +107,10 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { // fields in nested structures. private val geometryColumnInfoMap: mutable.Map[Int, GeometryColumnInfo] = mutable.Map.empty + private var geoParquetVersion: Option[String] = None + private var defaultGeoParquetCrs: Option[JValue] = None + private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] = mutable.Map.empty + override def init(configuration: Configuration): WriteContext = { val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA) this.schema = StructType.fromString(schemaString) @@ -129,6 +134,25 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { throw new RuntimeException("No geometry column found in the schema") } + geoParquetVersion = configuration.get(GEOPARQUET_VERSION_KEY) match { + case null => Some(VERSION) + case version: String => Some(version) + } + defaultGeoParquetCrs = configuration.get(GEOPARQUET_CRS_KEY) match { + case null => + // If no CRS is specified, we write null to the crs metadata field. This is for compatibility with + // geopandas 0.10.0 and earlier versions, which requires crs field to be present. + Some(org.json4s.JNull) + case "" => None + case crs: String => Some(parse(crs)) + } + geometryColumnInfoMap.keys.map(schema(_).name).foreach { name => + Option(configuration.get(GEOPARQUET_CRS_KEY + "." 
+ name)).foreach { + case "" => geoParquetColumnCrsMap.put(name, None) + case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs))) + } + } + val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema) val sparkSqlParquetRowMetadata = GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema) val metadata = Map( @@ -173,10 +197,11 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { val bbox = if (geometryTypes.nonEmpty) { Seq(columnInfo.bbox.minX, columnInfo.bbox.minY, columnInfo.bbox.maxX, columnInfo.bbox.maxY) } else Seq(0.0, 0.0, 0.0, 0.0) - columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox) + val crs = geoParquetColumnCrsMap.getOrElse(columnName, defaultGeoParquetCrs) + columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs) }.toMap - val geoParquetMetadata = GeoParquetMetaData(Some(GeoParquetMetaData.VERSION), primaryColumn, columns) - implicit val formats: org.json4s.Formats = DefaultFormats.preservingEmptyValues + val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns) + implicit val formats: org.json4s.Formats = DefaultFormats val geoParquetMetadataJson = compactJson(Extraction.decompose(geoParquetMetadata).underscoreKeys) metadata.put("geo", geoParquetMetadataJson) } diff --git a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala index 64d35a6cb4..7b209e5429 100644 --- a/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala +++ b/spark/spark-3.0/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala @@ -27,6 +27,7 @@ import org.apache.spark.SparkException import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.Row import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport import org.apache.spark.sql.functions.{col, expr} import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT @@ -142,18 +143,18 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { df.write.format("geoparquet").save(geoParquetSavePath) // Find parquet files in geoParquetSavePath directory and validate their metadata - val parquetFiles = new File(geoParquetSavePath).listFiles().filter(_.getName.endsWith(".parquet")) - parquetFiles.foreach { filePath => - val metadata = ParquetFileReader.open( - HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration())) - .getFooter.getFileMetaData.getKeyValueMetaData - assert(metadata.containsKey("geo")) - val geo = parseJson(metadata.get("geo")) + validateGeoParquetMetadata(geoParquetSavePath) { geo => implicit val formats : org.json4s.Formats = org.json4s.DefaultFormats + val version = (geo \ "version").extract[String] + assert(version == GeoParquetMetaData.VERSION) val g0Types = (geo \ "columns" \ "g0" \ "geometry_types").extract[Seq[String]] val g1Types = (geo \ "columns" \ "g1" \ "geometry_types").extract[Seq[String]] assert(g0Types.sorted == Seq("Point", "Point Z", "MultiPoint").sorted) assert(g1Types.sorted == Seq("Polygon", "Polygon Z", "MultiLineString").sorted) + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == org.json4s.JNull) + assert(g1Crs == org.json4s.JNull) } // Read GeoParquet with multiple geometry columns @@ -178,13 +179,7 @@ class 
geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { assert(df2.schema.fields(1).dataType.isInstanceOf[GeometryUDT]) assert(0 == df2.count()) - val parquetFiles = new File(geoParquetSavePath).listFiles().filter(_.getName.endsWith(".parquet")) - parquetFiles.foreach { filePath => - val metadata = ParquetFileReader.open( - HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration())) - .getFooter.getFileMetaData.getKeyValueMetaData - assert(metadata.containsKey("geo")) - val geo = parseJson(metadata.get("geo")) + validateGeoParquetMetadata(geoParquetSavePath) { geo => implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats val g0Types = (geo \ "columns" \ "g" \ "geometry_types").extract[Seq[String]] val g0BBox = (geo \ "columns" \ "g" \ "bbox").extract[Seq[Double]] @@ -193,6 +188,272 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { } } + it("GeoParquet save should write user specified version and crs to geo metadata") { + val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation4) + // This CRS is taken from https://proj.org/en/9.3/specifications/projjson.html#geographiccrs + // with slight modification. + val projjson = + """ + |{ + | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json", + | "type": "GeographicCRS", + | "name": "NAD83(2011)", + | "datum": { + | "type": "GeodeticReferenceFrame", + | "name": "NAD83 (National Spatial Reference System 2011)", + | "ellipsoid": { + | "name": "GRS 1980", + | "semi_major_axis": 6378137, + | "inverse_flattening": 298.257222101 + | } + | }, + | "coordinate_system": { + | "subtype": "ellipsoidal", + | "axis": [ + | { + | "name": "Geodetic latitude", + | "abbreviation": "Lat", + | "direction": "north", + | "unit": "degree" + | }, + | { + | "name": "Geodetic longitude", + | "abbreviation": "Lon", + | "direction": "east", + | "unit": "degree" + | } + | ] + | }, + | "scope": "Horizontal component of 3D system.", + | "area": "Puerto Rico - onshore and offshore. 
United States (USA) onshore and offshore.", + | "bbox": { + | "south_latitude": 14.92, + | "west_longitude": 167.65, + | "north_latitude": 74.71, + | "east_longitude": -63.88 + | }, + | "id": { + | "authority": "EPSG", + | "code": 6318 + | } + |} + |""".stripMargin + var geoParquetSavePath = geoparquetoutputlocation + "/gp_custom_meta.parquet" + df.write.format("geoparquet") + .option("geoparquet.version", "10.9.8") + .option("geoparquet.crs", projjson) + .mode("overwrite").save(geoParquetSavePath) + val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) + assert(df2.count() == df.count()) + + validateGeoParquetMetadata(geoParquetSavePath) { geo => + implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats + val version = (geo \ "version").extract[String] + val columnName = (geo \ "primary_column").extract[String] + assert(version == "10.9.8") + val crs = geo \ "columns" \ columnName \ "crs" + assert(crs.isInstanceOf[org.json4s.JObject]) + assert(crs == parseJson(projjson)) + } + + // Setting crs to null explicitly + geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_null.parquet" + df.write.format("geoparquet") + .option("geoparquet.crs", "null") + .mode("overwrite").save(geoParquetSavePath) + val df3 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) + assert(df3.count() == df.count()) + + validateGeoParquetMetadata(geoParquetSavePath) { geo => + implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats + val columnName = (geo \ "primary_column").extract[String] + val crs = geo \ "columns" \ columnName \ "crs" + assert(crs == org.json4s.JNull) + } + + // Setting crs to "" to omit crs + geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_omit.parquet" + df.write.format("geoparquet") + .option("geoparquet.crs", "") + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats + val columnName = (geo \ "primary_column").extract[String] + val crs = geo \ "columns" \ columnName \ "crs" + assert(crs == org.json4s.JNothing) + } + } + + it("GeoParquet save should support specifying per-column CRS") { + val wktReader = new WKTReader() + val testData = Seq( + Row(1, wktReader.read("POINT (1 2)"), wktReader.read("POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))")) + ) + val schema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("g0", GeometryUDT, nullable = false), + StructField("g1", GeometryUDT, nullable = false) + )) + val df = sparkSession.createDataFrame(testData.asJava, schema).repartition(1) + + val projjson0 = + """ + |{ + | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json", + | "type": "GeographicCRS", + | "name": "NAD83(2011)", + | "datum": { + | "type": "GeodeticReferenceFrame", + | "name": "NAD83 (National Spatial Reference System 2011)", + | "ellipsoid": { + | "name": "GRS 1980", + | "semi_major_axis": 6378137, + | "inverse_flattening": 298.257222101 + | } + | }, + | "coordinate_system": { + | "subtype": "ellipsoidal", + | "axis": [ + | { + | "name": "Geodetic latitude", + | "abbreviation": "Lat", + | "direction": "north", + | "unit": "degree" + | }, + | { + | "name": "Geodetic longitude", + | "abbreviation": "Lon", + | "direction": "east", + | "unit": "degree" + | } + | ] + | }, + | "scope": "Horizontal component of 3D system.", + | "area": "Puerto Rico - onshore and offshore. 
United States (USA) onshore and offshore.", + | "bbox": { + | "south_latitude": 14.92, + | "west_longitude": 167.65, + | "north_latitude": 74.71, + | "east_longitude": -63.88 + | }, + | "id": { + | "authority": "EPSG", + | "code": 6318 + | } + |} + |""".stripMargin + + val projjson1 = + """ + |{ + | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json", + | "type": "GeographicCRS", + | "name": "Monte Mario (Rome)", + | "datum": { + | "type": "GeodeticReferenceFrame", + | "name": "Monte Mario (Rome)", + | "ellipsoid": { + | "name": "International 1924", + | "semi_major_axis": 6378388, + | "inverse_flattening": 297 + | }, + | "prime_meridian": { + | "name": "Rome", + | "longitude": 12.4523333333333 + | } + | }, + | "coordinate_system": { + | "subtype": "ellipsoidal", + | "axis": [ + | { + | "name": "Geodetic latitude", + | "abbreviation": "Lat", + | "direction": "north", + | "unit": "degree" + | }, + | { + | "name": "Geodetic longitude", + | "abbreviation": "Lon", + | "direction": "east", + | "unit": "degree" + | } + | ] + | }, + | "scope": "Geodesy, onshore minerals management.", + | "area": "Italy - onshore and offshore; San Marino, Vatican City State.", + | "bbox": { + | "south_latitude": 34.76, + | "west_longitude": 5.93, + | "north_latitude": 47.1, + | "east_longitude": 18.99 + | }, + | "id": { + | "authority": "EPSG", + | "code": 4806 + | } + |} + |""".stripMargin + + val geoParquetSavePath = geoparquetoutputlocation + "/multi_geoms_with_custom_crs.parquet" + df.write.format("geoparquet") + .option("geoparquet.crs", projjson0) + .option("geoparquet.crs.g1", projjson1) + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == parseJson(projjson0)) + assert(g1Crs == parseJson(projjson1)) + } + + // Write without fallback CRS for g0 + df.write.format("geoparquet") + .option("geoparquet.crs.g1", projjson1) + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == org.json4s.JNull) + assert(g1Crs == parseJson(projjson1)) + } + + // Fallback CRS is omitting CRS + df.write.format("geoparquet") + .option("geoparquet.crs", "") + .option("geoparquet.crs.g1", projjson1) + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == org.json4s.JNothing) + assert(g1Crs == parseJson(projjson1)) + } + + // Write with CRS, explicitly set CRS to null for g1 + df.write.format("geoparquet") + .option("geoparquet.crs", projjson0) + .option("geoparquet.crs.g1", "null") + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == parseJson(projjson0)) + assert(g1Crs == org.json4s.JNull) + } + + // Write with CRS, explicitly omit CRS for g1 + df.write.format("geoparquet") + .option("geoparquet.crs", projjson0) + .option("geoparquet.crs.g1", "") + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == parseJson(projjson0)) + assert(g1Crs == 
org.json4s.JNothing) + } + } + it("GeoParquet load should raise exception when loading plain parquet files") { val e = intercept[SparkException] { sparkSession.read.format("geoparquet").load(resourceFolder + "geoparquet/plain.parquet") @@ -250,4 +511,16 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { } } } + + def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => Unit): Unit = { + val parquetFiles = new File(path).listFiles().filter(_.getName.endsWith(".parquet")) + parquetFiles.foreach { filePath => + val metadata = ParquetFileReader.open( + HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration())) + .getFooter.getFileMetaData.getKeyValueMetaData + assert(metadata.containsKey("geo")) + val geo = parseJson(metadata.get("geo")) + body(geo) + } + } } diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala index 186ebe09b5..984edb5c0a 100644 --- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala +++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala @@ -30,14 +30,15 @@ import org.apache.spark.sql.SPARK_VERSION_METADATA_KEY import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION} import org.apache.spark.sql.execution.datasources.parquet.GeoParquetWriteSupport.GeometryColumnInfo import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT import org.apache.spark.sql.types._ -import org.json4s.DefaultFormats -import org.json4s.Extraction +import org.json4s.{DefaultFormats, Extraction, JValue} import org.json4s.jackson.compactJson +import org.json4s.jackson.JsonMethods.parse import org.locationtech.jts.geom.Geometry import org.locationtech.jts.io.WKBWriter @@ -106,6 +107,10 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { // fields in nested structures. private val geometryColumnInfoMap: mutable.Map[Int, GeometryColumnInfo] = mutable.Map.empty + private var geoParquetVersion: Option[String] = None + private var defaultGeoParquetCrs: Option[JValue] = None + private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] = mutable.Map.empty + override def init(configuration: Configuration): WriteContext = { val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA) this.schema = StructType.fromString(schemaString) @@ -129,6 +134,25 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { throw new RuntimeException("No geometry column found in the schema") } + geoParquetVersion = configuration.get(GEOPARQUET_VERSION_KEY) match { + case null => Some(VERSION) + case version: String => Some(version) + } + defaultGeoParquetCrs = configuration.get(GEOPARQUET_CRS_KEY) match { + case null => + // If no CRS is specified, we write null to the crs metadata field. This is for compatibility with + // geopandas 0.10.0 and earlier versions, which requires crs field to be present. 
+ Some(org.json4s.JNull) + case "" => None + case crs: String => Some(parse(crs)) + } + geometryColumnInfoMap.keys.map(schema(_).name).foreach { name => + Option(configuration.get(GEOPARQUET_CRS_KEY + "." + name)).foreach { + case "" => geoParquetColumnCrsMap.put(name, None) + case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs))) + } + } + val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema) val sparkSqlParquetRowMetadata = GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema) val metadata = Map( @@ -173,10 +197,11 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { val bbox = if (geometryTypes.nonEmpty) { Seq(columnInfo.bbox.minX, columnInfo.bbox.minY, columnInfo.bbox.maxX, columnInfo.bbox.maxY) } else Seq(0.0, 0.0, 0.0, 0.0) - columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox) + val crs = geoParquetColumnCrsMap.getOrElse(columnName, defaultGeoParquetCrs) + columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs) }.toMap - val geoParquetMetadata = GeoParquetMetaData(Some(GeoParquetMetaData.VERSION), primaryColumn, columns) - implicit val formats: org.json4s.Formats = DefaultFormats.preservingEmptyValues + val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns) + implicit val formats: org.json4s.Formats = DefaultFormats val geoParquetMetadataJson = compactJson(Extraction.decompose(geoParquetMetadata).underscoreKeys) metadata.put("geo", geoParquetMetadataJson) } diff --git a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala index 6a9db1f1fd..7b209e5429 100644 --- a/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala +++ b/spark/spark-3.4/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala @@ -27,8 +27,9 @@ import org.apache.spark.SparkException import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.Row import org.apache.spark.sql.SaveMode -import org.apache.spark.sql.functions.{col, expr} +import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport +import org.apache.spark.sql.functions.{col, expr} import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT import org.apache.spark.sql.sedona_sql.expressions.st_constructors.{ST_Point, ST_PolygonFromEnvelope} import org.apache.spark.sql.sedona_sql.expressions.st_predicates.ST_Intersects @@ -142,18 +143,18 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { df.write.format("geoparquet").save(geoParquetSavePath) // Find parquet files in geoParquetSavePath directory and validate their metadata - val parquetFiles = new File(geoParquetSavePath).listFiles().filter(_.getName.endsWith(".parquet")) - parquetFiles.foreach { filePath => - val metadata = ParquetFileReader.open( - HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration())) - .getFooter.getFileMetaData.getKeyValueMetaData - assert(metadata.containsKey("geo")) - val geo = parseJson(metadata.get("geo")) + validateGeoParquetMetadata(geoParquetSavePath) { geo => implicit val formats : org.json4s.Formats = org.json4s.DefaultFormats + val version = (geo \ "version").extract[String] + assert(version == GeoParquetMetaData.VERSION) val g0Types = (geo \ "columns" \ "g0" \ "geometry_types").extract[Seq[String]] val g1Types = (geo \ "columns" \ "g1" \ 
"geometry_types").extract[Seq[String]] assert(g0Types.sorted == Seq("Point", "Point Z", "MultiPoint").sorted) assert(g1Types.sorted == Seq("Polygon", "Polygon Z", "MultiLineString").sorted) + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == org.json4s.JNull) + assert(g1Crs == org.json4s.JNull) } // Read GeoParquet with multiple geometry columns @@ -178,13 +179,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { assert(df2.schema.fields(1).dataType.isInstanceOf[GeometryUDT]) assert(0 == df2.count()) - val parquetFiles = new File(geoParquetSavePath).listFiles().filter(_.getName.endsWith(".parquet")) - parquetFiles.foreach { filePath => - val metadata = ParquetFileReader.open( - HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration())) - .getFooter.getFileMetaData.getKeyValueMetaData - assert(metadata.containsKey("geo")) - val geo = parseJson(metadata.get("geo")) + validateGeoParquetMetadata(geoParquetSavePath) { geo => implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats val g0Types = (geo \ "columns" \ "g" \ "geometry_types").extract[Seq[String]] val g0BBox = (geo \ "columns" \ "g" \ "bbox").extract[Seq[Double]] @@ -193,6 +188,272 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { } } + it("GeoParquet save should write user specified version and crs to geo metadata") { + val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation4) + // This CRS is taken from https://proj.org/en/9.3/specifications/projjson.html#geographiccrs + // with slight modification. + val projjson = + """ + |{ + | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json", + | "type": "GeographicCRS", + | "name": "NAD83(2011)", + | "datum": { + | "type": "GeodeticReferenceFrame", + | "name": "NAD83 (National Spatial Reference System 2011)", + | "ellipsoid": { + | "name": "GRS 1980", + | "semi_major_axis": 6378137, + | "inverse_flattening": 298.257222101 + | } + | }, + | "coordinate_system": { + | "subtype": "ellipsoidal", + | "axis": [ + | { + | "name": "Geodetic latitude", + | "abbreviation": "Lat", + | "direction": "north", + | "unit": "degree" + | }, + | { + | "name": "Geodetic longitude", + | "abbreviation": "Lon", + | "direction": "east", + | "unit": "degree" + | } + | ] + | }, + | "scope": "Horizontal component of 3D system.", + | "area": "Puerto Rico - onshore and offshore. 
United States (USA) onshore and offshore.", + | "bbox": { + | "south_latitude": 14.92, + | "west_longitude": 167.65, + | "north_latitude": 74.71, + | "east_longitude": -63.88 + | }, + | "id": { + | "authority": "EPSG", + | "code": 6318 + | } + |} + |""".stripMargin + var geoParquetSavePath = geoparquetoutputlocation + "/gp_custom_meta.parquet" + df.write.format("geoparquet") + .option("geoparquet.version", "10.9.8") + .option("geoparquet.crs", projjson) + .mode("overwrite").save(geoParquetSavePath) + val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) + assert(df2.count() == df.count()) + + validateGeoParquetMetadata(geoParquetSavePath) { geo => + implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats + val version = (geo \ "version").extract[String] + val columnName = (geo \ "primary_column").extract[String] + assert(version == "10.9.8") + val crs = geo \ "columns" \ columnName \ "crs" + assert(crs.isInstanceOf[org.json4s.JObject]) + assert(crs == parseJson(projjson)) + } + + // Setting crs to null explicitly + geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_null.parquet" + df.write.format("geoparquet") + .option("geoparquet.crs", "null") + .mode("overwrite").save(geoParquetSavePath) + val df3 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) + assert(df3.count() == df.count()) + + validateGeoParquetMetadata(geoParquetSavePath) { geo => + implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats + val columnName = (geo \ "primary_column").extract[String] + val crs = geo \ "columns" \ columnName \ "crs" + assert(crs == org.json4s.JNull) + } + + // Setting crs to "" to omit crs + geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_omit.parquet" + df.write.format("geoparquet") + .option("geoparquet.crs", "") + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats + val columnName = (geo \ "primary_column").extract[String] + val crs = geo \ "columns" \ columnName \ "crs" + assert(crs == org.json4s.JNothing) + } + } + + it("GeoParquet save should support specifying per-column CRS") { + val wktReader = new WKTReader() + val testData = Seq( + Row(1, wktReader.read("POINT (1 2)"), wktReader.read("POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))")) + ) + val schema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("g0", GeometryUDT, nullable = false), + StructField("g1", GeometryUDT, nullable = false) + )) + val df = sparkSession.createDataFrame(testData.asJava, schema).repartition(1) + + val projjson0 = + """ + |{ + | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json", + | "type": "GeographicCRS", + | "name": "NAD83(2011)", + | "datum": { + | "type": "GeodeticReferenceFrame", + | "name": "NAD83 (National Spatial Reference System 2011)", + | "ellipsoid": { + | "name": "GRS 1980", + | "semi_major_axis": 6378137, + | "inverse_flattening": 298.257222101 + | } + | }, + | "coordinate_system": { + | "subtype": "ellipsoidal", + | "axis": [ + | { + | "name": "Geodetic latitude", + | "abbreviation": "Lat", + | "direction": "north", + | "unit": "degree" + | }, + | { + | "name": "Geodetic longitude", + | "abbreviation": "Lon", + | "direction": "east", + | "unit": "degree" + | } + | ] + | }, + | "scope": "Horizontal component of 3D system.", + | "area": "Puerto Rico - onshore and offshore. 
United States (USA) onshore and offshore.", + | "bbox": { + | "south_latitude": 14.92, + | "west_longitude": 167.65, + | "north_latitude": 74.71, + | "east_longitude": -63.88 + | }, + | "id": { + | "authority": "EPSG", + | "code": 6318 + | } + |} + |""".stripMargin + + val projjson1 = + """ + |{ + | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json", + | "type": "GeographicCRS", + | "name": "Monte Mario (Rome)", + | "datum": { + | "type": "GeodeticReferenceFrame", + | "name": "Monte Mario (Rome)", + | "ellipsoid": { + | "name": "International 1924", + | "semi_major_axis": 6378388, + | "inverse_flattening": 297 + | }, + | "prime_meridian": { + | "name": "Rome", + | "longitude": 12.4523333333333 + | } + | }, + | "coordinate_system": { + | "subtype": "ellipsoidal", + | "axis": [ + | { + | "name": "Geodetic latitude", + | "abbreviation": "Lat", + | "direction": "north", + | "unit": "degree" + | }, + | { + | "name": "Geodetic longitude", + | "abbreviation": "Lon", + | "direction": "east", + | "unit": "degree" + | } + | ] + | }, + | "scope": "Geodesy, onshore minerals management.", + | "area": "Italy - onshore and offshore; San Marino, Vatican City State.", + | "bbox": { + | "south_latitude": 34.76, + | "west_longitude": 5.93, + | "north_latitude": 47.1, + | "east_longitude": 18.99 + | }, + | "id": { + | "authority": "EPSG", + | "code": 4806 + | } + |} + |""".stripMargin + + val geoParquetSavePath = geoparquetoutputlocation + "/multi_geoms_with_custom_crs.parquet" + df.write.format("geoparquet") + .option("geoparquet.crs", projjson0) + .option("geoparquet.crs.g1", projjson1) + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == parseJson(projjson0)) + assert(g1Crs == parseJson(projjson1)) + } + + // Write without fallback CRS for g0 + df.write.format("geoparquet") + .option("geoparquet.crs.g1", projjson1) + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == org.json4s.JNull) + assert(g1Crs == parseJson(projjson1)) + } + + // Fallback CRS is omitting CRS + df.write.format("geoparquet") + .option("geoparquet.crs", "") + .option("geoparquet.crs.g1", projjson1) + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == org.json4s.JNothing) + assert(g1Crs == parseJson(projjson1)) + } + + // Write with CRS, explicitly set CRS to null for g1 + df.write.format("geoparquet") + .option("geoparquet.crs", projjson0) + .option("geoparquet.crs.g1", "null") + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == parseJson(projjson0)) + assert(g1Crs == org.json4s.JNull) + } + + // Write with CRS, explicitly omit CRS for g1 + df.write.format("geoparquet") + .option("geoparquet.crs", projjson0) + .option("geoparquet.crs.g1", "") + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == parseJson(projjson0)) + assert(g1Crs == 
org.json4s.JNothing) + } + } + it("GeoParquet load should raise exception when loading plain parquet files") { val e = intercept[SparkException] { sparkSession.read.format("geoparquet").load(resourceFolder + "geoparquet/plain.parquet") @@ -250,4 +511,16 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { } } } + + def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => Unit): Unit = { + val parquetFiles = new File(path).listFiles().filter(_.getName.endsWith(".parquet")) + parquetFiles.foreach { filePath => + val metadata = ParquetFileReader.open( + HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration())) + .getFooter.getFileMetaData.getKeyValueMetaData + assert(metadata.containsKey("geo")) + val geo = parseJson(metadata.get("geo")) + body(geo) + } + } } diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala index 55fdcc275f..aa81f87255 100644 --- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala +++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/GeoParquetWriteSupport.scala @@ -31,13 +31,14 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.SpecializedGetters import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.DataSourceUtils +import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData.{GEOPARQUET_CRS_KEY, GEOPARQUET_VERSION_KEY, VERSION} import org.apache.spark.sql.execution.datasources.parquet.GeoParquetWriteSupport.GeometryColumnInfo import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf} import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT import org.apache.spark.sql.types._ -import org.json4s.DefaultFormats -import org.json4s.Extraction +import org.json4s.{DefaultFormats, Extraction, JValue} import org.json4s.jackson.compactJson +import org.json4s.jackson.JsonMethods.parse import org.locationtech.jts.geom.Geometry import org.locationtech.jts.io.WKBWriter @@ -106,6 +107,10 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { // fields in nested structures. private val geometryColumnInfoMap: mutable.Map[Int, GeometryColumnInfo] = mutable.Map.empty + private var geoParquetVersion: Option[String] = None + private var defaultGeoParquetCrs: Option[JValue] = None + private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] = mutable.Map.empty + override def init(configuration: Configuration): WriteContext = { val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA) this.schema = StructType.fromString(schemaString) @@ -129,6 +134,25 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { throw new RuntimeException("No geometry column found in the schema") } + geoParquetVersion = configuration.get(GEOPARQUET_VERSION_KEY) match { + case null => Some(VERSION) + case version: String => Some(version) + } + defaultGeoParquetCrs = configuration.get(GEOPARQUET_CRS_KEY) match { + case null => + // If no CRS is specified, we write null to the crs metadata field. This is for compatibility with + // geopandas 0.10.0 and earlier versions, which requires crs field to be present. 
+ Some(org.json4s.JNull) + case "" => None + case crs: String => Some(parse(crs)) + } + geometryColumnInfoMap.keys.map(schema(_).name).foreach { name => + Option(configuration.get(GEOPARQUET_CRS_KEY + "." + name)).foreach { + case "" => geoParquetColumnCrsMap.put(name, None) + case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs))) + } + } + val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema) val sparkSqlParquetRowMetadata = GeoParquetWriteSupport.getSparkSqlParquetRowMetadata(schema) val metadata = Map( @@ -173,10 +197,11 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging { val bbox = if (geometryTypes.nonEmpty) { Seq(columnInfo.bbox.minX, columnInfo.bbox.minY, columnInfo.bbox.maxX, columnInfo.bbox.maxY) } else Seq(0.0, 0.0, 0.0, 0.0) - columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox) + val crs = geoParquetColumnCrsMap.getOrElse(columnName, defaultGeoParquetCrs) + columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs) }.toMap - val geoParquetMetadata = GeoParquetMetaData(Some(GeoParquetMetaData.VERSION), primaryColumn, columns) - implicit val formats: org.json4s.Formats = DefaultFormats.preservingEmptyValues + val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns) + implicit val formats: org.json4s.Formats = DefaultFormats val geoParquetMetadataJson = compactJson(Extraction.decompose(geoParquetMetadata).underscoreKeys) metadata.put("geo", geoParquetMetadataJson) } diff --git a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala index 37cfae9866..7b209e5429 100644 --- a/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala +++ b/spark/spark-3.5/src/test/scala/org/apache/sedona/sql/geoparquetIOTests.scala @@ -27,6 +27,7 @@ import org.apache.spark.SparkException import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.Row import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.execution.datasources.parquet.GeoParquetMetaData import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport import org.apache.spark.sql.functions.{col, expr} import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT @@ -142,18 +143,18 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { df.write.format("geoparquet").save(geoParquetSavePath) // Find parquet files in geoParquetSavePath directory and validate their metadata - val parquetFiles = new File(geoParquetSavePath).listFiles().filter(_.getName.endsWith(".parquet")) - parquetFiles.foreach { filePath => - val metadata = ParquetFileReader.open( - HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration())) - .getFooter.getFileMetaData.getKeyValueMetaData - assert(metadata.containsKey("geo")) - val geo = parseJson(metadata.get("geo")) + validateGeoParquetMetadata(geoParquetSavePath) { geo => implicit val formats : org.json4s.Formats = org.json4s.DefaultFormats + val version = (geo \ "version").extract[String] + assert(version == GeoParquetMetaData.VERSION) val g0Types = (geo \ "columns" \ "g0" \ "geometry_types").extract[Seq[String]] val g1Types = (geo \ "columns" \ "g1" \ "geometry_types").extract[Seq[String]] assert(g0Types.sorted == Seq("Point", "Point Z", "MultiPoint").sorted) assert(g1Types.sorted == Seq("Polygon", "Polygon Z", "MultiLineString").sorted) + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val 
g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == org.json4s.JNull) + assert(g1Crs == org.json4s.JNull) } // Read GeoParquet with multiple geometry columns @@ -178,13 +179,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { assert(df2.schema.fields(1).dataType.isInstanceOf[GeometryUDT]) assert(0 == df2.count()) - val parquetFiles = new File(geoParquetSavePath).listFiles().filter(_.getName.endsWith(".parquet")) - parquetFiles.foreach { filePath => - val metadata = ParquetFileReader.open( - HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration())) - .getFooter.getFileMetaData.getKeyValueMetaData - assert(metadata.containsKey("geo")) - val geo = parseJson(metadata.get("geo")) + validateGeoParquetMetadata(geoParquetSavePath) { geo => implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats val g0Types = (geo \ "columns" \ "g" \ "geometry_types").extract[Seq[String]] val g0BBox = (geo \ "columns" \ "g" \ "bbox").extract[Seq[Double]] @@ -193,6 +188,272 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { } } + it("GeoParquet save should write user specified version and crs to geo metadata") { + val df = sparkSession.read.format("geoparquet").load(geoparquetdatalocation4) + // This CRS is taken from https://proj.org/en/9.3/specifications/projjson.html#geographiccrs + // with slight modification. + val projjson = + """ + |{ + | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json", + | "type": "GeographicCRS", + | "name": "NAD83(2011)", + | "datum": { + | "type": "GeodeticReferenceFrame", + | "name": "NAD83 (National Spatial Reference System 2011)", + | "ellipsoid": { + | "name": "GRS 1980", + | "semi_major_axis": 6378137, + | "inverse_flattening": 298.257222101 + | } + | }, + | "coordinate_system": { + | "subtype": "ellipsoidal", + | "axis": [ + | { + | "name": "Geodetic latitude", + | "abbreviation": "Lat", + | "direction": "north", + | "unit": "degree" + | }, + | { + | "name": "Geodetic longitude", + | "abbreviation": "Lon", + | "direction": "east", + | "unit": "degree" + | } + | ] + | }, + | "scope": "Horizontal component of 3D system.", + | "area": "Puerto Rico - onshore and offshore. 
United States (USA) onshore and offshore.", + | "bbox": { + | "south_latitude": 14.92, + | "west_longitude": 167.65, + | "north_latitude": 74.71, + | "east_longitude": -63.88 + | }, + | "id": { + | "authority": "EPSG", + | "code": 6318 + | } + |} + |""".stripMargin + var geoParquetSavePath = geoparquetoutputlocation + "/gp_custom_meta.parquet" + df.write.format("geoparquet") + .option("geoparquet.version", "10.9.8") + .option("geoparquet.crs", projjson) + .mode("overwrite").save(geoParquetSavePath) + val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) + assert(df2.count() == df.count()) + + validateGeoParquetMetadata(geoParquetSavePath) { geo => + implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats + val version = (geo \ "version").extract[String] + val columnName = (geo \ "primary_column").extract[String] + assert(version == "10.9.8") + val crs = geo \ "columns" \ columnName \ "crs" + assert(crs.isInstanceOf[org.json4s.JObject]) + assert(crs == parseJson(projjson)) + } + + // Setting crs to null explicitly + geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_null.parquet" + df.write.format("geoparquet") + .option("geoparquet.crs", "null") + .mode("overwrite").save(geoParquetSavePath) + val df3 = sparkSession.read.format("geoparquet").load(geoParquetSavePath) + assert(df3.count() == df.count()) + + validateGeoParquetMetadata(geoParquetSavePath) { geo => + implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats + val columnName = (geo \ "primary_column").extract[String] + val crs = geo \ "columns" \ columnName \ "crs" + assert(crs == org.json4s.JNull) + } + + // Setting crs to "" to omit crs + geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_omit.parquet" + df.write.format("geoparquet") + .option("geoparquet.crs", "") + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats + val columnName = (geo \ "primary_column").extract[String] + val crs = geo \ "columns" \ columnName \ "crs" + assert(crs == org.json4s.JNothing) + } + } + + it("GeoParquet save should support specifying per-column CRS") { + val wktReader = new WKTReader() + val testData = Seq( + Row(1, wktReader.read("POINT (1 2)"), wktReader.read("POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))")) + ) + val schema = StructType(Seq( + StructField("id", IntegerType, nullable = false), + StructField("g0", GeometryUDT, nullable = false), + StructField("g1", GeometryUDT, nullable = false) + )) + val df = sparkSession.createDataFrame(testData.asJava, schema).repartition(1) + + val projjson0 = + """ + |{ + | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json", + | "type": "GeographicCRS", + | "name": "NAD83(2011)", + | "datum": { + | "type": "GeodeticReferenceFrame", + | "name": "NAD83 (National Spatial Reference System 2011)", + | "ellipsoid": { + | "name": "GRS 1980", + | "semi_major_axis": 6378137, + | "inverse_flattening": 298.257222101 + | } + | }, + | "coordinate_system": { + | "subtype": "ellipsoidal", + | "axis": [ + | { + | "name": "Geodetic latitude", + | "abbreviation": "Lat", + | "direction": "north", + | "unit": "degree" + | }, + | { + | "name": "Geodetic longitude", + | "abbreviation": "Lon", + | "direction": "east", + | "unit": "degree" + | } + | ] + | }, + | "scope": "Horizontal component of 3D system.", + | "area": "Puerto Rico - onshore and offshore. 
United States (USA) onshore and offshore.", + | "bbox": { + | "south_latitude": 14.92, + | "west_longitude": 167.65, + | "north_latitude": 74.71, + | "east_longitude": -63.88 + | }, + | "id": { + | "authority": "EPSG", + | "code": 6318 + | } + |} + |""".stripMargin + + val projjson1 = + """ + |{ + | "$schema": "https://proj.org/schemas/v0.4/projjson.schema.json", + | "type": "GeographicCRS", + | "name": "Monte Mario (Rome)", + | "datum": { + | "type": "GeodeticReferenceFrame", + | "name": "Monte Mario (Rome)", + | "ellipsoid": { + | "name": "International 1924", + | "semi_major_axis": 6378388, + | "inverse_flattening": 297 + | }, + | "prime_meridian": { + | "name": "Rome", + | "longitude": 12.4523333333333 + | } + | }, + | "coordinate_system": { + | "subtype": "ellipsoidal", + | "axis": [ + | { + | "name": "Geodetic latitude", + | "abbreviation": "Lat", + | "direction": "north", + | "unit": "degree" + | }, + | { + | "name": "Geodetic longitude", + | "abbreviation": "Lon", + | "direction": "east", + | "unit": "degree" + | } + | ] + | }, + | "scope": "Geodesy, onshore minerals management.", + | "area": "Italy - onshore and offshore; San Marino, Vatican City State.", + | "bbox": { + | "south_latitude": 34.76, + | "west_longitude": 5.93, + | "north_latitude": 47.1, + | "east_longitude": 18.99 + | }, + | "id": { + | "authority": "EPSG", + | "code": 4806 + | } + |} + |""".stripMargin + + val geoParquetSavePath = geoparquetoutputlocation + "/multi_geoms_with_custom_crs.parquet" + df.write.format("geoparquet") + .option("geoparquet.crs", projjson0) + .option("geoparquet.crs.g1", projjson1) + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == parseJson(projjson0)) + assert(g1Crs == parseJson(projjson1)) + } + + // Write without fallback CRS for g0 + df.write.format("geoparquet") + .option("geoparquet.crs.g1", projjson1) + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == org.json4s.JNull) + assert(g1Crs == parseJson(projjson1)) + } + + // Fallback CRS is omitting CRS + df.write.format("geoparquet") + .option("geoparquet.crs", "") + .option("geoparquet.crs.g1", projjson1) + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == org.json4s.JNothing) + assert(g1Crs == parseJson(projjson1)) + } + + // Write with CRS, explicitly set CRS to null for g1 + df.write.format("geoparquet") + .option("geoparquet.crs", projjson0) + .option("geoparquet.crs.g1", "null") + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == parseJson(projjson0)) + assert(g1Crs == org.json4s.JNull) + } + + // Write with CRS, explicitly omit CRS for g1 + df.write.format("geoparquet") + .option("geoparquet.crs", projjson0) + .option("geoparquet.crs.g1", "") + .mode("overwrite").save(geoParquetSavePath) + validateGeoParquetMetadata(geoParquetSavePath) { geo => + val g0Crs = geo \ "columns" \ "g0" \ "crs" + val g1Crs = geo \ "columns" \ "g1" \ "crs" + assert(g0Crs == parseJson(projjson0)) + assert(g1Crs == 
org.json4s.JNothing) + } + } + it("GeoParquet load should raise exception when loading plain parquet files") { val e = intercept[SparkException] { sparkSession.read.format("geoparquet").load(resourceFolder + "geoparquet/plain.parquet") @@ -205,10 +466,6 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { val rows = df.where(ST_Intersects(ST_Point(35.174722, -6.552465), col("geometry"))).collect() assert(rows.length == 1) assert(rows(0).getAs[String]("name") == "Tanzania") - - // Skip reading the file when point is outside the bounding box - val rows2 = df.where(ST_Intersects(ST_Point(100, 200), col("geometry"))).collect() - assert(rows2.isEmpty) } it("Filter push down for nested columns") { @@ -254,4 +511,16 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll { } } } + + def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => Unit): Unit = { + val parquetFiles = new File(path).listFiles().filter(_.getName.endsWith(".parquet")) + parquetFiles.foreach { filePath => + val metadata = ParquetFileReader.open( + HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration())) + .getFooter.getFileMetaData.getKeyValueMetaData + assert(metadata.containsKey("geo")) + val geo = parseJson(metadata.get("geo")) + body(geo) + } + } }
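For readers following the writer changes above, the per-column CRS fallback exercised by these tests can be summarized by the following standalone sketch. `resolveCrs` and `crsForColumn` are hypothetical helper names used only for illustration; they are not part of `GeoParquetWriteSupport`.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Hypothetical helper: maps a geoparquet.crs option value to the crs metadata value.
// null (option not set) -> explicit JSON null, "" -> omit the field, otherwise -> parsed PROJJSON.
def resolveCrs(option: String): Option[JValue] = option match {
  case null => Some(JNull)
  case ""   => None
  case crs  => Some(parse(crs))
}

// Hypothetical helper: a per-column geoparquet.crs.<column> option overrides the default geoparquet.crs.
def crsForColumn(column: String, perColumn: Map[String, String], default: String): Option[JValue] =
  perColumn.get(column).map(resolveCrs).getOrElse(resolveCrs(default))

// Example: g1 gets its own PROJJSON while g0 falls back to the default (unset -> explicit null):
// crsForColumn("g0", Map("g1" -> projjson1), null) == Some(JNull)
// crsForColumn("g1", Map("g1" -> projjson1), null) == Some(parse(projjson1))
```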