Allow omitting CRS by setting geoparquet.crs to "" (empty string)
Kontinuation committed Dec 29, 2023
1 parent cff270d commit 921d79b
Showing 4 changed files with 91 additions and 55 deletions.
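The behavior added here can be read off the tests below: geoparquet.crs (and the per-column variant geoparquet.crs.<column>) accepts a PROJJSON string, the literal string "null" to write an explicit "crs": null, or the empty string "" to omit the crs field entirely. A rough usage sketch based on those tests, assuming a DataFrame df with geometry columns g0 and g1; projjson and path are placeholder values, not names from this commit:

    // Write GeoParquet with an explicit PROJJSON CRS for all geometry columns
    df.write.format("geoparquet")
      .option("geoparquet.crs", projjson)   // projjson holds a PROJJSON string
      .mode("overwrite").save(path)

    // Write an explicit "crs": null into the GeoParquet file metadata
    df.write.format("geoparquet")
      .option("geoparquet.crs", "null")
      .mode("overwrite").save(path)

    // Omit the crs field entirely (the behavior introduced by this commit)
    df.write.format("geoparquet")
      .option("geoparquet.crs", "")
      .mode("overwrite").save(path)

    // Per-column override: keep the default CRS but omit it for column g1
    df.write.format("geoparquet")
      .option("geoparquet.crs", projjson)
      .option("geoparquet.crs.g1", "")
      .mode("overwrite").save(path)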
10 changes: 3 additions & 7 deletions .github/workflows/python.yml
@@ -54,25 +54,21 @@ jobs:
         scala: '2.12.8'
         python: '3.8'
         hadoop: '3'
-      - spark: '3.4.0'
-        scala: '2.12.8'
-        python: '3.7'
-        hadoop: '3'
       - spark: '3.3.0'
         scala: '2.12.8'
         python: '3.8'
         hadoop: '3'
       - spark: '3.2.0'
         scala: '2.12.8'
-        python: '3.7'
+        python: '3.8'
         hadoop: '2.7'
       - spark: '3.1.2'
         scala: '2.12.8'
-        python: '3.7'
+        python: '3.8'
         hadoop: '2.7'
       - spark: '3.0.3'
         scala: '2.12.8'
-        python: '3.7'
+        python: '3.8'
         hadoop: '2.7'
 
     steps:
4 changes: 2 additions & 2 deletions python/Pipfile
@@ -13,12 +13,12 @@ pytest-cov = "*"
 [packages]
 shapely="<=1.8.5"
 pandas="<=1.3.5"
-geopandas="<=0.10.2"
+geopandas="<=0.13.2"
 pyspark=">=2.3.0"
 attrs="*"
 pyarrow="*"
 keplergl = "==0.3.2"
 pydeck = "===0.8.0"
 
 [requires]
-python_version = "3.7"
+python_version = "3.8"
GeoParquetWriteSupport.scala
@@ -109,7 +109,7 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
 
   private var geoParquetVersion: Option[String] = None
   private var defaultGeoParquetCrs: Option[JValue] = None
-  private val geoParquetColumnCrsMap: mutable.Map[String, JValue] = mutable.Map.empty
+  private val geoParquetColumnCrsMap: mutable.Map[String, Option[JValue]] = mutable.Map.empty
 
   override def init(configuration: Configuration): WriteContext = {
     val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
@@ -138,9 +138,15 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
       case null => Some(VERSION)
       case version: String => Some(version)
     }
-    defaultGeoParquetCrs = Option(configuration.get(GEOPARQUET_CRS_KEY)).map(parse(_))
+    defaultGeoParquetCrs = configuration.get(GEOPARQUET_CRS_KEY) match {
+      case null | "" => None
+      case crs: String => Some(parse(crs))
+    }
     geometryColumnInfoMap.keys.map(schema(_).name).foreach { name =>
-      Option(configuration.get(GEOPARQUET_CRS_KEY + "." + name)).foreach(crs => geoParquetColumnCrsMap.put(name, parse(crs)))
+      Option(configuration.get(GEOPARQUET_CRS_KEY + "." + name)).foreach {
+        case "" => geoParquetColumnCrsMap.put(name, None)
+        case crs: String => geoParquetColumnCrsMap.put(name, Some(parse(crs)))
+      }
     }
 
     val messageType = new SparkToParquetSchemaConverter(configuration).convert(schema)
@@ -187,11 +193,11 @@ class GeoParquetWriteSupport extends WriteSupport[InternalRow] with Logging {
       val bbox = if (geometryTypes.nonEmpty) {
         Seq(columnInfo.bbox.minX, columnInfo.bbox.minY, columnInfo.bbox.maxX, columnInfo.bbox.maxY)
       } else Seq(0.0, 0.0, 0.0, 0.0)
-      val crs = geoParquetColumnCrsMap.get(columnName).orElse(defaultGeoParquetCrs)
+      val crs = geoParquetColumnCrsMap.getOrElse(columnName, defaultGeoParquetCrs)
       columnName -> GeometryFieldMetaData("WKB", geometryTypes, bbox, crs)
     }.toMap
     val geoParquetMetadata = GeoParquetMetaData(geoParquetVersion, primaryColumn, columns)
-    implicit val formats: org.json4s.Formats = DefaultFormats.preservingEmptyValues
+    implicit val formats: org.json4s.Formats = DefaultFormats
     val geoParquetMetadataJson = compactJson(Extraction.decompose(geoParquetMetadata).underscoreKeys)
     metadata.put("geo", geoParquetMetadataJson)
   }
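The switch from DefaultFormats.preservingEmptyValues to plain DefaultFormats is what lets an unspecified CRS drop out of the "geo" footer: with json4s' plain DefaultFormats, an Option field that is None decomposes to nothing and is omitted from the rendered JSON, while Some(JNull) still renders as an explicit null. A minimal, self-contained sketch of that behavior; the MetaData case class and object name below are illustrative stand-ins, not Sedona's actual classes:

    import org.json4s._
    import org.json4s.jackson.JsonMethods.{compact, parse, render}

    // Hypothetical stand-in for the per-column GeoParquet metadata; illustrative only.
    case class MetaData(name: String, crs: Option[JValue])

    object CrsSerializationSketch extends App {
      implicit val formats: Formats = DefaultFormats

      // None -> the "crs" key is omitted entirely: {"name":"g0"}
      println(compact(render(Extraction.decompose(MetaData("g0", None)))))

      // Some(JNull) -> an explicit null is written: {"name":"g0","crs":null}
      println(compact(render(Extraction.decompose(MetaData("g0", Some(JNull))))))

      // Some(parsed PROJJSON) -> the full CRS object is embedded
      println(compact(render(Extraction.decompose(
        MetaData("g0", Some(parse("""{"type":"GeographicCRS"}""")))))))

      // With DefaultFormats.preservingEmptyValues (the previous setting), None would
      // serialize as null instead of being dropped, which is why the old default wrote "crs": null.
    }

On the read side, looking up a missing key with json4s' \ operator yields JNothing, which is why the tests below assert crs == org.json4s.JNothing for the omitted case and crs == org.json4s.JNull for the explicit-null case.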
geoparquetIOTests.scala
@@ -143,20 +143,18 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
     df.write.format("geoparquet").save(geoParquetSavePath)
 
     // Find parquet files in geoParquetSavePath directory and validate their metadata
-    val parquetFiles = new File(geoParquetSavePath).listFiles().filter(_.getName.endsWith(".parquet"))
-    parquetFiles.foreach { filePath =>
-      val metadata = ParquetFileReader.open(
-        HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration()))
-        .getFooter.getFileMetaData.getKeyValueMetaData
-      assert(metadata.containsKey("geo"))
-      val geo = parseJson(metadata.get("geo"))
+    validateGeoParquetMetadata(geoParquetSavePath) { geo =>
       implicit val formats : org.json4s.Formats = org.json4s.DefaultFormats
       val version = (geo \ "version").extract[String]
       assert(version == GeoParquetMetaData.VERSION)
       val g0Types = (geo \ "columns" \ "g0" \ "geometry_types").extract[Seq[String]]
       val g1Types = (geo \ "columns" \ "g1" \ "geometry_types").extract[Seq[String]]
       assert(g0Types.sorted == Seq("Point", "Point Z", "MultiPoint").sorted)
       assert(g1Types.sorted == Seq("Polygon", "Polygon Z", "MultiLineString").sorted)
+      val g0Crs = geo \ "columns" \ "g0" \ "crs"
+      val g1Crs = geo \ "columns" \ "g1" \ "crs"
+      assert(g0Crs == org.json4s.JNothing)
+      assert(g1Crs == org.json4s.JNothing)
     }
 
     // Read GeoParquet with multiple geometry columns
@@ -181,13 +179,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
     assert(df2.schema.fields(1).dataType.isInstanceOf[GeometryUDT])
     assert(0 == df2.count())
 
-    val parquetFiles = new File(geoParquetSavePath).listFiles().filter(_.getName.endsWith(".parquet"))
-    parquetFiles.foreach { filePath =>
-      val metadata = ParquetFileReader.open(
-        HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration()))
-        .getFooter.getFileMetaData.getKeyValueMetaData
-      assert(metadata.containsKey("geo"))
-      val geo = parseJson(metadata.get("geo"))
+    validateGeoParquetMetadata(geoParquetSavePath) { geo =>
       implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
       val g0Types = (geo \ "columns" \ "g" \ "geometry_types").extract[Seq[String]]
       val g0BBox = (geo \ "columns" \ "g" \ "bbox").extract[Seq[Double]]
@@ -246,21 +238,15 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
        | }
        |}
        |""".stripMargin
+    var geoParquetSavePath = geoparquetoutputlocation + "/gp_custom_meta.parquet"
     df.write.format("geoparquet")
       .option("geoparquet.version", "10.9.8")
       .option("geoparquet.crs", projjson)
-      .mode("overwrite").save(geoparquetoutputlocation + "/gp_custom_meta.parquet")
-    val geoParquetSavePath = geoparquetoutputlocation + "/gp_custom_meta.parquet"
+      .mode("overwrite").save(geoParquetSavePath)
     val df2 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
     assert(df2.count() == df.count())
 
-    val parquetFiles = new File(geoParquetSavePath).listFiles().filter(_.getName.endsWith(".parquet"))
-    parquetFiles.foreach { filePath =>
-      val metadata = ParquetFileReader.open(
-        HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration()))
-        .getFooter.getFileMetaData.getKeyValueMetaData
-      assert(metadata.containsKey("geo"))
-      val geo = parseJson(metadata.get("geo"))
+    validateGeoParquetMetadata(geoParquetSavePath) { geo =>
       implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
       val version = (geo \ "version").extract[String]
       val columnName = (geo \ "primary_column").extract[String]
@@ -269,6 +255,33 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       assert(crs.isInstanceOf[org.json4s.JObject])
       assert(crs == parseJson(projjson))
     }
+
+    // Setting crs to null explicitly
+    geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_null.parquet"
+    df.write.format("geoparquet")
+      .option("geoparquet.crs", "null")
+      .mode("overwrite").save(geoParquetSavePath)
+    val df3 = sparkSession.read.format("geoparquet").load(geoParquetSavePath)
+    assert(df3.count() == df.count())
+
+    validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+      implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+      val columnName = (geo \ "primary_column").extract[String]
+      val crs = geo \ "columns" \ columnName \ "crs"
+      assert(crs == org.json4s.JNull)
+    }
+
+    // Setting crs to "" to omit crs
+    geoParquetSavePath = geoparquetoutputlocation + "/gp_crs_omit.parquet"
+    df.write.format("geoparquet")
+      .option("geoparquet.crs", "")
+      .mode("overwrite").save(geoParquetSavePath)
+    validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+      implicit val formats: org.json4s.Formats = org.json4s.DefaultFormats
+      val columnName = (geo \ "primary_column").extract[String]
+      val crs = geo \ "columns" \ columnName \ "crs"
+      assert(crs == org.json4s.JNothing)
+    }
   }
 
   it("GeoParquet save should support specifying per-column CRS") {
@@ -386,15 +399,7 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       .option("geoparquet.crs", projjson0)
       .option("geoparquet.crs.g1", projjson1)
       .mode("overwrite").save(geoParquetSavePath)
-
-    // Find parquet files in geoParquetSavePath directory and validate their metadata
-    var parquetFiles = new File(geoParquetSavePath).listFiles().filter(_.getName.endsWith(".parquet"))
-    parquetFiles.foreach { filePath =>
-      val metadata = ParquetFileReader.open(
-        HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration()))
-        .getFooter.getFileMetaData.getKeyValueMetaData
-      assert(metadata.containsKey("geo"))
-      val geo = parseJson(metadata.get("geo"))
+    validateGeoParquetMetadata(geoParquetSavePath) { geo =>
       val g0Crs = geo \ "columns" \ "g0" \ "crs"
       val g1Crs = geo \ "columns" \ "g1" \ "crs"
       assert(g0Crs == parseJson(projjson0))
@@ -405,19 +410,36 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
     df.write.format("geoparquet")
       .option("geoparquet.crs.g1", projjson1)
       .mode("overwrite").save(geoParquetSavePath)
-
-    parquetFiles = new File(geoParquetSavePath).listFiles().filter(_.getName.endsWith(".parquet"))
-    parquetFiles.foreach { filePath =>
-      val metadata = ParquetFileReader.open(
-        HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration()))
-        .getFooter.getFileMetaData.getKeyValueMetaData
-      assert(metadata.containsKey("geo"))
-      val geo = parseJson(metadata.get("geo"))
+    validateGeoParquetMetadata(geoParquetSavePath) { geo =>
       val g0Crs = geo \ "columns" \ "g0" \ "crs"
       val g1Crs = geo \ "columns" \ "g1" \ "crs"
-      assert(g0Crs == org.json4s.JNull)
+      assert(g0Crs == org.json4s.JNothing)
       assert(g1Crs == parseJson(projjson1))
     }
+
+    // Write with CRS, explicitly set CRS to null for g1
+    df.write.format("geoparquet")
+      .option("geoparquet.crs", projjson0)
+      .option("geoparquet.crs.g1", "null")
+      .mode("overwrite").save(geoParquetSavePath)
+    validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+      val g0Crs = geo \ "columns" \ "g0" \ "crs"
+      val g1Crs = geo \ "columns" \ "g1" \ "crs"
+      assert(g0Crs == parseJson(projjson0))
+      assert(g1Crs == org.json4s.JNull)
+    }
+
+    // Write with CRS, explicitly omit CRS for g1
+    df.write.format("geoparquet")
+      .option("geoparquet.crs", projjson0)
+      .option("geoparquet.crs.g1", "")
+      .mode("overwrite").save(geoParquetSavePath)
+    validateGeoParquetMetadata(geoParquetSavePath) { geo =>
+      val g0Crs = geo \ "columns" \ "g0" \ "crs"
+      val g1Crs = geo \ "columns" \ "g1" \ "crs"
+      assert(g0Crs == parseJson(projjson0))
+      assert(g1Crs == org.json4s.JNothing)
+    }
   }
 
   it("GeoParquet load should raise exception when loading plain parquet files") {
@@ -477,4 +499,16 @@ class geoparquetIOTests extends TestBaseScala with BeforeAndAfterAll {
       }
     }
   }
+
+  def validateGeoParquetMetadata(path: String)(body: org.json4s.JValue => Unit): Unit = {
+    val parquetFiles = new File(path).listFiles().filter(_.getName.endsWith(".parquet"))
+    parquetFiles.foreach { filePath =>
+      val metadata = ParquetFileReader.open(
+        HadoopInputFile.fromPath(new Path(filePath.getPath), new Configuration()))
+        .getFooter.getFileMetaData.getKeyValueMetaData
+      assert(metadata.containsKey("geo"))
+      val geo = parseJson(metadata.get("geo"))
+      body(geo)
+    }
+  }
 }
