From 79c1da83754ecbbf9b6457876147ca0d35b224bb Mon Sep 17 00:00:00 2001 From: Jia Yu Date: Tue, 4 Feb 2025 09:52:51 -0800 Subject: [PATCH] [SEDONA-703] Introducing the StructuredAdapter class (#1780) * Initial commit * Refactor the code according to comments * Add Python API * Update docs and fix lints --- docs/tutorial/rdd.md | 262 +----------------- docs/tutorial/sql.md | 104 +------ python/sedona/register/java_libs.py | 1 + python/sedona/utils/adapter.py | 10 +- python/sedona/utils/structured_adapter.py | 126 +++++++++ python/tests/sql/test_structured_adapter.py | 60 ++++ .../sedona/core/spatialRDD/SpatialRDD.java | 3 + .../org/apache/sedona/sql/utils/Adapter.scala | 7 + .../org/apache/sedona/stats/Weighting.scala | 2 +- .../sedona/stats/clustering/DBSCAN.scala | 2 +- .../outlierDetection/LocalOutlierFactor.scala | 3 +- .../org/apache/sedona/util/DfUtils.scala | 6 +- .../adapters/StructuredAdapter.scala | 229 +++++++++++++++ .../sql/structuredAdapterTestScala.scala | 110 ++++++++ 14 files changed, 574 insertions(+), 351 deletions(-) create mode 100644 python/sedona/utils/structured_adapter.py create mode 100644 python/tests/sql/test_structured_adapter.py create mode 100644 spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala create mode 100644 spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala diff --git a/docs/tutorial/rdd.md b/docs/tutorial/rdd.md index b36dde62a4..fd5a1fcfde 100644 --- a/docs/tutorial/rdd.md +++ b/docs/tutorial/rdd.md @@ -32,197 +32,31 @@ Please refer to [Create Sedona config](sql.md#create-sedona-config) to create a Please refer to [Initiate SedonaContext](sql.md#initiate-sedonacontext) to initiate a SedonaContext. -## Create a SpatialRDD +## Create a SpatialRDD from SedonaSQL DataFrame -### Create a typed SpatialRDD - -Sedona-core provides three special SpatialRDDs: PointRDD, PolygonRDD, and LineStringRDD. - -!!!warning - Typed SpatialRDD has been deprecated for a long time. We do NOT recommend it anymore. - -### Create a generic SpatialRDD - -A generic SpatialRDD is not typed to a certain geometry type and open to more scenarios. It allows an input data file contains mixed types of geometries. For instance, a WKT file contains three types geometries ==LineString==, ==Polygon== and ==MultiPolygon==. - -#### From WKT/WKB - -Geometries in a WKT and WKB file always occupy a single column no matter how many coordinates they have. Sedona provides `WktReader` and `WkbReader` to create generic SpatialRDD. - -Suppose we have a `checkin.tsv` WKT TSV file at Path `/Download/checkin.tsv` as follows: - -``` -POINT (-88.331492 32.324142) hotel -POINT (-88.175933 32.360763) gas -POINT (-88.388954 32.357073) bar -POINT (-88.221102 32.35078) restaurant -``` - -This file has two columns and corresponding ==offsets==(Column IDs) are 0, 1. Column 0 is the WKT string and Column 1 is the checkin business type. - -Use the following code to create a SpatialRDD +Please refer to [Create a Geometry type column](sql.md#create-a-geometry-type-column) to create a Geometry type column. Then you can create a SpatialRDD from the DataFrame. 
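+
+For example, assuming a TSV file whose first column contains WKT strings (the file path below is hypothetical), a `spatialDf` could be prepared like this:
+
+```scala
+var df = sedona.read.format("csv").option("delimiter", "\t").option("header", "false").load("/Download/usa-county.tsv")
+df.createOrReplaceTempView("inputtable")
+var spatialDf = sedona.sql("SELECT ST_GeomFromWKT(inputtable._c0) AS usacounty FROM inputtable")
+```
+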
=== "Scala" ```scala - val inputLocation = "/Download/checkin.tsv" - val wktColumn = 0 // The WKT string starts from Column 0 - val allowTopologyInvalidGeometries = true // Optional - val skipSyntaxInvalidGeometries = false // Optional - val spatialRDD = WktReader.readToGeometryRDD(sedona.sparkContext, inputLocation, wktColumn, allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries) + var spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty") ``` === "Java" ```java - String inputLocation = "/Download/checkin.tsv" - int wktColumn = 0 // The WKT string starts from Column 0 - boolean allowTopologyInvalidGeometries = true // Optional - boolean skipSyntaxInvalidGeometries = false // Optional - SpatialRDD spatialRDD = WktReader.readToGeometryRDD(sedona.sparkContext, inputLocation, wktColumn, allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries) + SpatialRDD spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty") ``` === "Python" ```python - from sedona.core.formatMapper import WktReader - from sedona.core.formatMapper import WkbReader - - WktReader.readToGeometryRDD(sc, wkt_geometries_location, 0, True, False) - - WkbReader.readToGeometryRDD(sc, wkb_geometries_location, 0, True, False) - ``` - -#### From GeoJSON - -!!!note - Reading GeoJSON using SpatialRDD is not recommended. Please use [Sedona SQL and DataFrame API](sql.md#load-geojson-data) to read GeoJSON files. - -Geometries in GeoJSON is similar to WKT/WKB. However, a GeoJSON file must be beaked into multiple lines. - -Suppose we have a `polygon.json` GeoJSON file at Path `/Download/polygon.json` as follows: - -``` -{ "type": "Feature", "properties": { "STATEFP": "01", "COUNTYFP": "077", "TRACTCE": "011501", "BLKGRPCE": "5", "AFFGEOID": "1500000US010770115015", "GEOID": "010770115015", "NAME": "5", "LSAD": "BG", "ALAND": 6844991, "AWATER": 32636 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -87.621765, 34.873444 ], [ -87.617535, 34.873369 ], [ -87.6123, 34.873337 ], [ -87.604049, 34.873303 ], [ -87.604033, 34.872316 ], [ -87.60415, 34.867502 ], [ -87.604218, 34.865687 ], [ -87.604409, 34.858537 ], [ -87.604018, 34.851336 ], [ -87.603716, 34.844829 ], [ -87.603696, 34.844307 ], [ -87.603673, 34.841884 ], [ -87.60372, 34.841003 ], [ -87.603879, 34.838423 ], [ -87.603888, 34.837682 ], [ -87.603889, 34.83763 ], [ -87.613127, 34.833938 ], [ -87.616451, 34.832699 ], [ -87.621041, 34.831431 ], [ -87.621056, 34.831526 ], [ -87.62112, 34.831925 ], [ -87.621603, 34.8352 ], [ -87.62158, 34.836087 ], [ -87.621383, 34.84329 ], [ -87.621359, 34.844438 ], [ -87.62129, 34.846387 ], [ -87.62119, 34.85053 ], [ -87.62144, 34.865379 ], [ -87.621765, 34.873444 ] ] ] } }, -{ "type": "Feature", "properties": { "STATEFP": "01", "COUNTYFP": "045", "TRACTCE": "021102", "BLKGRPCE": "4", "AFFGEOID": "1500000US010450211024", "GEOID": "010450211024", "NAME": "4", "LSAD": "BG", "ALAND": 11360854, "AWATER": 0 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -85.719017, 31.297901 ], [ -85.715626, 31.305203 ], [ -85.714271, 31.307096 ], [ -85.69999, 31.307552 ], [ -85.697419, 31.307951 ], [ -85.675603, 31.31218 ], [ -85.672733, 31.312876 ], [ -85.672275, 31.311977 ], [ -85.67145, 31.310988 ], [ -85.670622, 31.309524 ], [ -85.670729, 31.307622 ], [ -85.669876, 31.30666 ], [ -85.669796, 31.306224 ], [ -85.670356, 31.306178 ], [ -85.671664, 31.305583 ], [ -85.67177, 31.305299 ], [ -85.671878, 31.302764 ], [ -85.671344, 31.302123 ], [ -85.668276, 31.302076 ], [ -85.66566, 31.30093 ], [ -85.665687, 31.30022 ], [ 
-85.669183, 31.297677 ], [ -85.668703, 31.295638 ], [ -85.671985, 31.29314 ], [ -85.677177, 31.288211 ], [ -85.678452, 31.286376 ], [ -85.679236, 31.28285 ], [ -85.679195, 31.281426 ], [ -85.676865, 31.281049 ], [ -85.674661, 31.28008 ], [ -85.674377, 31.27935 ], [ -85.675714, 31.276882 ], [ -85.677938, 31.275168 ], [ -85.680348, 31.276814 ], [ -85.684032, 31.278848 ], [ -85.684387, 31.279082 ], [ -85.692398, 31.283499 ], [ -85.705032, 31.289718 ], [ -85.706755, 31.290476 ], [ -85.718102, 31.295204 ], [ -85.719132, 31.29689 ], [ -85.719017, 31.297901 ] ] ] } }, -{ "type": "Feature", "properties": { "STATEFP": "01", "COUNTYFP": "055", "TRACTCE": "001300", "BLKGRPCE": "3", "AFFGEOID": "1500000US010550013003", "GEOID": "010550013003", "NAME": "3", "LSAD": "BG", "ALAND": 1378742, "AWATER": 247387 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -86.000685, 34.00537 ], [ -85.998837, 34.009768 ], [ -85.998012, 34.010398 ], [ -85.987865, 34.005426 ], [ -85.986656, 34.004552 ], [ -85.985, 34.002659 ], [ -85.98851, 34.001502 ], [ -85.987567, 33.999488 ], [ -85.988666, 33.99913 ], [ -85.992568, 33.999131 ], [ -85.993144, 33.999714 ], [ -85.994876, 33.995153 ], [ -85.998823, 33.989548 ], [ -85.999925, 33.994237 ], [ -86.000616, 34.000028 ], [ -86.000685, 34.00537 ] ] ] } }, -{ "type": "Feature", "properties": { "STATEFP": "01", "COUNTYFP": "089", "TRACTCE": "001700", "BLKGRPCE": "2", "AFFGEOID": "1500000US010890017002", "GEOID": "010890017002", "NAME": "2", "LSAD": "BG", "ALAND": 1040641, "AWATER": 0 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -86.574172, 34.727375 ], [ -86.562684, 34.727131 ], [ -86.562797, 34.723865 ], [ -86.562957, 34.723168 ], [ -86.562336, 34.719766 ], [ -86.557381, 34.719143 ], [ -86.557352, 34.718322 ], [ -86.559921, 34.717363 ], [ -86.564827, 34.718513 ], [ -86.567582, 34.718565 ], [ -86.570572, 34.718577 ], [ -86.573618, 34.719377 ], [ -86.574172, 34.727375 ] ] ] } }, - -``` - -Use the following code to create a generic SpatialRDD: - -=== "Scala" - - ```scala - val inputLocation = "/Download/polygon.json" - val allowTopologyInvalidGeometries = true // Optional - val skipSyntaxInvalidGeometries = false // Optional - val spatialRDD = GeoJsonReader.readToGeometryRDD(sedona.sparkContext, inputLocation, allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries) - ``` - -=== "Java" - - ```java - String inputLocation = "/Download/polygon.json" - boolean allowTopologyInvalidGeometries = true // Optional - boolean skipSyntaxInvalidGeometries = false // Optional - SpatialRDD spatialRDD = GeoJsonReader.readToGeometryRDD(sedona.sparkContext, inputLocation, allowTopologyInvalidGeometries, skipSyntaxInvalidGeometries) - ``` - -=== "Python" - - ```python - from sedona.core.formatMapper import GeoJsonReader - - GeoJsonReader.readToGeometryRDD(sc, geo_json_file_location) - ``` - -!!!warning - The way that Sedona reads JSON file is different from SparkSQL - -#### From Shapefile - -=== "Scala" - - ```scala - val shapefileInputLocation="/Download/myshapefile" - val spatialRDD = ShapefileReader.readToGeometryRDD(sedona.sparkContext, shapefileInputLocation) - ``` - -=== "Java" - - ```java - String shapefileInputLocation="/Download/myshapefile" - SpatialRDD spatialRDD = ShapefileReader.readToGeometryRDD(sedona.sparkContext, shapefileInputLocation) - ``` - -=== "Python" - - ```python - from sedona.core.formatMapper.shapefileParser import ShapefileReader - - ShapefileReader.readToGeometryRDD(sc, shape_file_location) - ``` - -!!!note - The path to the shapefile is the path to 
the folder that contains the .shp file, not the path to the .shp file itself. The file extensions of .shp, .shx, .dbf must be in lowercase. Assume you have a shape file called ==myShapefile==, the path should be `XXX/myShapefile`. The file structure should be like this: + from sedona.utils.structured_adapter import StructuredAdapter + spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty") ``` - - shapefile1 - - shapefile2 - - myshapefile - - myshapefile.shp - - myshapefile.shx - - myshapefile.dbf - - myshapefile... - - ... - ``` - -If the file you are reading contains non-ASCII characters you'll need to explicitly set the Spark config before initializing the SparkSession, then you can use `ShapefileReader.readToGeometryRDD`. - -Example: - -```scala -spark.driver.extraJavaOptions -Dsedona.global.charset=utf8 -spark.executor.extraJavaOptions -Dsedona.global.charset=utf8 -``` - -#### From SedonaSQL DataFrame - -!!!note - More details about SedonaSQL, please read the SedonaSQL tutorial. - -To create a generic SpatialRDD from CSV, TSV, WKT, WKB and GeoJSON input formats, you can use SedonaSQL. - -We use checkin.csv CSV file as the example. You can create a generic SpatialRDD using the following steps: - -1. Load data in SedonaSQL. - -```scala -var df = sedona.read.format("csv").option("header", "false").load(csvPointInputLocation) -df.createOrReplaceTempView("inputtable") -``` - -2. Create a Geometry type column in SedonaSQL - -```scala -var spatialDf = sedona.sql( - """ - |SELECT ST_Point(CAST(inputtable._c0 AS Decimal(24,20)),CAST(inputtable._c1 AS Decimal(24,20))) AS checkin - |FROM inputtable - """.stripMargin) -``` -3. Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD - -```scala -var spatialRDD = Adapter.toSpatialRdd(spatialDf, "checkin") -``` - -"checkin" is the name of the geometry column - -For WKT/WKB data, please use ==ST_GeomFromWKT / ST_GeomFromWKB == instead. +"usacounty" is the name of the geometry column. It is an optional parameter. If you don't provide it, the first geometry column will be used. ## Transform the Coordinate Reference System @@ -284,33 +118,6 @@ To convert Coordinate Reference System of an SpatialRDD, use the following code: The details CRS information can be found on [EPSG.io](https://epsg.io/) -## Read other attributes in an SpatialRDD - -Each SpatialRDD can carry non-spatial attributes such as price, age and name. - -The other attributes are combined together to a string and stored in ==UserData== field of each geometry. - -To retrieve the UserData field, use the following code: - -=== "Scala" - - ```scala - val rddWithOtherAttributes = objectRDD.rawSpatialRDD.rdd.map[String](f=>f.getUserData.asInstanceOf[String]) - ``` - -=== "Java" - - ```java - SpatialRDD spatialRDD = Adapter.toSpatialRdd(spatialDf, "arealandmark"); - spatialRDD.rawSpatialRDD.map(obj -> {return obj.getUserData();}); - ``` - -=== "Python" - - ```python - rdd_with_other_attributes = object_rdd.rawSpatialRDD.map(lambda x: x.getUserData()) - ``` - ## Write a Spatial Range Query A spatial range query takes as input a range query window and an SpatialRDD and returns all geometries that have specified relationship with the query window. @@ -380,7 +187,7 @@ Assume you now have a SpatialRDD (typed or generic). 
You can use the following c consider_boundary_intersection = False ## Only return gemeotries fully covered by the window using_index = False query_result = RangeQueryRaw.SpatialRangeQuery(spatial_rdd, range_query_window, consider_boundary_intersection, using_index) - gdf = Adapter.toDf(query_result, spark, ["col1", ..., "coln"]) + gdf = StructuredAdapter.toDf(query_result, spark, ["col1", ..., "coln"]) ``` ### Range query window @@ -872,6 +679,7 @@ The index should be built on either one of two SpatialRDDs. In general, you shou from sedona.core.SpatialRDD import CircleRDD from sedona.core.enums import GridType from sedona.core.spatialOperator import JoinQueryRaw + from sedona.utils.structured_adapter import StructuredAdapter object_rdd.analyze() @@ -886,7 +694,7 @@ The index should be built on either one of two SpatialRDDs. In general, you shou result = JoinQueryRaw.DistanceJoinQueryFlat(spatial_rdd, circle_rdd, using_index, consider_boundary_intersection) - gdf = Adapter.toDf(result, ["left_col1", ..., "lefcoln"], ["rightcol1", ..., "rightcol2"], spark) + gdf = StructuredAdapter.toDf(result, ["left_col1", ..., "lefcoln"], ["rightcol1", ..., "rightcol2"], spark) ``` ## Write a Distance Join Query @@ -972,45 +780,10 @@ The output format of the distance join query is [here](#output-format-2). ## Save to permanent storage -You can always save an SpatialRDD back to some permanent storage such as HDFS and Amazon S3. You can save distributed SpatialRDD to WKT, GeoJSON and object files. - -!!!note - Non-spatial attributes such as price, age and name will also be stored to permanent storage. +You can always save an SpatialRDD back to some permanent storage such as HDFS and Amazon S3. ### Save an SpatialRDD (not indexed) -Typed SpatialRDD and generic SpatialRDD can be saved to permanent storage. - -#### Save to distributed WKT text file - -Use the following code to save an SpatialRDD as a distributed WKT text file: - -```scala -objectRDD.rawSpatialRDD.saveAsTextFile("hdfs://PATH") -objectRDD.saveAsWKT("hdfs://PATH") -``` - -#### Save to distributed WKB text file - -Use the following code to save an SpatialRDD as a distributed WKB text file: - -```scala -objectRDD.saveAsWKB("hdfs://PATH") -``` - -#### Save to distributed GeoJSON text file - -!!!note - Saving GeoJSON using SpatialRDD is not recommended. Please use [Sedona SQL and DataFrame API](sql.md#save-as-geojson) to write GeoJSON files. - -Use the following code to save an SpatialRDD as a distributed GeoJSON text file: - -```scala -objectRDD.saveAsGeoJSON("hdfs://PATH") -``` - -#### Save to distributed object file - Use the following code to save an SpatialRDD as a distributed object file: === "Scala/Java" @@ -1032,8 +805,6 @@ Use the following code to save an SpatialRDD as a distributed object file: Indexed typed SpatialRDD and generic SpatialRDD can be saved to permanent storage. However, the indexed SpatialRDD has to be stored as a distributed object file. -#### Save to distributed object file - Use the following code to save an SpatialRDD as a distributed object file: ``` @@ -1046,16 +817,7 @@ A spatial partitioned RDD can be saved to permanent storage but Spark is not abl ### Reload a saved SpatialRDD -You can easily reload an SpatialRDD that has been saved to ==a distributed object file==. - -#### Load to a typed SpatialRDD - -!!!warning - Typed SpatialRDD has been deprecated for a long time. We do NOT recommend it anymore. 
-
-#### Load to a generic SpatialRDD
-
-Use the following code to reload the SpatialRDD:
+You can easily reload an SpatialRDD that has been saved to ==a distributed object file==. Use the following code to reload the SpatialRDD:
 
 === "Scala"
 
diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index 35730e8c98..ffea04132a 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -565,10 +565,6 @@ The character encoding of string attributes are inferred from the `.cpg` file. I
     df = sedona.read.format("shapefile").option("charset", "UTF-8").load("/path/to/shapefile")
     ```
 
-### (Deprecated) Loading Shapefile using SpatialRDD
-
-If you are using Sedona earlier than v`1.7.0`, you can load shapefiles as SpatialRDD and converted to DataFrame using Adapter. Please read [Load SpatialRDD](rdd.md#create-a-generic-spatialrdd) and [DataFrame <-> RDD](#convert-between-dataframe-and-spatialrdd).
-
 ## Load GeoParquet
 
 Since v`1.3.0`, Sedona natively supports loading GeoParquet file. Sedona will infer geometry fields using the "geo" metadata in GeoParquet files.
@@ -1594,32 +1590,29 @@ my_postgis_db# alter table my_table alter column geom type geometry;
 
 ### DataFrame to SpatialRDD
 
-Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD. Please read [Adapter Scaladoc](../api/scaladoc/spark/org/apache/sedona/sql/utils/index.html)
+Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD.
 
 === "Scala"
 
     ```scala
-    var spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")
+    var spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty")
     ```
 
 === "Java"
 
     ```java
-    SpatialRDD spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")
+    SpatialRDD spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty")
     ```
 
 === "Python"
 
     ```python
-    from sedona.utils.adapter import Adapter
+    from sedona.utils.structured_adapter import StructuredAdapter
 
-    spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")
+    spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty")
     ```
 
-"usacounty" is the name of the geometry column
-
-!!!warning
-    Only one Geometry type column is allowed per DataFrame.
+"usacounty" is the name of the geometry column. It is an optional parameter. If you don't provide it, the first geometry column will be used.
 
 ### SpatialRDD to DataFrame
 
@@ -1628,109 +1621,42 @@
 
 === "Scala"
 
     ```scala
-    var spatialDf = Adapter.toDf(spatialRDD, sedona)
+    var spatialDf = StructuredAdapter.toDf(spatialRDD, sedona)
    ```
 
 === "Java"
 
    ```java
-    Dataset spatialDf = Adapter.toDf(spatialRDD, sedona)
+    Dataset spatialDf = StructuredAdapter.toDf(spatialRDD, sedona)
    ```
 
 === "Python"
 
    ```python
-    from sedona.utils.adapter import Adapter
+    from sedona.utils.structured_adapter import StructuredAdapter
 
-    spatialDf = Adapter.toDf(spatialRDD, sedona)
-    ```
-
-All other attributes such as price and age will be also brought to the DataFrame as long as you specify ==carryOtherAttributes== (see [Read other attributes in an SpatialRDD](rdd.md#read-other-attributes-in-an-spatialrdd)).
-
-You may also manually specify a schema for the resulting DataFrame in case you require different column names or data
-types. Note that string schemas and not all data types are supported—please check the
-[Adapter Scaladoc](../api/javadoc/sql/org/apache/sedona/sql/utils/index.html) to confirm what is supported for your use
-case. At least one column for the user data must be provided.
-
-=== "Scala"
-
-    ```scala
-    val schema = StructType(Array(
-        StructField("county", GeometryUDT, nullable = true),
-        StructField("name", StringType, nullable = true),
-        StructField("price", DoubleType, nullable = true),
-        StructField("age", IntegerType, nullable = true)
-    ))
-    val spatialDf = Adapter.toDf(spatialRDD, schema, sedona)
+    spatialDf = StructuredAdapter.toDf(spatialRDD, sedona)
    ```
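+
+Because `StructuredAdapter` preserves the schema, a DataFrame survives the round trip unchanged. A minimal sketch (the `usacounty` geometry column and the surrounding `spatialDf` are assumed):
+
+```scala
+var spatialRDD = StructuredAdapter.toSpatialRdd(spatialDf, "usacounty")
+var spatialDf2 = StructuredAdapter.toDf(spatialRDD, sedona)
+spatialDf2.printSchema() // prints the same columns and types as spatialDf
+```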
 
 ### SpatialPairRDD to DataFrame
 
-PairRDD is the result of a spatial join query or distance join query. SedonaSQL DataFrame-RDD Adapter can convert the result to a DataFrame. But you need to provide the name of other attributes.
+PairRDD is the result of a spatial join query or distance join query. SedonaSQL DataFrame-RDD Adapter can convert the result to a DataFrame. But you need to provide the schema of the left and right RDDs.
 
 === "Scala"
 
    ```scala
-    var joinResultDf = Adapter.toDf(joinResultPairRDD, Seq("left_attribute1", "left_attribute2"), Seq("right_attribute1", "right_attribute2"), sedona)
+    var joinResultDf = StructuredAdapter.toDf(joinResultPairRDD, leftDf.schema, rightDf.schema, sedona)
    ```
 
 === "Java"
 
    ```java
-    import scala.collection.JavaConverters;
-
-    List leftFields = new ArrayList<>(Arrays.asList("c1", "c2", "c3"));
-    List rightFields = new ArrayList<>(Arrays.asList("c4", "c5", "c6"));
-    Dataset joinResultDf = Adapter.toDf(joinResultPairRDD, JavaConverters.asScalaBuffer(leftFields).toSeq(), JavaConverters.asScalaBuffer(rightFields).toSeq(), sedona);
+    Dataset joinResultDf = StructuredAdapter.toDf(joinResultPairRDD, leftDf.schema(), rightDf.schema(), sedona);
    ```
-
 === "Python"
 
    ```python
-    from sedona.utils.adapter import Adapter
+    from sedona.utils.structured_adapter import StructuredAdapter
 
-    joinResultDf = Adapter.toDf(jvm_sedona_rdd, ["poi_from_id", "poi_from_name"], ["poi_to_id", "poi_to_name"], spark))
-    ```
-or you can use the attribute names directly from the input RDD
-
-=== "Scala"
-
-    ```scala
-    import scala.collection.JavaConversions._
-    var joinResultDf = Adapter.toDf(joinResultPairRDD, leftRdd.fieldNames, rightRdd.fieldNames, sedona)
-    ```
-
-=== "Java"
-
-    ```java
-    import scala.collection.JavaConverters;
-    Dataset joinResultDf = Adapter.toDf(joinResultPairRDD, JavaConverters.asScalaBuffer(leftRdd.fieldNames).toSeq(), JavaConverters.asScalaBuffer(rightRdd.fieldNames).toSeq(), sedona);
-    ```
-=== "Python"
-
-    ```python
-    from sedona.utils.adapter import Adapter
-
-    joinResultDf = Adapter.toDf(result_pair_rdd, leftRdd.fieldNames, rightRdd.fieldNames, spark)
-    ```
-
-All other attributes such as price and age will be also brought to the DataFrame as long as you specify ==carryOtherAttributes== (see [Read other attributes in an SpatialRDD](rdd.md#read-other-attributes-in-an-spatialrdd)).
-
-You may also manually specify a schema for the resulting DataFrame in case you require different column names or data
-types. Note that string schemas and not all data types are supported—please check the
-[Adapter Scaladoc](../api/javadoc/sql/org/apache/sedona/sql/utils/index.html) to confirm what is supported for your use
-case. Columns for the left and right user data must be provided.
-
-=== "Scala"
-
-    ```scala
-    val schema = StructType(Array(
-        StructField("leftGeometry", GeometryUDT, nullable = true),
-        StructField("name", StringType, nullable = true),
-        StructField("price", DoubleType, nullable = true),
-        StructField("age", IntegerType, nullable = true),
-        StructField("rightGeometry", GeometryUDT, nullable = true),
-        StructField("category", StringType, nullable = true)
-    ))
-    val joinResultDf = Adapter.toDf(joinResultPairRDD, schema, sedona)
+    joinResultDf = StructuredAdapter.pairRddToDf(result_pair_rdd, leftDf.schema, rightDf.schema, spark)
    ```
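+
+For instance, a distance join result can be brought back to a DataFrame like this (a sketch adapted from this patch's test suite; `pointRdd` and `circleRDD` are assumed to be already analyzed and spatially partitioned, and `leftDf`/`rightDf` are the DataFrames the two RDDs came from):
+
+```scala
+import org.apache.sedona.core.spatialOperator.{JoinQuery, SpatialPredicate}
+
+val pairRdd = JoinQuery.DistanceJoinQueryFlat(pointRdd, circleRDD, true, SpatialPredicate.INTERSECTS)
+val joinResultDf = StructuredAdapter.toDf(pairRdd, leftDf.schema, rightDf.schema, sedona)
+joinResultDf.printSchema() // left columns followed by right columns
+```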
diff --git a/python/sedona/register/java_libs.py b/python/sedona/register/java_libs.py
index 931488d917..8d1681d15e 100644
--- a/python/sedona/register/java_libs.py
+++ b/python/sedona/register/java_libs.py
@@ -21,6 +21,7 @@ class SedonaJvmLib(Enum):
     JoinParams = "org.apache.sedona.python.wrapper.adapters.JoinParamsAdapter"
     Adapter = "org.apache.sedona.sql.utils.Adapter"
+    StructuredAdapter = "org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter"
     JoinQuery = "org.apache.sedona.core.spatialOperator.JoinQuery"
     KNNQuery = "org.apache.sedona.core.spatialOperator.KNNQuery"
     RangeQuery = "org.apache.sedona.core.spatialOperator.RangeQuery"
diff --git a/python/sedona/utils/adapter.py b/python/sedona/utils/adapter.py
index b692fd2fdb..7b978e7212 100644
--- a/python/sedona/utils/adapter.py
+++ b/python/sedona/utils/adapter.py
@@ -29,16 +29,14 @@ class Adapter(metaclass=MultipleMeta):
 
     """
     Class which allow to convert between Spark DataFrame and SpatialRDD and reverse.
+    This class is used to convert between PySpark DataFrame and SpatialRDD. Schema
+    is lost during the conversion. This should be used if your data starts as a
+    SpatialRDD and you want to convert it to a DataFrame for further processing.
     """
 
     @staticmethod
     def _create_dataframe(jdf, sparkSession: SparkSession) -> DataFrame:
-        if hasattr(sparkSession, "_wrapped"):
-            # In Spark < 3.3, use the _wrapped SQLContext
-            return DataFrame(jdf, sparkSession._wrapped)
-        else:
-            # In Spark >= 3.3, use the session directly
-            return DataFrame(jdf, sparkSession)
+        return DataFrame(jdf, sparkSession)
 
     @classmethod
     def toRdd(cls, dataFrame: DataFrame) -> "JvmSpatialRDD":
diff --git a/python/sedona/utils/structured_adapter.py b/python/sedona/utils/structured_adapter.py
new file mode 100644
index 0000000000..bc3b959363
--- /dev/null
+++ b/python/sedona/utils/structured_adapter.py
@@ -0,0 +1,126 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql.types import StructType
+
+from sedona.core.SpatialRDD.spatial_rdd import SpatialRDD
+from sedona.core.spatialOperator.rdd import SedonaPairRDD
+
+
+class StructuredAdapter:
+    """
+    Class to convert between PySpark DataFrame and SpatialRDD, in both directions,
+    without losing schema information. Unlike Adapter, this class preserves the
+    DataFrame schema during the conversion. It should be used if your data starts
+    as a DataFrame and you want to convert it to a SpatialRDD and back.
+    """
+
+    @staticmethod
+    def _create_dataframe(jdf, sparkSession: SparkSession) -> DataFrame:
+        return DataFrame(jdf, sparkSession)
+
+    @classmethod
+    def toSpatialRdd(
+        cls, dataFrame: DataFrame, geometryFieldName: str = None
+    ) -> SpatialRDD:
+        """
+        Convert a DataFrame to a SpatialRDD
+        :param dataFrame: the DataFrame to convert; it must contain a geometry column
+        :param geometryFieldName: name of the geometry column; if None, the first
+            geometry column is used
+        :return: the resulting SpatialRDD
+        """
+        sc = dataFrame._sc
+        jvm = sc._jvm
+        if geometryFieldName is None:
+            srdd = jvm.StructuredAdapter.toSpatialRdd(dataFrame._jdf)
+        else:
+            srdd = jvm.StructuredAdapter.toSpatialRdd(dataFrame._jdf, geometryFieldName)
+
+        spatial_rdd = SpatialRDD(sc)
+        spatial_rdd.set_srdd(srdd)
+
+        return spatial_rdd
+
+    @classmethod
+    def toDf(cls, spatialRDD: SpatialRDD, sparkSession: SparkSession) -> DataFrame:
+        """
+        Convert a SpatialRDD to a DataFrame
+        :param spatialRDD: the SpatialRDD to convert; its rawSpatialRDD must be set
+        :param sparkSession: the active SparkSession
+        :return: the resulting DataFrame
+        """
+        sc = spatialRDD._sc
+        jvm = sc._jvm
+
+        jdf = jvm.StructuredAdapter.toDf(spatialRDD._srdd, sparkSession._jsparkSession)
+
+        df = StructuredAdapter._create_dataframe(jdf, sparkSession)
+
+        return df
+
+    @classmethod
+    def toSpatialPartitionedDf(
+        cls, spatialRDD: SpatialRDD, sparkSession: SparkSession
+    ) -> DataFrame:
+        """
+        Convert a SpatialRDD to a DataFrame. This DataFrame will be spatially partitioned.
+        :param spatialRDD: the SpatialRDD to convert; its spatialPartitioning method
+            must have been called first
+        :param sparkSession: the active SparkSession
+        :return: the resulting spatially partitioned DataFrame
+        """
+        sc = spatialRDD._sc
+        jvm = sc._jvm
+
+        jdf = jvm.StructuredAdapter.toSpatialPartitionedDf(
+            spatialRDD._srdd, sparkSession._jsparkSession
+        )
+
+        df = StructuredAdapter._create_dataframe(jdf, sparkSession)
+
+        return df
+
+    @classmethod
+    def pairRddToDf(
+        cls,
+        rawPairRDD: SedonaPairRDD,
+        left_schema: StructType,
+        right_schema: StructType,
+        sparkSession: SparkSession,
+    ) -> DataFrame:
+        """
+        Convert a raw pair RDD to a DataFrame. This is useful when you have a spatial
+        join result.
+        Args:
+            rawPairRDD: the pair RDD produced by a spatial or distance join
+            left_schema: schema of the left side of the join
+            right_schema: schema of the right side of the join
+            sparkSession: the active SparkSession
+
+        Returns:
+            A DataFrame with the left columns followed by the right columns
+        """
+        jvm = sparkSession._jvm
+        left_schema_json = left_schema.json()
+        right_schema_json = right_schema.json()
+        jdf = jvm.StructuredAdapter.toDf(
+            rawPairRDD.jsrdd,
+            left_schema_json,
+            right_schema_json,
+            sparkSession._jsparkSession,
+        )
+        df = StructuredAdapter._create_dataframe(jdf, sparkSession)
+        return df
diff --git a/python/tests/sql/test_structured_adapter.py b/python/tests/sql/test_structured_adapter.py
new file mode 100644
index 0000000000..54fcbc44ef
--- /dev/null
+++ b/python/tests/sql/test_structured_adapter.py
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from pyspark.sql import DataFrame + +from sedona.core.SpatialRDD import CircleRDD +from sedona.core.enums import GridType +from sedona.core.spatialOperator import JoinQueryRaw +from sedona.utils.structured_adapter import StructuredAdapter +from tests.test_base import TestBase + + +class TestStructuredAdapter(TestBase): + + def test_df_rdd(self): + spatial_df: DataFrame = self.spark.sql("select ST_MakePoint(1, 1) as geom") + srdd = StructuredAdapter.toSpatialRdd(spatial_df, "geom") + spatial_df = StructuredAdapter.toDf(srdd, self.spark) + assert spatial_df.count() == 1 + + def test_spatial_partitioned_df(self): + spatial_df: DataFrame = self.spark.sql("select ST_MakePoint(1, 1) as geom") + srdd = StructuredAdapter.toSpatialRdd(spatial_df, "geom") + srdd.analyze() + srdd.spatialPartitioning(GridType.KDBTREE, 1) + spatial_df = StructuredAdapter.toSpatialPartitionedDf(srdd, self.spark) + assert spatial_df.count() == 1 + + def test_distance_join_result_to_dataframe(self): + spatial_df: DataFrame = self.spark.sql("select ST_MakePoint(1, 1) as geom") + schema = spatial_df.schema + srdd = StructuredAdapter.toSpatialRdd(spatial_df, "geom") + srdd.analyze() + + circle_rdd = CircleRDD(srdd, 0.001) + + srdd.spatialPartitioning(GridType.QUADTREE) + circle_rdd.spatialPartitioning(srdd.getPartitioner()) + + join_result_pair_rdd = JoinQueryRaw.DistanceJoinQueryFlat( + srdd, circle_rdd, False, True + ) + + join_result_df = StructuredAdapter.pairRddToDf( + join_result_pair_rdd, schema, schema, self.spark + ) + assert join_result_df.count() == 1 diff --git a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java index d81b916183..f286dd1588 100644 --- a/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java +++ b/spark/common/src/main/java/org/apache/sedona/core/spatialRDD/SpatialRDD.java @@ -42,6 +42,7 @@ import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; +import org.apache.spark.sql.types.StructType; import org.apache.spark.storage.StorageLevel; import org.apache.spark.util.random.SamplingUtils; import org.locationtech.jts.geom.Coordinate; @@ -85,6 +86,8 @@ public class SpatialRDD implements Serializable { public JavaRDD rawSpatialRDD; public List fieldNames; + + public StructType schema; /** The CR stransformation. */ protected boolean CRStransformation = false; /** The source epsg code. 
*/ diff --git a/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala b/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala index 9b1067a25a..96aab1287e 100644 --- a/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala +++ b/spark/common/src/main/scala/org/apache/sedona/sql/utils/Adapter.scala @@ -29,6 +29,13 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.locationtech.jts.geom.Geometry +/** + * Adapter for converting between DataFrame and SpatialRDD. It provides methods to convert + * DataFrame to SpatialRDD and vice versa. The schema information is lost during conversion. It is + * different from [[org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter]] which does not + * lose the schema information during conversion. This should be used if your data starts as a + * SpatialRDD and you want to convert it to DataFrame. + */ object Adapter { /** diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala index 41869817e0..7713674261 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/Weighting.scala @@ -79,7 +79,7 @@ object Weighting { require(alpha < 0, "Alpha must be less than 0") val geometryColumn = geometry match { - case null => getGeometryColumnName(dataframe) + case null => getGeometryColumnName(dataframe.schema) case _ => require( dataframe.schema.fields.exists(_.name == geometry), diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala b/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala index 11bdb8306e..c75291d971 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala @@ -68,7 +68,7 @@ object DBSCAN { clusterColumnName: String = "cluster"): DataFrame = { val geometryCol = geometry match { - case null => getGeometryColumnName(dataframe) + case null => getGeometryColumnName(dataframe.schema) case _ => geometry } diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/outlierDetection/LocalOutlierFactor.scala b/spark/common/src/main/scala/org/apache/sedona/stats/outlierDetection/LocalOutlierFactor.scala index 55e682d079..2595a90852 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/outlierDetection/LocalOutlierFactor.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/outlierDetection/LocalOutlierFactor.scala @@ -73,7 +73,8 @@ object LocalOutlierFactor { if (useSphere) ST_DistanceSphere else ST_Distance val useSpheroidString = if (useSphere) "True" else "False" // for the SQL expression - val geometryColumn = if (geometry == null) getGeometryColumnName(dataframe) else geometry + val geometryColumn = + if (geometry == null) getGeometryColumnName(dataframe.schema) else geometry val KNNFunction = "ST_KNN" diff --git a/spark/common/src/main/scala/org/apache/sedona/util/DfUtils.scala b/spark/common/src/main/scala/org/apache/sedona/util/DfUtils.scala index 18d4bc0c1a..5b2bea2180 100644 --- a/spark/common/src/main/scala/org/apache/sedona/util/DfUtils.scala +++ b/spark/common/src/main/scala/org/apache/sedona/util/DfUtils.scala @@ -18,12 +18,12 @@ */ package org.apache.sedona.util -import org.apache.spark.sql.DataFrame import 
org.apache.spark.sql.sedona_sql.UDT.GeometryUDT +import org.apache.spark.sql.types.StructType object DfUtils { - def getGeometryColumnName(dataframe: DataFrame): String = { - val geomFields = dataframe.schema.fields.filter(_.dataType == GeometryUDT) + def getGeometryColumnName(schema: StructType): String = { + val geomFields = schema.fields.filter(_.dataType == GeometryUDT) if (geomFields.isEmpty) throw new IllegalArgumentException( diff --git a/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala new file mode 100644 index 0000000000..d2ca5f8f09 --- /dev/null +++ b/spark/common/src/main/scala/org/apache/spark/sql/sedona_sql/adapters/StructuredAdapter.scala @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.spark.sql.sedona_sql.adapters + +import org.apache.sedona.core.spatialRDD.SpatialRDD +import org.apache.sedona.sql.utils.GeometrySerializer +import org.apache.sedona.util.DfUtils +import org.apache.spark.api.java.JavaPairRDD +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.locationtech.jts.geom.Geometry +import org.slf4j.{Logger, LoggerFactory} + +/** + * Adapter for converting between DataFrame and SpatialRDD. It provides methods to convert + * DataFrame to SpatialRDD and vice versa without losing schema. It is different from + * [[org.apache.sedona.sql.utils.Adapter]] which loses the schema information during conversion. + * This should be used if your data starts as a DataFrame and you want to convert it to SpatialRDD + */ +object StructuredAdapter { + val logger: Logger = LoggerFactory.getLogger(getClass) + + /** + * Convert RDD[Row] to SpatialRDD. It puts Row as user data of Geometry. + * @param rdd + * @param geometryFieldName + * @return + */ + def toSpatialRdd(rdd: RDD[Row], geometryFieldName: String): SpatialRDD[Geometry] = { + val spatialRDD = new SpatialRDD[Geometry] + if (rdd.isEmpty()) { + spatialRDD.schema = StructType(Seq()) + } else spatialRDD.schema = rdd.first().schema + spatialRDD.rawSpatialRDD = rdd + .map(row => { + val geom = row.getAs[Geometry](geometryFieldName) + geom.setUserData(row.copy()) + geom + }) + spatialRDD + } + + /** + * Convert RDD[Row] to SpatialRDD. It puts Row as user data of Geometry. It auto-detects + * geometry column if geometryFieldName is not provided. It uses the first geometry column in + * RDD. 
+   * @param rdd
+   * @return
+   */
+  def toSpatialRdd(rdd: RDD[Row]): SpatialRDD[Geometry] = {
+    require(rdd.count() > 0, "Input RDD cannot be empty.")
+    toSpatialRdd(rdd, DfUtils.getGeometryColumnName(rdd.first().schema))
+  }
+
+  /**
+   * Convert SpatialRDD to RDD[Row]. It extracts Row from user data of Geometry.
+   * @param spatialRDD
+   * @return
+   */
+  def toRowRdd(spatialRDD: SpatialRDD[Geometry]): RDD[Row] = {
+    spatialRDD.rawSpatialRDD.map(geometry => {
+      val row = geometry.getUserData.asInstanceOf[Row]
+      row
+    })
+  }
+
+  /**
+   * Convert DataFrame to SpatialRDD. It puts InternalRow as user data of Geometry. It allows only
+   * one geometry column.
+   *
+   * @param dataFrame
+   * @param geometryFieldName
+   */
+  def toSpatialRdd(dataFrame: DataFrame, geometryFieldName: String): SpatialRDD[Geometry] = {
+    val spatialRDD = new SpatialRDD[Geometry]
+    spatialRDD.schema = dataFrame.schema
+    val ordinal = spatialRDD.schema.fieldIndex(geometryFieldName)
+    spatialRDD.rawSpatialRDD = dataFrame.queryExecution.toRdd
+      .map(row => {
+        val geom = GeometrySerializer.deserialize(row.getBinary(ordinal))
+        geom.setUserData(row.copy())
+        geom
+      })
+    spatialRDD
+  }
+
+  /**
+   * Convert DataFrame to SpatialRDD. It puts InternalRow as user data of Geometry. It
+   * auto-detects geometry column if geometryFieldName is not provided. It uses the first geometry
+   * column in DataFrame.
+   * @param dataFrame
+   * @return
+   */
+  def toSpatialRdd(dataFrame: DataFrame): SpatialRDD[Geometry] = {
+    toSpatialRdd(dataFrame, DfUtils.getGeometryColumnName(dataFrame.schema))
+  }
+
+  /**
+   * Convert SpatialRDD.rawSpatialRDD to DataFrame.
+   * @param spatialRDD
+   *   The SpatialRDD to convert. It must have rawSpatialRDD set.
+   * @param sparkSession
+   * @return
+   */
+  def toDf(spatialRDD: SpatialRDD[Geometry], sparkSession: SparkSession): DataFrame = {
+    val rowRdd = spatialRDD.rawSpatialRDD.map(geometry => {
+      val row = geometry.getUserData.asInstanceOf[InternalRow]
+      row
+    })
+    sparkSession.internalCreateDataFrame(rowRdd, spatialRDD.schema)
+  }
+
+  /**
+   * Convert SpatialRDD.spatialPartitionedRDD to DataFrame. This is useful when you want to convert
+   * SpatialRDD after spatial partitioning.
+   * @param spatialRDD
+   *   The SpatialRDD to convert. It must have spatialPartitionedRDD set. You must call
+   *   spatialPartitioning method before calling this method.
+   * @param sparkSession
+   * @return
+   */
+  def toSpatialPartitionedDf(
+      spatialRDD: SpatialRDD[Geometry],
+      sparkSession: SparkSession): DataFrame = {
+    if (spatialRDD.spatialPartitionedRDD == null)
+      throw new RuntimeException(
+        "SpatialRDD is not spatially partitioned. Please call spatialPartitioning method before calling this method.")
+    logger.warn(
+      "SpatialPartitionedRDD might have duplicate geometries. Please make sure you are aware of it.")
+    val rowRdd = spatialRDD.spatialPartitionedRDD.map(geometry => {
+      val row = geometry.getUserData.asInstanceOf[InternalRow]
+      row
+    })
+    sparkSession.internalCreateDataFrame(rowRdd, spatialRDD.schema)
+  }
+
+  /**
+   * Convert JavaPairRDD[Geometry, Geometry] to DataFrame. This method is useful when you want to
+   * convert the result of spatial join to DataFrame.
+   * @param spatialPairRDD
+   *   The JavaPairRDD to convert.
+   * @param leftSchemaJson
+   *   Schema of the left side. In a json format.
+   * @param rightSchemaJson
+   *   Schema of the right side. In a json format. 
+   * @param sparkSession
+   * @return
+   */
+  def toDf(
+      spatialPairRDD: JavaPairRDD[Geometry, Geometry],
+      leftSchemaJson: String,
+      rightSchemaJson: String,
+      sparkSession: SparkSession): DataFrame = {
+    val leftSchema = DataType.fromJson(leftSchemaJson).asInstanceOf[StructType]
+    val rightSchema = DataType.fromJson(rightSchemaJson).asInstanceOf[StructType]
+    toDf(spatialPairRDD, leftSchema, rightSchema, sparkSession)
+  }
+
+  /**
+   * Convert JavaPairRDD[Geometry, Geometry] to DataFrame. This method is useful when you want to
+   * convert the result of spatial join to DataFrame.
+   * @param spatialPairRDD
+   *   The JavaPairRDD to convert.
+   * @param leftSchema
+   *   The schema of the left side.
+   * @param rightSchema
+   *   The schema of the right side.
+   * @param sparkSession
+   * @return
+   */
+  def toDf(
+      spatialPairRDD: JavaPairRDD[Geometry, Geometry],
+      leftSchema: StructType,
+      rightSchema: StructType,
+      sparkSession: SparkSession): DataFrame = {
+    val rowRdd = spatialPairRDD.rdd.map(pair => {
+      val leftRow = pair._1.getUserData.asInstanceOf[InternalRow].toSeq(leftSchema)
+      val rightRow = pair._2.getUserData.asInstanceOf[InternalRow].toSeq(rightSchema)
+      InternalRow.fromSeq(leftRow ++ rightRow)
+    })
+    sparkSession.internalCreateDataFrame(
+      rowRdd,
+      StructType(leftSchema.fields ++ rightSchema.fields))
+  }
+
+  /**
+   * Convert JavaPairRDD[Geometry, Geometry] to DataFrame. This method is useful when you want to
+   * convert the result of spatial join to DataFrame.
+   * @param spatialPairRDD
+   *   The JavaPairRDD to convert.
+   * @param originalLeftSpatialRdd
+   *   The original left SpatialRDD involved in the join. It is used to get the schema of the left
+   *   side.
+   * @param originalRightSpatialRdd
+   *   The original right SpatialRDD involved in the join. It is used to get the schema of the
+   *   right side.
+   * @param sparkSession
+   * @return
+   */
+  def toDf(
+      spatialPairRDD: JavaPairRDD[Geometry, Geometry],
+      originalLeftSpatialRdd: SpatialRDD[Geometry],
+      originalRightSpatialRdd: SpatialRDD[Geometry],
+      sparkSession: SparkSession): DataFrame = {
+    toDf(
+      spatialPairRDD,
+      originalLeftSpatialRdd.schema,
+      originalRightSpatialRdd.schema,
+      sparkSession)
+  }
+}
diff --git a/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
new file mode 100644
index 0000000000..6938916a6d
--- /dev/null
+++ b/spark/common/src/test/scala/org/apache/sedona/sql/structuredAdapterTestScala.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License. 
+ */ +package org.apache.sedona.sql + +import org.apache.sedona.core.enums.{GridType, IndexType} +import org.apache.sedona.core.spatialOperator.{JoinQuery, SpatialPredicate} +import org.apache.sedona.core.spatialRDD.CircleRDD +import org.apache.spark.sql.Row +import org.apache.spark.sql.sedona_sql.adapters.StructuredAdapter +import org.junit.Assert.assertEquals +import org.scalatest.GivenWhenThen + +class structuredAdapterTestScala extends TestBaseScala with GivenWhenThen { + + describe("Structured Adapter") { + it("Should convert DataFrame to SpatialRDD and back") { + val seq = generateTestData() + val geom1 = seq.head._3 + val dfOrigin = sparkSession.createDataFrame(seq) + val rdd = StructuredAdapter.toSpatialRdd(dfOrigin, "_3") + assertGeometryEquals(geom1, rdd.rawSpatialRDD.take(1).get(0)) + val dfConverted = StructuredAdapter.toDf(rdd, sparkSession) + intercept[RuntimeException] { + StructuredAdapter.toSpatialPartitionedDf(rdd, sparkSession) + } + assertEquals(seq.size, dfConverted.count()) + } + + it("Should convert DataFrame to SpatialRDD and back, without specifying geometry column") { + val seq = generateTestData() + val geom1 = seq.head._3 + val dfOrigin = sparkSession.createDataFrame(seq) + val rdd = StructuredAdapter.toSpatialRdd(dfOrigin) + assertGeometryEquals(geom1, rdd.rawSpatialRDD.take(1).get(0)) + val dfConverted = StructuredAdapter.toDf(rdd, sparkSession) + intercept[RuntimeException] { + StructuredAdapter.toSpatialPartitionedDf(rdd, sparkSession) + } + assertEquals(seq.size, dfConverted.count()) + } + + it("Should convert to Rdd and do spatial partitioning") { + val seq = generateTestData() + val dfOrigin = sparkSession.createDataFrame(seq) + val rdd = StructuredAdapter.toSpatialRdd(dfOrigin, "_3") + rdd.analyze() + rdd.spatialPartitioning(GridType.KDBTREE, 10) + val dfConverted = StructuredAdapter.toSpatialPartitionedDf(rdd, sparkSession) + assertEquals(seq.size, dfConverted.count()) + } + + it("Should convert a spatial join result back to DataFrame") { + val pointRdd = + StructuredAdapter.toSpatialRdd(sparkSession.createDataFrame(generateTestData())) + val circleRDD = new CircleRDD(pointRdd, 0.0001) + circleRDD.analyze() + pointRdd.analyze() + circleRDD.spatialPartitioning(GridType.KDBTREE) + pointRdd.spatialPartitioning(circleRDD.getPartitioner) + circleRDD.buildIndex(IndexType.QUADTREE, true) + val pairRdd = + JoinQuery.DistanceJoinQueryFlat(pointRdd, circleRDD, true, SpatialPredicate.INTERSECTS) + var resultDf = + StructuredAdapter.toDf(pairRdd, pointRdd.schema, pointRdd.schema, sparkSession) + assertEquals(pointRdd.rawSpatialRDD.count(), resultDf.count()) + resultDf = + StructuredAdapter.toDf(pairRdd, pointRdd.schema.json, pointRdd.schema.json, sparkSession) + assertEquals(pointRdd.rawSpatialRDD.count(), resultDf.count()) + } + + it("Should convert a SpatialRdd to RowRdd and back") { + val seq = generateTestData() + val dfOrigin = sparkSession.createDataFrame(seq) + val spatialRdd = StructuredAdapter.toSpatialRdd(dfOrigin.rdd) + val rowRdd = StructuredAdapter.toRowRdd(spatialRdd) + assertEquals(seq.size, StructuredAdapter.toSpatialRdd(rowRdd).rawSpatialRDD.count()) + } + + it("Should not be able to convert an empty Row RDD to SpatialRDD if schema is not provided") { + val rdd = sparkSession.sparkContext.parallelize(Seq.empty[Row]) + intercept[IllegalArgumentException] { + StructuredAdapter.toSpatialRdd(rdd) + } + } + + it("Should convert an empty Row RDD to SpatialRDD if schema is provided") { + val rdd = sparkSession.sparkContext.parallelize(Seq.empty[Row]) + 
val spatialRdd = StructuredAdapter.toSpatialRdd(rdd, null) + assertEquals(0, spatialRdd.rawSpatialRDD.count()) + assertEquals(0, spatialRdd.schema.size) + } + } + +}