Commit

[SEDONA-703] Introducing the StructuredAdapter class (apache#1780)
* Initial commit

* Refactor the code according to comments

* Add Python API

* Update docs and fix lints
jiayuasu authored Feb 4, 2025
1 parent 08a86e4 commit 79c1da8
Showing 14 changed files with 574 additions and 351 deletions.
262 changes: 12 additions & 250 deletions docs/tutorial/rdd.md
@@ -32,197 +32,31 @@ Please refer to [Create Sedona config](sql.md#create-sedona-config) to create a

Please refer to [Initiate SedonaContext](sql.md#initiate-sedonacontext) to initiate a SedonaContext.

## Create a SpatialRDD
## Create a SpatialRDD from SedonaSQL DataFrame

### Create a typed SpatialRDD

Sedona-core provides three special SpatialRDDs: PointRDD, PolygonRDD, and LineStringRDD.

!!!warning
Typed SpatialRDD has been deprecated for a long time. We do NOT recommend it anymore.

### Create a generic SpatialRDD

A generic SpatialRDD is not typed to a certain geometry type and open to more scenarios. It allows an input data file contains mixed types of geometries. For instance, a WKT file contains three types geometries ==LineString==, ==Polygon== and ==MultiPolygon==.

#### From WKT/WKB

Geometries in a WKT and WKB file always occupy a single column no matter how many coordinates they have. Sedona provides `WktReader` and `WkbReader` to create generic SpatialRDD.

Suppose we have a `checkin.tsv` WKT TSV file at Path `/Download/checkin.tsv` as follows:

```
POINT (-88.331492 32.324142) hotel
POINT (-88.175933 32.360763) gas
POINT (-88.388954 32.357073) bar
POINT (-88.221102 32.35078) restaurant
```

This file has two columns and corresponding ==offsets==(Column IDs) are 0, 1. Column 0 is the WKT string and Column 1 is the checkin business type.

Use the following code to create a SpatialRDD
Please refer to [Create a Geometry type column](sql.md#create-a-geometry-type-column) to create a Geometry type column. Then you can create a SpatialRDD from the DataFrame.

=== "Scala"

```scala
val inputLocation = "/Download/checkin.tsv"
val wktColumn = 0 // The WKT string starts from Column 0
val allowTopologyInvalidGeometries = true // Optional
val skipSyntaxInvalidGeometries = false // Optional
val spatialRDD = WktReader.readToGeometryRDD(sedona.sparkContext, inputLocation, wktColumn, allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
var spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty")
```

=== "Java"

```java
String inputLocation = "/Download/checkin.tsv"
int wktColumn = 0 // The WKT string starts from Column 0
boolean allowTopologyInvalidGeometries = true // Optional
boolean skipSyntaxInvalidGeometries = false // Optional
SpatialRDD spatialRDD = WktReader.readToGeometryRDD(sedona.sparkContext, inputLocation, wktColumn, allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
SpatialRDD spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty")
```

=== "Python"

```python
from sedona.core.formatMapper import WktReader
from sedona.core.formatMapper import WkbReader

WktReader.readToGeometryRDD(sc, wkt_geometries_location, 0, True, False)

WkbReader.readToGeometryRDD(sc, wkb_geometries_location, 0, True, False)
```

#### From GeoJSON

!!!note
Reading GeoJSON using SpatialRDD is not recommended. Please use [Sedona SQL and DataFrame API](sql.md#load-geojson-data) to read GeoJSON files.

Geometries in GeoJSON is similar to WKT/WKB. However, a GeoJSON file must be beaked into multiple lines.

Suppose we have a `polygon.json` GeoJSON file at Path `/Download/polygon.json` as follows:

```
{ "type": "Feature", "properties": { "STATEFP": "01", "COUNTYFP": "077", "TRACTCE": "011501", "BLKGRPCE": "5", "AFFGEOID": "1500000US010770115015", "GEOID": "010770115015", "NAME": "5", "LSAD": "BG", "ALAND": 6844991, "AWATER": 32636 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -87.621765, 34.873444 ], [ -87.617535, 34.873369 ], [ -87.6123, 34.873337 ], [ -87.604049, 34.873303 ], [ -87.604033, 34.872316 ], [ -87.60415, 34.867502 ], [ -87.604218, 34.865687 ], [ -87.604409, 34.858537 ], [ -87.604018, 34.851336 ], [ -87.603716, 34.844829 ], [ -87.603696, 34.844307 ], [ -87.603673, 34.841884 ], [ -87.60372, 34.841003 ], [ -87.603879, 34.838423 ], [ -87.603888, 34.837682 ], [ -87.603889, 34.83763 ], [ -87.613127, 34.833938 ], [ -87.616451, 34.832699 ], [ -87.621041, 34.831431 ], [ -87.621056, 34.831526 ], [ -87.62112, 34.831925 ], [ -87.621603, 34.8352 ], [ -87.62158, 34.836087 ], [ -87.621383, 34.84329 ], [ -87.621359, 34.844438 ], [ -87.62129, 34.846387 ], [ -87.62119, 34.85053 ], [ -87.62144, 34.865379 ], [ -87.621765, 34.873444 ] ] ] } },
{ "type": "Feature", "properties": { "STATEFP": "01", "COUNTYFP": "045", "TRACTCE": "021102", "BLKGRPCE": "4", "AFFGEOID": "1500000US010450211024", "GEOID": "010450211024", "NAME": "4", "LSAD": "BG", "ALAND": 11360854, "AWATER": 0 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -85.719017, 31.297901 ], [ -85.715626, 31.305203 ], [ -85.714271, 31.307096 ], [ -85.69999, 31.307552 ], [ -85.697419, 31.307951 ], [ -85.675603, 31.31218 ], [ -85.672733, 31.312876 ], [ -85.672275, 31.311977 ], [ -85.67145, 31.310988 ], [ -85.670622, 31.309524 ], [ -85.670729, 31.307622 ], [ -85.669876, 31.30666 ], [ -85.669796, 31.306224 ], [ -85.670356, 31.306178 ], [ -85.671664, 31.305583 ], [ -85.67177, 31.305299 ], [ -85.671878, 31.302764 ], [ -85.671344, 31.302123 ], [ -85.668276, 31.302076 ], [ -85.66566, 31.30093 ], [ -85.665687, 31.30022 ], [ -85.669183, 31.297677 ], [ -85.668703, 31.295638 ], [ -85.671985, 31.29314 ], [ -85.677177, 31.288211 ], [ -85.678452, 31.286376 ], [ -85.679236, 31.28285 ], [ -85.679195, 31.281426 ], [ -85.676865, 31.281049 ], [ -85.674661, 31.28008 ], [ -85.674377, 31.27935 ], [ -85.675714, 31.276882 ], [ -85.677938, 31.275168 ], [ -85.680348, 31.276814 ], [ -85.684032, 31.278848 ], [ -85.684387, 31.279082 ], [ -85.692398, 31.283499 ], [ -85.705032, 31.289718 ], [ -85.706755, 31.290476 ], [ -85.718102, 31.295204 ], [ -85.719132, 31.29689 ], [ -85.719017, 31.297901 ] ] ] } },
{ "type": "Feature", "properties": { "STATEFP": "01", "COUNTYFP": "055", "TRACTCE": "001300", "BLKGRPCE": "3", "AFFGEOID": "1500000US010550013003", "GEOID": "010550013003", "NAME": "3", "LSAD": "BG", "ALAND": 1378742, "AWATER": 247387 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -86.000685, 34.00537 ], [ -85.998837, 34.009768 ], [ -85.998012, 34.010398 ], [ -85.987865, 34.005426 ], [ -85.986656, 34.004552 ], [ -85.985, 34.002659 ], [ -85.98851, 34.001502 ], [ -85.987567, 33.999488 ], [ -85.988666, 33.99913 ], [ -85.992568, 33.999131 ], [ -85.993144, 33.999714 ], [ -85.994876, 33.995153 ], [ -85.998823, 33.989548 ], [ -85.999925, 33.994237 ], [ -86.000616, 34.000028 ], [ -86.000685, 34.00537 ] ] ] } },
{ "type": "Feature", "properties": { "STATEFP": "01", "COUNTYFP": "089", "TRACTCE": "001700", "BLKGRPCE": "2", "AFFGEOID": "1500000US010890017002", "GEOID": "010890017002", "NAME": "2", "LSAD": "BG", "ALAND": 1040641, "AWATER": 0 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -86.574172, 34.727375 ], [ -86.562684, 34.727131 ], [ -86.562797, 34.723865 ], [ -86.562957, 34.723168 ], [ -86.562336, 34.719766 ], [ -86.557381, 34.719143 ], [ -86.557352, 34.718322 ], [ -86.559921, 34.717363 ], [ -86.564827, 34.718513 ], [ -86.567582, 34.718565 ], [ -86.570572, 34.718577 ], [ -86.573618, 34.719377 ], [ -86.574172, 34.727375 ] ] ] } },
```

Use the following code to create a generic SpatialRDD:

=== "Scala"

```scala
val inputLocation = "/Download/polygon.json"
val allowTopologyInvalidGeometries = true // Optional
val skipSyntaxInvalidGeometries = false // Optional
val spatialRDD = GeoJsonReader.readToGeometryRDD(sedona.sparkContext, inputLocation, allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
```

=== "Java"

```java
String inputLocation = "/Download/polygon.json"
boolean allowTopologyInvalidGeometries = true // Optional
boolean skipSyntaxInvalidGeometries = false // Optional
SpatialRDD spatialRDD = GeoJsonReader.readToGeometryRDD(sedona.sparkContext, inputLocation, allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries)
```

=== "Python"

```python
from sedona.core.formatMapper import GeoJsonReader

GeoJsonReader.readToGeometryRDD(sc, geo_json_file_location)
```

!!!warning
The way that Sedona reads JSON file is different from SparkSQL

#### From Shapefile

=== "Scala"

```scala
val shapefileInputLocation="/Download/myshapefile"
val spatialRDD = ShapefileReader.readToGeometryRDD(sedona.sparkContext, shapefileInputLocation)
```

=== "Java"

```java
String shapefileInputLocation="/Download/myshapefile"
SpatialRDD spatialRDD = ShapefileReader.readToGeometryRDD(sedona.sparkContext, shapefileInputLocation)
```

=== "Python"

```python
from sedona.core.formatMapper.shapefileParser import ShapefileReader

ShapefileReader.readToGeometryRDD(sc, shape_file_location)
```

!!!note
The path to the shapefile is the path to the folder that contains the .shp file, not the path to the .shp file itself. The file extensions of .shp, .shx, .dbf must be in lowercase. Assume you have a shape file called ==myShapefile==, the path should be `XXX/myShapefile`. The file structure should be like this:
from sedona.utils.structured_adapter import StructuredAdapter

spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty")
```
- shapefile1
- shapefile2
- myshapefile
- myshapefile.shp
- myshapefile.shx
- myshapefile.dbf
- myshapefile...
- ...
```

If the file you are reading contains non-ASCII characters you'll need to explicitly set the Spark config before initializing the SparkSession, then you can use `ShapefileReader.readToGeometryRDD`.

Example:

```scala
spark.driver.extraJavaOptions -Dsedona.global.charset=utf8
spark.executor.extraJavaOptions -Dsedona.global.charset=utf8
```

#### From SedonaSQL DataFrame

!!!note
More details about SedonaSQL, please read the SedonaSQL tutorial.

To create a generic SpatialRDD from CSV, TSV, WKT, WKB and GeoJSON input formats, you can use SedonaSQL.

We use checkin.csv CSV file as the example. You can create a generic SpatialRDD using the following steps:

1. Load data in SedonaSQL.

```scala
var df = sedona.read.format("csv").option("header", "false").load(csvPointInputLocation)
df.createOrReplaceTempView("inputtable")
```

2. Create a Geometry type column in SedonaSQL

```scala
var spatialDf = sedona.sql(
"""
|SELECT ST_Point(CAST(inputtable._c0 AS Decimal(24,20)),CAST(inputtable._c1 AS Decimal(24,20))) AS checkin
|FROM inputtable
""".stripMargin)
```

3. Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD

```scala
var spatialRDD = Adapter.toSpatialRdd(spatialDf, "checkin")
```

"checkin" is the name of the geometry column

For WKT/WKB data, please use ==ST_GeomFromWKT / ST_GeomFromWKB == instead.
"usacounty" is the name of the geometry column. It is an optional parameter. If you don't provide it, the first geometry column will be used.

## Transform the Coordinate Reference System

@@ -284,33 +118,6 @@ To convert Coordinate Reference System of an SpatialRDD, use the following code:

The details CRS information can be found on [EPSG.io](https://epsg.io/)

## Read other attributes in an SpatialRDD

Each SpatialRDD can carry non-spatial attributes such as price, age and name.

The other attributes are combined together to a string and stored in ==UserData== field of each geometry.

To retrieve the UserData field, use the following code:

=== "Scala"

```scala
val rddWithOtherAttributes = objectRDD.rawSpatialRDD.rdd.map[String](f=>f.getUserData.asInstanceOf[String])
```

=== "Java"

```java
SpatialRDD<Geometry> spatialRDD = Adapter.toSpatialRdd(spatialDf, "arealandmark");
spatialRDD.rawSpatialRDD.map(obj -> {return obj.getUserData();});
```

=== "Python"

```python
rdd_with_other_attributes = object_rdd.rawSpatialRDD.map(lambda x: x.getUserData())
```

## Write a Spatial Range Query

A spatial range query takes as input a range query window and an SpatialRDD and returns all geometries that have specified relationship with the query window.
@@ -380,7 +187,7 @@ Assume you now have a SpatialRDD (typed or generic). You can use the following c
consider_boundary_intersection = False ## Only return gemeotries fully covered by the window
using_index = False
query_result = RangeQueryRaw.SpatialRangeQuery(spatial_rdd, range_query_window, consider_boundary_intersection, using_index)
gdf = Adapter.toDf(query_result, spark, ["col1", ..., "coln"])
gdf = StructuredAdapter.toDf(query_result, spark, ["col1", ..., "coln"])
```

### Range query window
@@ -872,6 +679,7 @@ The index should be built on either one of two SpatialRDDs. In general, you shou
from sedona.core.SpatialRDD import CircleRDD
from sedona.core.enums import GridType
from sedona.core.spatialOperator import JoinQueryRaw
from sedona.utils.structured_adapter import StructuredAdapter

object_rdd.analyze()

@@ -886,7 +694,7 @@ The index should be built on either one of two SpatialRDDs. In general, you shou

result = JoinQueryRaw.DistanceJoinQueryFlat(spatial_rdd, circle_rdd, using_index, consider_boundary_intersection)

gdf = Adapter.toDf(result, ["left_col1", ..., "lefcoln"], ["rightcol1", ..., "rightcol2"], spark)
gdf = StructuredAdapter.toDf(result, ["left_col1", ..., "lefcoln"], ["rightcol1", ..., "rightcol2"], spark)
```

## Write a Distance Join Query
@@ -972,45 +780,10 @@ The output format of the distance join query is [here](#output-format-2).

## Save to permanent storage

You can always save an SpatialRDD back to some permanent storage such as HDFS and Amazon S3. You can save distributed SpatialRDD to WKT, GeoJSON and object files.

!!!note
Non-spatial attributes such as price, age and name will also be stored to permanent storage.
You can always save an SpatialRDD back to some permanent storage such as HDFS and Amazon S3.

### Save an SpatialRDD (not indexed)

Typed SpatialRDD and generic SpatialRDD can be saved to permanent storage.

#### Save to distributed WKT text file

Use the following code to save an SpatialRDD as a distributed WKT text file:

```scala
objectRDD.rawSpatialRDD.saveAsTextFile("hdfs://PATH")
objectRDD.saveAsWKT("hdfs://PATH")
```

#### Save to distributed WKB text file

Use the following code to save an SpatialRDD as a distributed WKB text file:

```scala
objectRDD.saveAsWKB("hdfs://PATH")
```

#### Save to distributed GeoJSON text file

!!!note
Saving GeoJSON using SpatialRDD is not recommended. Please use [Sedona SQL and DataFrame API](sql.md#save-as-geojson) to write GeoJSON files.

Use the following code to save an SpatialRDD as a distributed GeoJSON text file:

```scala
objectRDD.saveAsGeoJSON("hdfs://PATH")
```

#### Save to distributed object file

Use the following code to save an SpatialRDD as a distributed object file:

=== "Scala/Java"
@@ -1032,8 +805,6 @@ Use the following code to save an SpatialRDD as a distributed object file:

Indexed typed SpatialRDD and generic SpatialRDD can be saved to permanent storage. However, the indexed SpatialRDD has to be stored as a distributed object file.

#### Save to distributed object file

Use the following code to save an SpatialRDD as a distributed object file:

```
@@ -1046,16 +817,7 @@ A spatial partitioned RDD can be saved to permanent storage but Spark is not abl

### Reload a saved SpatialRDD

You can easily reload an SpatialRDD that has been saved to ==a distributed object file==.

#### Load to a typed SpatialRDD

!!!warning
Typed SpatialRDD has been deprecated for a long time. We do NOT recommend it anymore.

#### Load to a generic SpatialRDD

Use the following code to reload the SpatialRDD:
You can easily reload an SpatialRDD that has been saved to ==a distributed object file==. Use the following code to reload the SpatialRDD:

=== "Scala"
