
[WIP] Expose spatial partitioning from SpatialRDD #1751

Closed

Conversation

paleolimbot
Member

@paleolimbot paleolimbot commented Jan 10, 2025

Did you read the Contributor Guide?

Is this PR related to a JIRA ticket?

Closes #1268.

What changes were proposed in this PR?

This PR exposes spatial partitioning information from the SpatialRDD API. Sedona is exceptionally good at this and the spatial community would love to have access to this information!

There are two pieces of information that would be helpful:

  • The actual boundaries
  • A partitioned RDD that remembers the partition identifier (i.e., partitioned results).
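To make those two pieces concrete, here is a small self-contained Python sketch (the `Envelope` class and `assign_partitions` helper are hypothetical illustrations, not the Sedona API): the boundaries are a list of envelopes, and the partition identifier for a row is the index of the envelope that covers it. A geometry sitting on a shared edge matches more than one envelope, which is where duplicates come from.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Envelope:
    """Axis-aligned bounding box of one spatial partition (hypothetical)."""
    xmin: float
    ymin: float
    xmax: float
    ymax: float

    def contains(self, x: float, y: float) -> bool:
        return self.xmin <= x <= self.xmax and self.ymin <= y <= self.ymax

def assign_partitions(x, y, boundaries):
    """Ids of every partition whose envelope covers the point (x, y)."""
    return [i for i, env in enumerate(boundaries) if env.contains(x, y)]

# Two partitions sharing the edge x = 5:
grids = [Envelope(0, 0, 5, 10), Envelope(5, 0, 10, 10)]
print(assign_partitions(2, 3, grids))  # [0]
print(assign_partitions(5, 3, grids))  # [0, 1] -> a point on the edge is duplicated
```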

There are a few ideas in this PR. The boundaries seem straightforward, but I'm a little too new to the RDD API to know what the options are for returning the partitioned results.

How was this patch tested?

Working on it!

Did this PR include necessary documentation updates?

  • Yes, I am adding a new API. I am using the current SNAPSHOT version number in vX.Y.Z format.
  • Yes, I have updated the documentation. (Or will when the API is settled)

@paleolimbot (Member Author) left a comment

This would also benefit from a SpatialPartitioner that removes duplicates (perhaps by wrapping an existing SpatialPartitioner, consuming the result of placeObject, and deterministically choosing one of the results), since duplicates are usually not desired when partitioning.
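A minimal sketch of that wrapping idea, assuming a hypothetical partitioner whose `place()` returns every matching partition id (mirroring placeObject): the wrapper keeps only the smallest id, so each geometry lands in exactly one partition and the choice is deterministic.

```python
class DedupPartitioner:
    """Wrap a partitioner whose place(geom) may return several partition ids
    (for geometries crossing partition boundaries) and deterministically keep
    one, so every input geometry is emitted exactly once. Hypothetical sketch,
    not the Sedona SpatialPartitioner API."""

    def __init__(self, inner):
        self.inner = inner

    def place(self, geom):
        candidates = self.inner.place(geom)
        # min() is an arbitrary but deterministic tie-break
        return min(candidates)

class FakeGridPartitioner:
    """Stand-in for a real partitioner: two x-ranges sharing the edge x = 5,
    so a point on that edge matches both partitions."""
    def place(self, geom):
        x, y = geom
        return [i for i, (lo, hi) in enumerate([(0, 5), (5, 10)]) if lo <= x <= hi]

part = DedupPartitioner(FakeGridPartitioner())
print(part.place((5, 3)))  # inner returns [0, 1] for the edge point; wrapper keeps 0
print(part.place((7, 1)))  # unambiguous point: 1
```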

@paleolimbot
Member Author

Ok! This seems to work:

import os
import pyspark
from sedona.spark import SedonaContext
if "SPARK_HOME" in os.environ:
    del os.environ["SPARK_HOME"]
pyspark_version = pyspark.__version__[:pyspark.__version__.rfind(".")]

config = (
    SedonaContext.builder()
    .config(
        "spark.jars",
        "spark-shaded/target/sedona-spark-shaded-3.3_2.12-1.7.1-SNAPSHOT.jar",
    )
    .config(
        "spark.jars.packages",
        "org.datasyslab:geotools-wrapper:1.7.0-28.5",
    )
    .config(
        "spark.jars.repositories",
        "https://artifacts.unidata.ucar.edu/repository/unidata-all",
    )
    .getOrCreate()
)
sedona = SedonaContext.create(config)
from sedona.spark import Adapter, GridType

!rm -rf cities_maybe_partitioned

df = sedona.read.format("geoparquet").load("cities.parquet")
rdd = Adapter.toSpatialRdd(df, "geometry")
rdd.analyze()
rdd.spatialPartitioning(GridType.KDBTREE, num_partitions=6)

df2 = Adapter.toDfPartitioned(rdd, sedona)
df2.write.format("geoparquet").save("cities_maybe_partitioned")

!ls cities_maybe_partitioned/*.parquet
#> cities_maybe_partitioned/part-00000-809113f8-6f63-4763-bbd0-3ba609efcdfd-c000.snappy.parquet
#> cities_maybe_partitioned/part-00001-809113f8-6f63-4763-bbd0-3ba609efcdfd-c000.snappy.parquet
#> cities_maybe_partitioned/part-00002-809113f8-6f63-4763-bbd0-3ba609efcdfd-c000.snappy.parquet
#> cities_maybe_partitioned/part-00003-809113f8-6f63-4763-bbd0-3ba609efcdfd-c000.snappy.parquet
#> cities_maybe_partitioned/part-00004-809113f8-6f63-4763-bbd0-3ba609efcdfd-c000.snappy.parquet
#> cities_maybe_partitioned/part-00005-809113f8-6f63-4763-bbd0-3ba609efcdfd-c000.snappy.parquet
#> cities_maybe_partitioned/part-00006-809113f8-6f63-4763-bbd0-3ba609efcdfd-c000.snappy.parquet
#> cities_maybe_partitioned/part-00007-809113f8-6f63-4763-bbd0-3ba609efcdfd-c000.snappy.parquet
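Note that the requested num_partitions is a target, not a guarantee: the listing above shows eight output files for six requested partitions. A much-simplified sketch of the median-split idea behind a KDB-tree-style partitioner (not Sedona's implementation) shows why the leaf count depends on the data rather than matching the request exactly:

```python
def kdb_split(points, max_per_part, depth=0):
    """Simplified KDB-tree-style split: recursively divide the points at the
    median, alternating between the x and y axes, until every leaf holds at
    most max_per_part points. Illustration only, not Sedona's algorithm."""
    if len(points) <= max_per_part:
        return [points]
    axis = depth % 2  # alternate x (0) and y (1)
    pts = sorted(points, key=lambda p: p[axis])
    mid = len(pts) // 2
    return (kdb_split(pts[:mid], max_per_part, depth + 1)
            + kdb_split(pts[mid:], max_per_part, depth + 1))

points = [(x, x % 7) for x in range(100)]
parts = kdb_split(points, max_per_part=20)
print(len(parts))  # 8 leaves, even though a user might have "asked" for 6
```

The leaf count falls out of repeated halving, so the actual number of partitions is a power-of-two-ish value near the request rather than the request itself.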

@github-actions github-actions bot added the root label Jan 22, 2025
@james-willis
Contributor

Would it be better to add a preservePartitions argument to the toDf method?

@github-actions github-actions bot removed the root label Jan 24, 2025
@github-actions github-actions bot added the docs label Jan 27, 2025
@paleolimbot paleolimbot marked this pull request as ready for review January 27, 2025 19:32
@paleolimbot paleolimbot requested a review from jiayuasu as a code owner January 27, 2025 19:32
@paleolimbot (Member Author) left a comment

For this to be universally useful (and to remove the warning in code and in the docs), we'll need partitioners that don't introduce duplicates. I'm happy to do that here or in another PR (with my preference being another PR since the changes are largely orthogonal).

Would it be better to add a preservePartitions argument to the toDf method?

@james-willis I didn't forget about this! I'm happy to defer to anything here. I added it as a separate function because there were already a lot of toDf overloads, and I wasn't sure whether toDfPartitioned would need more options of its own (it doesn't yet, so maybe a separate function wasn't needed!).

Comment on lines +261 to +263
@transient lazy val log = LoggerFactory.getLogger(getClass.getName)
log.warn(
"toDfPartitioned() may introduce duplicates when used with non-specialized partitioning")
@paleolimbot (Member Author) commented:

Is this the correct way to go about this? Other classes use with Logging to get an instance-specific logger, but I didn't know how to rig that up here, since Adapter is an object and not a class.

@jiayuasu
Member

jiayuasu commented Feb 4, 2025

Closed in favor of #1780.

@jiayuasu jiayuasu closed this Feb 4, 2025
Successfully merging this pull request may close these issues: Preserve Spatial Partitioning From RDD to Dataframe