
[SEDONA-705] Add unique partitioner wrapper to enable partitioned writes with Sedona #1778

Merged 25 commits on Feb 6, 2025
39 changes: 39 additions & 0 deletions docs/tutorial/sql.md
@@ -1638,6 +1638,45 @@ Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to a SpatialRDD.
spatialDf = StructuredAdapter.toDf(spatialRDD, sedona)
```

### SpatialRDD to DataFrame with spatial partitioning

By default, `StructuredAdapter.toDf()` does not preserve spatial partitions, because doing so
may introduce duplicate features for most types of spatial data. These duplicates
are introduced deliberately to ensure correctness when performing a spatial join;
however, they are not typically desired when using Sedona to prepare a dataset
for distribution.
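The trade-off can be illustrated with a dependency-free sketch (the helper names below are invented for illustration and are not Sedona APIs): a feature whose envelope straddles several grid cells is copied into every cell it intersects when partitioning for a join, while a duplicate-free partitioner keeps a single copy.

```python
# Minimal sketch (not Sedona's implementation) of grid assignment with and
# without duplicates. Envelopes are (min_x, max_x, min_y, max_y) tuples.

def intersects(a, b):
    """True if two envelopes overlap (touching edges count as overlapping)."""
    return a[0] <= b[1] and b[0] <= a[1] and a[2] <= b[3] and b[2] <= a[3]

def place_with_duplicates(box, grids):
    """Join-style partitioning: one copy of the feature per intersecting cell."""
    return [i for i, cell in enumerate(grids) if intersects(box, cell)]

def place_unique(box, grids):
    """Duplicate-free partitioning: keep only the lowest intersecting cell id."""
    ids = place_with_duplicates(box, grids)
    return [min(ids)] if ids else []

grids = [
    (0.0, 10.0, 0.0, 10.0),
    (10.0, 20.0, 0.0, 10.0),
    (0.0, 10.0, 10.0, 20.0),
    (10.0, 20.0, 10.0, 20.0),
]
box = (5.0, 15.0, 5.0, 15.0)  # straddles all four cells

print(place_with_duplicates(box, grids))  # → [0, 1, 2, 3]
print(place_unique(box, grids))           # → [0]
```

A join needs every copy so that each partition can be matched locally; a distributed write wants exactly one copy per feature, which is what the unique variant provides.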

You can use `StructuredAdapter` together with the `spatialRDD.spatialPartitioningWithoutDuplicates` function to obtain a Sedona DataFrame that is spatially partitioned without duplicates. This is especially useful for generating balanced GeoParquet files that preserve spatial proximity within each file, which is crucial for effective filter pushdown when the files are read.

=== "Scala"

```scala
    spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE)
    // Specify the desired number of partitions as 10, though the actual number may vary
    // spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)
var spatialDf = StructuredAdapter.toSpatialPartitionedDf(spatialRDD, sedona)
```

=== "Java"

```java
    spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE);
    // Specify the desired number of partitions as 10, though the actual number may vary
    // spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10);
    Dataset<Row> spatialDf = StructuredAdapter.toSpatialPartitionedDf(spatialRDD, sedona);
```

=== "Python"

```python
from sedona.utils.structured_adapter import StructuredAdapter

spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE)
# Specify the desired number of partitions as 10, though the actual number may vary
    # spatialRDD.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, 10)
spatialDf = StructuredAdapter.toSpatialPartitionedDf(spatialRDD, sedona)
```

### SpatialPairRDD to DataFrame

PairRDD is the result of a spatial join query or distance join query. SedonaSQL DataFrame-RDD Adapter can convert the result to a DataFrame. But you need to provide the schema of the left and right RDDs.
59 changes: 53 additions & 6 deletions python/sedona/core/SpatialRDD/spatial_rdd.py
@@ -19,7 +19,7 @@
from typing import List, Optional, Union

import attr
from py4j.java_gateway import get_field
from py4j.java_gateway import get_field, get_method
from pyspark import RDD, SparkContext, StorageLevel
from pyspark.sql import SparkSession

@@ -51,6 +51,15 @@ def from_java_class_name(cls, jvm_partitioner) -> "SpatialPartitioner":

return cls(partitioner, jvm_partitioner)

def getGrids(self) -> List[Envelope]:
jvm_grids = get_method(self.jvm_partitioner, "getGrids")()
number_of_grids = jvm_grids.size()
envelopes = [
Envelope.from_jvm_instance(jvm_grids[index])
for index in range(number_of_grids)
]
return envelopes
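The `getGrids` conversion above follows a common py4j pattern: the JVM list only exposes `size()` and index access, so it is eagerly materialized into a Python list. A dependency-free sketch (the `FakeJvmList` class is invented for illustration; it is not a py4j type):

```python
# Sketch of materializing a Java-style list (size() + index access) into a
# Python list, as getGrids() does with the JVM grid envelopes via py4j.

class FakeJvmList:
    """Stand-in for a py4j JavaList: supports size() and integer indexing."""

    def __init__(self, items):
        self._items = list(items)

    def size(self):
        return len(self._items)

    def __getitem__(self, index):
        return self._items[index]

def materialize(jvm_list, convert):
    """Eagerly convert each JVM element so no JVM handles leak out."""
    return [convert(jvm_list[i]) for i in range(jvm_list.size())]

grids = materialize(FakeJvmList([(0, 10), (10, 20)]), convert=tuple)
print(grids)  # → [(0, 10), (10, 20)]
```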


@attr.s
class JvmSpatialRDD:
@@ -422,11 +431,49 @@ def spatialPartitioning(
num_partitions: Optional[int] = None,
) -> bool:
"""
Calculate partitions and assign items in this RDD to a partition.

:param partitioning: Partitioning type or existing SpatialPartitioner
(e.g., one obtained from another SpatialRDD to align partitions among
input data)
:param num_partitions: If partitioning is a GridType, the target
number of partitions into which the RDD should be split.
:return: True on success
"""
return self._spatial_partitioning_impl(
partitioning, num_partitions, self._srdd.spatialPartitioning
)

def spatialPartitioningWithoutDuplicates(
self,
partitioning: Union[str, GridType, SpatialPartitioner, List[Envelope]],
num_partitions: Optional[int] = None,
) -> bool:
"""
Calculate partitions and assign items in this RDD to a partition without
introducing duplicates. This is not the desired behaviour for
executing joins but is the correct option when partitioning in
preparation for a distributed write.

:param partitioning: Partitioning type or existing SpatialPartitioner
(e.g., one obtained from another SpatialRDD to align partitions among
input data)
:param num_partitions: If partitioning is a GridType, the target
number of partitions into which the RDD should be split.
:return: True on success
"""
return self._spatial_partitioning_impl(
partitioning,
num_partitions,
self._srdd.spatialPartitioningWithoutDuplicates,
)

def _spatial_partitioning_impl(
self,
partitioning: Union[str, GridType, SpatialPartitioner, List[Envelope]],
num_partitions: Optional[int],
java_method,
) -> bool:
if type(partitioning) == str:
grid = GridTypeJvm(self._jvm, GridType.from_str(partitioning)).jvm_instance
elif type(partitioning) == GridType:
@@ -446,9 +493,9 @@ def spatialPartitioning(
self._spatial_partitioned = True

if num_partitions:
return java_method(grid, num_partitions)
else:
return java_method(grid)
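The refactor above is a small dispatch pattern: both public methods normalize their arguments identically and differ only in which JVM method they finally invoke, so the shared implementation receives that method as a parameter. A generic sketch under invented names, with no Sedona dependency:

```python
# Sketch of sharing argument normalization between two entry points by passing
# the final callable as a parameter (mirrors _spatial_partitioning_impl).

def _partitioning_impl(partitioning, num_partitions, target):
    """Normalize arguments once, then dispatch to the chosen target callable."""
    grid = str(partitioning).upper()  # stand-in for the GridType/JVM conversion
    if num_partitions is not None:
        return target(grid, num_partitions)
    return target(grid)

def partition_with_duplicates(partitioning, num_partitions=None):
    # In the real class this would be self._srdd.spatialPartitioning
    return _partitioning_impl(partitioning, num_partitions,
                              lambda *args: ("withDuplicates", args))

def partition_without_duplicates(partitioning, num_partitions=None):
    # In the real class: self._srdd.spatialPartitioningWithoutDuplicates
    return _partitioning_impl(partitioning, num_partitions,
                              lambda *args: ("withoutDuplicates", args))

print(partition_with_duplicates("kdbtree", 10))
# → ('withDuplicates', ('KDBTREE', 10))
print(partition_without_duplicates("kdbtree"))
# → ('withoutDuplicates', ('KDBTREE',))
```

Passing the bound JVM method keeps the type-normalization logic in one place instead of duplicating it in each public method.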

def set_srdd(self, srdd):
self._srdd = srdd
26 changes: 26 additions & 0 deletions python/tests/spatial_rdd/test_spatial_rdd.py
@@ -26,6 +26,7 @@

from sedona.core.enums import FileDataSplitter, GridType, IndexType
from sedona.core.formatMapper.geo_json_reader import GeoJsonReader
from sedona.utils.adapter import Adapter
from sedona.core.geom.envelope import Envelope
from sedona.core.SpatialRDD import PointRDD

@@ -126,6 +127,10 @@ def test_get_partitioner(self):
else:
assert spatial_rdd.getPartitioner().name == "FlatGridPartitioner"

grids = spatial_rdd.getPartitioner().getGrids()
assert len(grids) > 0
assert all(isinstance(grid, Envelope) for grid in grids)

def test_get_raw_spatial_rdd(self):
spatial_rdd = self.create_spatial_rdd()
assert isinstance(spatial_rdd.getRawSpatialRDD(), RDD)
@@ -154,3 +159,24 @@ def test_partition_tree(self):
spatial_rdd.spatialPartitioning(GridType.QUADTREE)

print(spatial_rdd.getPartitioner())

def test_partition_unique(self):
grids = [
Envelope(0.0, 10.0, 0.0, 10.0),
Envelope(10.0, 20.0, 0.0, 10.0),
Envelope(0.0, 10.0, 10.0, 20.0),
Envelope(10.0, 20.0, 10.0, 20.0),
]

df = self.spark.createDataFrame(
[("POLYGON ((5 5, 15 5, 15 15, 5 15, 5 5))",)], ["wkt"]
).selectExpr("ST_GeomFromText(wkt) as geometry")
spatial_rdd = Adapter.toSpatialRdd(df, "geometry")

spatial_rdd.spatialPartitioning(grids)
assert spatial_rdd.spatialPartitionedRDD.count() == 5
assert spatial_rdd.getPartitioner().getGrids() == grids

spatial_rdd.spatialPartitioningWithoutDuplicates(grids)
assert spatial_rdd.spatialPartitionedRDD.count() == 1
assert spatial_rdd.getPartitioner().getGrids() == grids
22 changes: 22 additions & 0 deletions python/tests/sql/test_structured_adapter.py
@@ -14,6 +14,9 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import glob
import tempfile

from pyspark.sql import DataFrame

from sedona.core.SpatialRDD import CircleRDD
@@ -58,3 +61,22 @@ def test_distance_join_result_to_dataframe(self):
join_result_pair_rdd, schema, schema, self.spark
)
assert join_result_df.count() == 1

def test_spatial_partitioned_write(self):
xys = [(i, i // 100, i % 100) for i in range(1_000)]
df = self.spark.createDataFrame(xys, ["id", "x", "y"]).selectExpr(
"id", "ST_Point(x, y) AS geom"
)

rdd = StructuredAdapter.toSpatialRdd(df, "geom")
rdd.analyze()
rdd.spatialPartitioningWithoutDuplicates(GridType.KDBTREE, num_partitions=16)
n_spatial_partitions = rdd.spatialPartitionedRDD.getNumPartitions()
assert n_spatial_partitions >= 16

partitioned_df = StructuredAdapter.toSpatialPartitionedDf(rdd, self.spark)

with tempfile.TemporaryDirectory() as td:
out = td + "/out"
partitioned_df.write.format("geoparquet").save(out)
assert len(glob.glob(out + "/*.parquet")) == n_spatial_partitions
10 changes: 10 additions & 0 deletions python/tests/test_base.py
@@ -26,6 +26,7 @@
from sedona.utils.decorators import classproperty

SPARK_REMOTE = os.getenv("SPARK_REMOTE")
EXTRA_JARS = os.getenv("SEDONA_PYTHON_EXTRA_JARS")

from shapely import wkt
from shapely.geometry.base import BaseGeometry
@@ -36,6 +37,10 @@ class TestBase:
@classproperty
def spark(self):
if not hasattr(self, "__spark"):
# This lets a caller override the value of SPARK_HOME to just use whatever
# is provided by pyspark. Otherwise, export SPARK_HOME="" has no effect.
if "SPARK_HOME" in os.environ and not os.environ["SPARK_HOME"]:
del os.environ["SPARK_HOME"]

builder = SedonaContext.builder()
if SPARK_REMOTE:
@@ -53,6 +58,11 @@
else:
builder = builder.master("local[*]")

# Allows the Sedona .jar to be explicitly set by the caller (e.g., to run
# pytest against a freshly-built development version of Sedona)
if EXTRA_JARS:
builder.config("spark.jars", EXTRA_JARS)

spark = SedonaContext.create(builder.getOrCreate())

if not SPARK_REMOTE:
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sedona.core.spatialPartitioning;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.sedona.core.enums.GridType;
import org.apache.sedona.core.joinJudgement.DedupParams;
import org.locationtech.jts.geom.Envelope;
import org.locationtech.jts.geom.Geometry;
import scala.Tuple2;

public class GenericUniquePartitioner extends SpatialPartitioner {
private SpatialPartitioner parent;

public GenericUniquePartitioner(SpatialPartitioner parent) {
this.parent = parent;
}

public GridType getGridType() {
return parent.gridType;
}

public List<Envelope> getGrids() {
return parent.grids;
}

@Override
public Iterator<Tuple2<Integer, Geometry>> placeObject(Geometry spatialObject) throws Exception {
// Rather than take the first result from the parent, consume the entire iterator
// and return the partition with the minimum ID. This ensures that given the same
// (parent) partitioner, the output partitions from this method will be consistent.
Iterator<Tuple2<Integer, Geometry>> it = parent.placeObject(spatialObject);
int minPartitionId = Integer.MAX_VALUE;
Geometry minGeometry = null;
while (it.hasNext()) {
Tuple2<Integer, Geometry> value = it.next();
if (value._1() < minPartitionId) {
minPartitionId = value._1();
minGeometry = value._2();
}
}

HashSet<Tuple2<Integer, Geometry>> out = new HashSet<Tuple2<Integer, Geometry>>();
if (minGeometry != null) {
out.add(new Tuple2<Integer, Geometry>(minPartitionId, minGeometry));
}

return out.iterator();
Comment on lines +51 to +67
Member Author: I could also just take it.next() here (i.e., always return the first one) and modify the placeObject implementations to not return an iterator off of a HashSet (i.e., write our own implementation of Iterator, which might be better anyway). The motivation here is to ensure that the output is deterministic (i.e., if you set grids and ask Sedona to partition, you'll get the same result if you run your pipeline today or tomorrow).

Member: I'm OK with your current approach. We'd better add a comment to the code selecting the partitioning result with the minimum partition id to signify that this is for producing consistent results.

}

@Override
@Nullable
public DedupParams getDedupParams() {
throw new UnsupportedOperationException("Unique partitioner cannot deduplicate join results");
}

@Override
public int numPartitions() {
return parent.numPartitions();
}
}
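The wrapper's selection rule can be sketched in Python (illustrative only; the function and variable names below are invented, not Sedona APIs): consume the parent's full assignment iterator and keep the entry with the smallest partition id, so the output does not depend on the parent's iteration order.

```python
# Sketch of the unique-partitioner selection rule from placeObject above:
# given all (partition_id, geometry) assignments produced by a parent
# partitioner, keep only the one with the minimum partition id.

def place_object_unique(parent_assignments):
    """parent_assignments: iterable of (partition_id, geometry) tuples.

    Returns a list with at most one tuple. Taking the minimum id (rather
    than the iterator's first element) makes the result independent of the
    parent's iteration order, e.g. when the parent iterates over a hash set.
    """
    best = None
    for partition_id, geometry in parent_assignments:
        if best is None or partition_id < best[0]:
            best = (partition_id, geometry)
    return [] if best is None else [best]

# The same geometry reported in two different hash-set orders on two runs:
run_a = [(3, "poly"), (1, "poly"), (2, "poly")]
run_b = [(2, "poly"), (3, "poly"), (1, "poly")]
print(place_object_unique(run_a))  # → [(1, 'poly')]
print(place_object_unique(run_b))  # → [(1, 'poly')]
```

Always choosing the minimum id gives the determinism discussed in the review thread: re-running a pipeline with the same grids yields the same partition for every feature.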
@@ -48,7 +48,7 @@ public IndexedGridPartitioner(
}

public IndexedGridPartitioner(GridType gridType, List<Envelope> grids) {
this(gridType, grids, false);
this(gridType, grids, true);
Member Author: This aligns the default between the IndexedGridPartitioner and the FlatGridPartitioner.

}

public IndexedGridPartitioner(List<Envelope> grids, Boolean preserveUncontainedGeometries) {
@@ -35,6 +35,11 @@ public abstract class SpatialPartitioner extends Partitioner implements Serializable {
protected final GridType gridType;
protected final List<Envelope> grids;

protected SpatialPartitioner() {
gridType = null;
grids = null;
}
Comment on lines +38 to +41
Member Author: A cleaner way to do this would probably be to remove the gridType and grids fields and make getGridType() and getGrids() abstract. I could also remove this constructor and just pass the GenericUniquePartitioner's parent grids/grid type through here.


protected SpatialPartitioner(GridType gridType, List<Envelope> grids) {
this.gridType = gridType;
this.grids = Objects.requireNonNull(grids, "grids");