From 97769e3054f1871d931638f00bc7308603e24b93 Mon Sep 17 00:00:00 2001 From: jameswillis Date: Mon, 16 Sep 2024 18:37:04 -0700 Subject: [PATCH 01/11] add dbscan scala --- .../scala/org/apache/sedona/stats/Util.scala | 41 +++++ .../sedona/stats/clustering/DBSCAN.scala | 153 ++++++++++++++++++ 2 files changed, 194 insertions(+) create mode 100644 spark/common/src/main/scala/org/apache/sedona/stats/Util.scala create mode 100644 spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/Util.scala b/spark/common/src/main/scala/org/apache/sedona/stats/Util.scala new file mode 100644 index 0000000000..408a4e1272 --- /dev/null +++ b/spark/common/src/main/scala/org/apache/sedona/stats/Util.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sedona.stats + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT + +private[stats] object Util { + def getGeometryColumnName(dataframe: DataFrame): String = { + val geomFields = dataframe.schema.fields.filter(_.dataType == GeometryUDT) + if (geomFields.length > 1) { + if (geomFields.exists(_.name == "geometry")) { + "geometry" + } else { + throw new IllegalArgumentException( + "Multiple GeometryType columns found. Provide the column name as an argument.") + } + } else if (geomFields.length == 1) { + geomFields.head.name + } else { + throw new IllegalArgumentException( + "No GeometryType column found. Provide a dataframe containing a geometry column.") + } + } +} diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala b/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala new file mode 100644 index 0000000000..9f3666cf4e --- /dev/null +++ b/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.sedona.stats.clustering
+
+import org.apache.sedona.stats.Util.getGeometryColumnName
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.sedona_sql.expressions.st_functions.{ST_Distance, ST_DistanceSpheroid}
+import org.apache.spark.sql.{Column, DataFrame, SparkSession}
+import org.graphframes.GraphFrame
+
+object DBSCAN {
+
+  private val ID_COLUMN = "__id"
+
+  /**
+   * Annotates a dataframe with a cluster label for each data record using the DBSCAN algorithm.
+   * The dataframe should contain at least one GeometryType column. Rows must be unique. If one
+   * geometry column is present it will be used automatically. If two are present, the one named
+   * 'geometry' will be used. If more than one are present and none is named 'geometry', the
+   * column name must be provided. The new column will be named 'cluster'.
+   *
+   * @param dataframe
+   *   apache sedona idDataframe containing the point geometries
+   * @param epsilon
+   *   minimum distance parameter of DBSCAN algorithm
+   * @param min_pts
+   *   minimum number of points parameter of DBSCAN algorithm
+   * @param geometry
+   *   name of the geometry column
+   * @param includeOutliers
+   *   whether to include outliers in the output. Default is true
+   * @param useSpheroid
+   *   whether to use a cartesian or spheroidal distance calculation. Default is false
+   * @return
+   *   The input DataFrame with the cluster label added to each row. Outliers will have a cluster
+   *   value of -1 if included.
+   */
+  def dbscan(
+      dataframe: DataFrame,
+      epsilon: Double,
+      min_pts: Int,
+      geometry: String = null,
+      includeOutliers: Boolean = true,
+      useSpheroid: Boolean = false): DataFrame = {
+
+    // We want to disable broadcast joins because the broadcast references were using too much driver memory
+    val spark = SparkSession.getActiveSession.get
+
+    val geometryCol = geometry match {
+      case null => getGeometryColumnName(dataframe)
+      case _ => geometry
+    }
+    validateInputs(dataframe, epsilon, min_pts, geometryCol)
+
+    val distanceFunction: (Column, Column) => Column =
+      if (useSpheroid) ST_DistanceSpheroid else ST_Distance
+
+    val hasIdColumn = dataframe.columns.contains("id")
+    val idDataframe = if (hasIdColumn) {
+      dataframe
+        .withColumnRenamed("id", ID_COLUMN)
+        .withColumn("id", sha2(to_json(struct("*")), 256))
+    } else {
+      dataframe.withColumn("id", sha2(to_json(struct("*")), 256))
+    }
+
+    val isCorePointsDF = idDataframe
+      .alias("left")
+      .join(
+        idDataframe.alias("right"),
+        distanceFunction(col(s"left.$geometryCol"), col(s"right.$geometryCol")) <= epsilon)
+      .groupBy(col(s"left.id"))
+      .agg(
+        first(struct("left.*")).alias("leftContents"),
+        count(col(s"right.id")).alias("neighbors_count"),
+        collect_list(col(s"right.id")).alias("neighbors"))
+      .withColumn("isCore", col("neighbors_count") >= lit(min_pts))
+      .select("leftContents.*", "neighbors", "isCore")
+      .checkpoint()
+
+    val corePointsDF = isCorePointsDF.filter(col("isCore"))
+    val borderPointsDF = isCorePointsDF.filter(!col("isCore"))
+
+    val coreEdgesDf = corePointsDF
+      .select(col("id").alias("src"), explode(col("neighbors")).alias("dst"))
+      .alias("left")
+      .join(corePointsDF.alias("right"), col("left.dst") === col(s"right.id"))
+      .select(col("left.src"), col(s"right.id").alias("dst"))
+
+    val connectedComponentsDF = GraphFrame(corePointsDF, coreEdgesDf).connectedComponents.run
+
+    val borderComponentsDF = borderPointsDF
+      .select(struct("*").alias("leftContent"), explode(col("neighbors")).alias("neighbor"))
+      .join(connectedComponentsDF.alias("right"), col("neighbor") === col(s"right.id"))
+      .groupBy(col("leftContent.id"))
+      .agg(
+        first(col("leftContent")).alias("leftContent"),
+        min(col(s"right.component")).alias("component"))
+      .select("leftContent.*", "component")
+
+    val clusteredPointsDf = borderComponentsDF.union(connectedComponentsDF)
+
+    val outliersDf = idDataframe
+      .join(clusteredPointsDf, Seq("id"), "left_anti")
+      .withColumn("isCore", lit(false))
+      .withColumn("component", lit(-1))
+      .withColumn("neighbors", array().cast("array<string>"))
+
+    val completedDf = (
+      if (includeOutliers) clusteredPointsDf.unionByName(outliersDf)
+      else clusteredPointsDf
+    ).withColumnRenamed("component", "cluster")
+
+    val returnDf = if (hasIdColumn) {
+      completedDf.drop("neighbors", "id").withColumnRenamed(ID_COLUMN, "id")
+    } else {
+      completedDf.drop("neighbors", "id")
+    }
+
+    returnDf
+
+  }
+
+  private def validateInputs(
+      geo_df: DataFrame,
+      epsilon: Double,
+      min_pts: Int,
+      geometry: String): Unit = {
+    require(epsilon > 0, "epsilon must be greater than 0")
+    require(min_pts > 0, "min_pts must be greater than 0")
+    require(geo_df.columns.contains(geometry), "geometry column not found in idDataframe")
+    require(
+      geo_df.schema.fields(geo_df.schema.fieldIndex(geometry)).dataType == GeometryUDT,
+      "geometry column must be of type GeometryType")
+  }
+}

From 8e89ad6280b33c7e24bce7595017bac033508086 Mon Sep 17 00:00:00 2001
From: jameswillis
Date: Mon, 16 Sep 2024 18:39:56 -0700
Subject: [PATCH 02/11] add dbscan python

---
 python/sedona/stats/__init__.py            | 16 +++++
 python/sedona/stats/clustering/__init__.py | 21 +++++++
 python/sedona/stats/clustering/dbscan.py   | 68 ++++++++++++++++++++++
 python/sedona/stats/utils/__init__.py      | 33 +++++++++++
 4 files changed, 138 insertions(+)
 create mode 100644 python/sedona/stats/__init__.py
 create mode 100644 python/sedona/stats/clustering/__init__.py
 create mode 100644 python/sedona/stats/clustering/dbscan.py
 create mode 100644 python/sedona/stats/utils/__init__.py

diff --git a/python/sedona/stats/__init__.py b/python/sedona/stats/__init__.py
new file mode 100644
index 0000000000..a67d5ea255
--- /dev/null
+++ b/python/sedona/stats/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/python/sedona/stats/clustering/__init__.py b/python/sedona/stats/clustering/__init__.py
new file mode 100644
index 0000000000..d2399abf8a
--- /dev/null
+++ b/python/sedona/stats/clustering/__init__.py
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""The clustering module contains Spark-based implementations of popular geospatial clustering algorithms.
+
+These implementations are designed to scale to larger datasets and support various geometric feature types.
+"""
diff --git a/python/sedona/stats/clustering/dbscan.py b/python/sedona/stats/clustering/dbscan.py
new file mode 100644
index 0000000000..bb816e61aa
--- /dev/null
+++ b/python/sedona/stats/clustering/dbscan.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""DBSCAN is a popular clustering algorithm for spatial data.
+
+It identifies groups of data where enough records are close enough to each other. This implementation leverages Spark,
+Sedona and GraphFrames to support large scale datasets and various heterogeneous geometric feature types.
+"""
+from typing import Optional
+
+from pyspark.sql import DataFrame, SparkSession
+
+ID_COLUMN_NAME = "__id"
+DEFAULT_MAX_SAMPLE_SIZE = 1000000  # 1 million
+
+
+def dbscan(
+    dataframe: DataFrame,
+    epsilon: float,
+    min_pts: int,
+    geometry: Optional[str] = None,
+    include_outliers: bool = True,
+    use_spheroid=False,
+):
+    """Annotates a dataframe with a cluster label for each data record using the DBSCAN algorithm.
+
+    The dataframe should contain at least one GeometryType column. Rows must be unique. If one geometry column is
+    present it will be used automatically. If two are present, the one named 'geometry' will be used. If more than one
+    are present and none is named 'geometry', the column name must be provided.
+
+    Args:
+        dataframe: spark dataframe containing the geometries
+        epsilon: minimum distance parameter of DBSCAN algorithm
+        min_pts: minimum number of points parameter of DBSCAN algorithm
+        geometry: name of the geometry column
+        include_outliers: whether to return outlier points. If True, outliers are returned with a cluster value of -1.
+            Default is True
+        use_spheroid: whether to use a cartesian or spheroidal distance calculation.
Default is false + + Returns: + A PySpark DataFrame containing the cluster label for each row + """ + sedona = SparkSession.getActiveSession() + + result_df = sedona._jvm.org.apache.sedona.stats.clustering.DBSCAN.dbscan( + dataframe._jdf, + float(epsilon), + min_pts, + geometry, + include_outliers, + use_spheroid, + ) + + return DataFrame(result_df, sedona) diff --git a/python/sedona/stats/utils/__init__.py b/python/sedona/stats/utils/__init__.py new file mode 100644 index 0000000000..995e537b34 --- /dev/null +++ b/python/sedona/stats/utils/__init__.py @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from pyspark.sql import DataFrame, Column, SparkSession +from sedona.sql.types import GeometryType + + +def get_geometry_column_name(df: DataFrame) -> Column: + geom_fields = [ + field.name for field in df.schema.fields if field.dataType == GeometryType() + ] + if len(geom_fields) > 1: + if "geometry" in geom_fields: + return "geometry" + else: + raise ValueError("Multiple geometry columns found in DataFrame") + if len(geom_fields) == 0: + raise ValueError("No geometry column found in DataFrame") + return geom_fields[0] From 3005f7c2730c40b8d65fbddebdb60188af1dd440 Mon Sep 17 00:00:00 2001 From: jameswillis Date: Mon, 16 Sep 2024 20:24:10 -0700 Subject: [PATCH 03/11] add dbscan tests, pom file changes, pip changes --- pom.xml | 13 ++ python/Pipfile | 2 + python/tests/stats/__init__.py | 0 python/tests/stats/test_dbscan.py | 214 ++++++++++++++++++++++++++++++ python/tests/test_base.py | 2 + spark/common/pom.xml | 5 + 6 files changed, 236 insertions(+) create mode 100644 python/tests/stats/__init__.py create mode 100644 python/tests/stats/test_dbscan.py diff --git a/pom.xml b/pom.xml index f4bca2f27d..d512b2cccc 100644 --- a/pom.xml +++ b/pom.xml @@ -85,6 +85,7 @@ 3.3.0 3.3 2.17.2 + 0.8.3-spark3.4 1.19.0 1.7.36 @@ -394,6 +395,10 @@ true + + Spark Packages + https://repos.spark-packages.org/ + @@ -578,6 +583,8 @@ ${scala.compat.version} ${spark.version} ${scala.version} + ${log4j.version} + ${graphframe.version} @@ -686,6 +693,7 @@ 3.0.3 3.0 2.17.2 + 0.8.1-spark3.0 true @@ -703,6 +711,7 @@ 3.1.2 3.1 2.17.2 + 0.8.2-spark3.1 true @@ -720,6 +729,7 @@ 3.2.0 3.2 2.17.2 + 0.8.2-spark3.2 true @@ -738,6 +748,7 @@ 3.3.0 3.3 2.17.2 + 0.8.3-spark3.4 @@ -752,6 +763,7 @@ 3.4.0 3.4 2.19.0 + 0.8.3-spark3.4 true @@ -768,6 +780,7 @@ 3.5.0 3.5 2.20.0 + 0.8.3-spark3.5 true diff --git a/python/Pipfile b/python/Pipfile index 3440e38da2..6c7e142b38 100644 --- a/python/Pipfile +++ b/python/Pipfile @@ -10,6 +10,8 @@ jupyter="*" mkdocs="*" pytest-cov = "*" +scikit-learn = "*" + [packages] pandas="<=1.5.3" numpy="<2" diff --git a/python/tests/stats/__init__.py b/python/tests/stats/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git 
a/python/tests/stats/test_dbscan.py b/python/tests/stats/test_dbscan.py new file mode 100644 index 0000000000..d08d6f0f0f --- /dev/null +++ b/python/tests/stats/test_dbscan.py @@ -0,0 +1,214 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pyspark.sql.functions as f + +from sedona.sql.st_constructors import ST_MakePoint +from sedona.sql.st_functions import ST_Buffer +from sklearn.cluster import DBSCAN as sklearnDBSCAN +from sedona.stats.clustering.dbscan import dbscan + +from tests.test_base import TestBase + + +class TestDBScan(TestBase): + + def get_data(self): + return [ + {"id": 1, "x": 1.0, "y": 2.0}, + {"id": 2, "x": 3.0, "y": 4.0}, + {"id": 3, "x": 2.5, "y": 4.0}, + {"id": 4, "x": 1.5, "y": 2.5}, + {"id": 5, "x": 3.0, "y": 5.0}, + {"id": 6, "x": 12.8, "y": 4.5}, + {"id": 7, "x": 2.5, "y": 4.5}, + {"id": 8, "x": 1.2, "y": 2.5}, + {"id": 9, "x": 1.0, "y": 3.0}, + {"id": 10, "x": 1.0, "y": 5.0}, + {"id": 11, "x": 1.0, "y": 2.5}, + {"id": 12, "x": 5.0, "y": 6.0}, + {"id": 13, "x": 4.0, "y": 3.0}, + ] + + def create_sample_dataframe(self): + return self.spark.createDataFrame(self.get_data()).select( + ST_MakePoint("x", "y").alias("arealandmark"), "id" + ).repartition(9) + + def get_expected_result(self, input_data, epsilon, min_pts, include_outliers=True): + labels = ( + sklearnDBSCAN(eps=epsilon, min_samples=min_pts) + .fit([[datum["x"], datum["y"]] for datum in input_data]) + .labels_ + ) + expected = [(x[0] + 1, x[1]) for x in list(enumerate(labels))] + clusters = [x for x in set(labels) if (x != -1 or include_outliers)] + cluster_members = { + frozenset([y[0] for y in expected if y[1] == x]) for x in clusters + } + return cluster_members + + def get_actual_results( + self, + input_data, + epsilon, + min_pts, + geometry=None, + id=None, + include_outliers=True, + ): + result = dbscan( + input_data, epsilon, min_pts, geometry, include_outliers=include_outliers + ) + id = id or "id" + clusters_members = [ + (x[id], x.cluster) + for x in result.collect() + if x.cluster != -1 or include_outliers + ] + + clusters = { + frozenset([y[0] for y in clusters_members if y[1] == x]) + for x in set([y[1] for y in clusters_members]) + } + + return clusters + + def test_dbscan_valid_parameters(self): + # repeated broadcast joins with this small data size use a lot of RAM on broadcast references + prior_join_threshold = self.spark.conf.get("sedona.join.autoBroadcastJoinThreshold", None) + self.spark.conf.set( + "sedona.join.autoBroadcastJoinThreshold", -1 + ) + df = self.create_sample_dataframe() + for epsilon in [0.6, 0.7, 0.8]: + for min_pts in [3, 4, 5]: + assert self.get_expected_result( + self.get_data(), epsilon, min_pts + ) == self.get_actual_results(df, epsilon, min_pts) + + if prior_join_threshold is None: + 
self.spark.conf.unset("sedona.join.autoBroadcastJoinThreshold") + else: + self.spark.conf.set("sedona.join.autoBroadcastJoinThreshold", prior_join_threshold) + + def test_dbscan_valid_parameters_default_column_name(self): + df = self.create_sample_dataframe().select( + "id", f.col("arealandmark").alias("geometryFieldName") + ) + epsilon = 0.6 + min_pts = 4 + + assert self.get_expected_result( + self.get_data(), epsilon, min_pts + ) == self.get_actual_results(df, epsilon, min_pts) + + def test_dbscan_valid_parameters_polygons(self): + df = self.create_sample_dataframe().select( + "id", ST_Buffer(f.col("arealandmark"), 0.000001).alias("geometryFieldName") + ) + epsilon = 0.6 + min_pts = 4 + + assert self.get_expected_result( + self.get_data(), epsilon, min_pts + ) == self.get_actual_results(df, epsilon, min_pts) + + def test_dbscan_supports_other_distance_function(self): + df = self.create_sample_dataframe().select( + "id", ST_Buffer(f.col("arealandmark"), 0.000001).alias("geometryFieldName") + ) + epsilon = 0.6 + min_pts = 4 + + dbscan( + df, + epsilon, + min_pts, + "geometryFieldName", + use_spheroid=True, + ) + + def test_dbscan_invalid_epsilon(self): + df = self.create_sample_dataframe() + + try: + dbscan(df, -0.1, 5, "arealandmark") + assert False + except Exception: + assert True + + def test_dbscan_invalid_min_pts(self): + df = self.create_sample_dataframe() + + try: + dbscan(df, 0.1, -5, "arealandmark") + assert False + except Exception: + assert True + + def test_dbscan_invalid_geometry_column(self): + df = self.create_sample_dataframe() + + try: + dbscan(df, 0.1, 5, "invalid_column") + assert False + except Exception: + assert True + + def test_return_empty_df_when_no_clusters(self): + df = self.create_sample_dataframe() + epsilon = 0.1 + min_pts = 10000 + + assert dbscan(df, epsilon, min_pts, "arealandmark", include_outliers = False).count() == 0 + # picked some coefficient we know yields clusters and thus hit the happy case + assert ( + dbscan(df, epsilon, min_pts, "arealandmark", include_outliers = False).schema + == dbscan(df, 0.6, 3, "arealandmark").schema + ) + + def test_dbscan_doesnt_duplicate_border_points_in_two_clusters(self): + input_df = self.spark.createDataFrame( + [ + {"id": 10, "x": 1.0, "y": 1.8}, + {"id": 11, "x": 1.0, "y": 1.9}, + {"id": 12, "x": 1.0, "y": 2.0}, + {"id": 13, "x": 1.0, "y": 2.1}, + {"id": 14, "x": 2.0, "y": 2.0}, + {"id": 15, "x": 3.0, "y": 1.9}, + {"id": 16, "x": 3.0, "y": 2.0}, + {"id": 17, "x": 3.0, "y": 2.1}, + {"id": 18, "x": 3.0, "y": 2.2}, + ] + ).select(ST_MakePoint("x", "y").alias("geometry"), "id") + + # make sure no id occurs more than once + output_df = dbscan(input_df, 1.0, 4) + + assert output_df.count() == 9 + assert output_df.select("cluster").distinct().count() == 2 + + def test_return_outliers_false_doesnt_return_outliers(self): + df = self.create_sample_dataframe() + for epsilon in [0.6, 0.7, 0.8]: + for min_pts in [3, 4, 5]: + assert self.get_expected_result( + self.get_data(), epsilon, min_pts, include_outliers=False + ) == self.get_actual_results( + df, epsilon, min_pts, include_outliers=False + ) diff --git a/python/tests/test_base.py b/python/tests/test_base.py index fa63fea750..088ee5124e 100644 --- a/python/tests/test_base.py +++ b/python/tests/test_base.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. 
+from tempfile import mkdtemp
 
 from sedona.spark import *
 from sedona.utils.decorators import classproperty
@@ -25,6 +26,7 @@ class TestBase:
     def spark(self):
         if not hasattr(self, "__spark"):
             spark = SedonaContext.create(SedonaContext.builder().master("local[*]").getOrCreate())
+            spark.sparkContext.setCheckpointDir(mkdtemp())
             setattr(self, "__spark", spark)
         return getattr(self, "__spark")
diff --git a/spark/common/pom.xml b/spark/common/pom.xml
index 948a35efe9..774b85c24f 100644
--- a/spark/common/pom.xml
+++ b/spark/common/pom.xml
@@ -157,6 +157,11 @@
         </dependency>
+        <dependency>
+            <groupId>graphframes</groupId>
+            <artifactId>graphframes</artifactId>
+            <version>${graphframe.version}-s_${scala.compat.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>

From 5f176ea041a127391898d3aad299417dde687de8 Mon Sep 17 00:00:00 2001
From: jameswillis
Date: Tue, 17 Sep 2024 11:11:28 -0700
Subject: [PATCH 04/11] disable broadcast joins for all dbscan tests

---
 python/tests/stats/test_dbscan.py | 30 ++++++++++++++++++++++++------
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git a/python/tests/stats/test_dbscan.py b/python/tests/stats/test_dbscan.py
index d08d6f0f0f..70f236291d 100644
--- a/python/tests/stats/test_dbscan.py
+++ b/python/tests/stats/test_dbscan.py
@@ -90,7 +90,6 @@ def test_dbscan_valid_parameters(self):
         # repeated broadcast joins with this small data size use a lot of RAM on broadcast references
-        prior_join_threshold = self.spark.conf.get("sedona.join.autoBroadcastJoinThreshold", None)
         self.spark.conf.set(
             "sedona.join.autoBroadcastJoinThreshold", -1
         )
@@ -101,12 +100,11 @@ def test_dbscan_valid_parameters(self):
                     self.get_data(), epsilon, min_pts
                 ) == self.get_actual_results(df, epsilon, min_pts)
 
-        if prior_join_threshold is None:
-            self.spark.conf.unset("sedona.join.autoBroadcastJoinThreshold")
-        else:
-            self.spark.conf.set("sedona.join.autoBroadcastJoinThreshold", prior_join_threshold)
-
     def test_dbscan_valid_parameters_default_column_name(self):
+        # repeated broadcast joins with this small data size use a lot of RAM on broadcast references
+        self.spark.conf.set(
+            "sedona.join.autoBroadcastJoinThreshold", -1
+        )
         df = self.create_sample_dataframe().select(
             "id", f.col("arealandmark").alias("geometryFieldName")
         )
@@ -118,6 +116,10 @@ def test_dbscan_valid_parameters_default_column_name(self):
         ) == self.get_actual_results(df, epsilon, min_pts)
 
     def test_dbscan_valid_parameters_polygons(self):
+        # repeated broadcast joins with this small data size use a lot of RAM on broadcast references
+        self.spark.conf.set(
+            "sedona.join.autoBroadcastJoinThreshold", -1
+        )
         df = self.create_sample_dataframe().select(
             "id", ST_Buffer(f.col("arealandmark"), 0.000001).alias("geometryFieldName")
         )
@@ -129,6 +131,10 @@ def test_dbscan_valid_parameters_polygons(self):
         ) == self.get_actual_results(df, epsilon, min_pts)
 
     def test_dbscan_supports_other_distance_function(self):
+        # repeated broadcast joins with this small data size use a lot of RAM on broadcast references
+        self.spark.conf.set(
+            "sedona.join.autoBroadcastJoinThreshold", -1
+        )
         df = self.create_sample_dataframe().select(
             "id", ST_Buffer(f.col("arealandmark"), 0.000001).alias("geometryFieldName")
         )
@@ -171,6 +177,10 @@ def 
test_return_empty_df_when_no_clusters(self): ) def test_dbscan_doesnt_duplicate_border_points_in_two_clusters(self): + # repeated broadcast joins with this small data size use a lot of RAM on broadcast references + self.spark.conf.set( + "sedona.join.autoBroadcastJoinThreshold", -1 + ) input_df = self.spark.createDataFrame( [ {"id": 10, "x": 1.0, "y": 1.8}, @@ -204,6 +218,10 @@ def test_dbscan_doesnt_duplicate_border_points_in_two_clusters(self): assert output_df.select("cluster").distinct().count() == 2 def test_return_outliers_false_doesnt_return_outliers(self): + # repeated broadcast joins with this small data size use a lot of RAM on broadcast references + self.spark.conf.set( + "sedona.join.autoBroadcastJoinThreshold", -1 + ) df = self.create_sample_dataframe() for epsilon in [0.6, 0.7, 0.8]: for min_pts in [3, 4, 5]: From 8aeaee19cc9f5ee33fb3cf815f667b0c164b74d6 Mon Sep 17 00:00:00 2001 From: jameswillis Date: Thu, 19 Sep 2024 21:55:06 -0700 Subject: [PATCH 05/11] disable non-sedona broadcast joins for all dbscan tests --- python/tests/stats/test_dbscan.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/python/tests/stats/test_dbscan.py b/python/tests/stats/test_dbscan.py index 70f236291d..f387ee8a9c 100644 --- a/python/tests/stats/test_dbscan.py +++ b/python/tests/stats/test_dbscan.py @@ -93,6 +93,8 @@ def test_dbscan_valid_parameters(self): self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) + self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) + df = self.create_sample_dataframe() for epsilon in [0.6, 0.7, 0.8]: for min_pts in [3, 4, 5]: @@ -105,6 +107,8 @@ def test_dbscan_valid_parameters_default_column_name(self): self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) + self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) + df = self.create_sample_dataframe().select( "id", f.col("arealandmark").alias("geometryFieldName") ) @@ -120,6 +124,8 @@ def test_dbscan_valid_parameters_polygons(self): self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) + self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) + df = self.create_sample_dataframe().select( "id", ST_Buffer(f.col("arealandmark"), 0.000001).alias("geometryFieldName") ) @@ -135,6 +141,8 @@ def test_dbscan_supports_other_distance_function(self): self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) + self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) + df = self.create_sample_dataframe().select( "id", ST_Buffer(f.col("arealandmark"), 0.000001).alias("geometryFieldName") ) @@ -181,6 +189,8 @@ def test_return_empty_df_when_no_clusters(self): self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) + self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) + df = self.create_sample_dataframe() epsilon = 0.1 min_pts = 10000 @@ -197,6 +207,8 @@ def test_dbscan_doesnt_duplicate_border_points_in_two_clusters(self): self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) + self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) + input_df = self.spark.createDataFrame( [ {"id": 10, "x": 1.0, "y": 1.8}, @@ -222,6 +234,8 @@ def test_return_outliers_false_doesnt_return_outliers(self): self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) + self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) + df = self.create_sample_dataframe() for epsilon in [0.6, 0.7, 0.8]: for min_pts in [3, 4, 5]: From 8e540156135f3afc929ef8ffdb75703cf7367cec Mon Sep 17 00:00:00 2001 From: 
jameswillis Date: Mon, 23 Sep 2024 17:57:49 -0700 Subject: [PATCH 06/11] unpersist dbscan result assuming that graphframes PR will eventually be merged. --- python/tests/stats/test_dbscan.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/python/tests/stats/test_dbscan.py b/python/tests/stats/test_dbscan.py index f387ee8a9c..e1d1ea07d7 100644 --- a/python/tests/stats/test_dbscan.py +++ b/python/tests/stats/test_dbscan.py @@ -81,6 +81,8 @@ def get_actual_results( if x.cluster != -1 or include_outliers ] + result.unpersist() + clusters = { frozenset([y[0] for y in clusters_members if y[1] == x]) for x in set([y[1] for y in clusters_members]) @@ -237,10 +239,14 @@ def test_return_outliers_false_doesnt_return_outliers(self): self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) df = self.create_sample_dataframe() - for epsilon in [0.6, 0.7, 0.8]: - for min_pts in [3, 4, 5]: + for epsilon in [0.6]: + for min_pts in [3]: assert self.get_expected_result( self.get_data(), epsilon, min_pts, include_outliers=False ) == self.get_actual_results( df, epsilon, min_pts, include_outliers=False ) + + + while True: + pass # TODO delete From b0ebf246f91adef534c243e10a4efd86daee6c22 Mon Sep 17 00:00:00 2001 From: jameswillis Date: Mon, 30 Sep 2024 17:50:46 -0700 Subject: [PATCH 07/11] =?UTF-8?q?revisions=20from=20Pawe=C5=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- python/sedona/stats/utils/__init__.py | 17 +-- python/tests/stats/test_dbscan.py | 100 ++++++++---------- .../scala/org/apache/sedona/stats/Util.scala | 22 ++-- 3 files changed, 64 insertions(+), 75 deletions(-) diff --git a/python/sedona/stats/utils/__init__.py b/python/sedona/stats/utils/__init__.py index 995e537b34..5556ff653d 100644 --- a/python/sedona/stats/utils/__init__.py +++ b/python/sedona/stats/utils/__init__.py @@ -23,11 +23,14 @@ def get_geometry_column_name(df: DataFrame) -> Column: geom_fields = [ field.name for field in df.schema.fields if field.dataType == GeometryType() ] - if len(geom_fields) > 1: - if "geometry" in geom_fields: - return "geometry" - else: - raise ValueError("Multiple geometry columns found in DataFrame") + if len(geom_fields) == 0: - raise ValueError("No geometry column found in DataFrame") - return geom_fields[0] + raise ValueError("No GeometryType column found. Provide a dataframe containing a geometry column.") + + if len(geom_fields) == 1: + return geom_fields[0] + + if len(geom_fields) > 1 and "geometry" not in geom_fields: + raise ValueError("Multiple GeometryType columns found. Provide the column name as an argument.") + + return "geometry" diff --git a/python/tests/stats/test_dbscan.py b/python/tests/stats/test_dbscan.py index e1d1ea07d7..ae911ba50b 100644 --- a/python/tests/stats/test_dbscan.py +++ b/python/tests/stats/test_dbscan.py @@ -16,7 +16,9 @@ # under the License. 
import pyspark.sql.functions as f +import pytest +from itertools import product from sedona.sql.st_constructors import ST_MakePoint from sedona.sql.st_functions import ST_Buffer from sklearn.cluster import DBSCAN as sklearnDBSCAN @@ -27,7 +29,8 @@ class TestDBScan(TestBase): - def get_data(self): + @pytest.fixture + def sample_data(self): return [ {"id": 1, "x": 1.0, "y": 2.0}, {"id": 2, "x": 3.0, "y": 4.0}, @@ -44,8 +47,9 @@ def get_data(self): {"id": 13, "x": 4.0, "y": 3.0}, ] - def create_sample_dataframe(self): - return self.spark.createDataFrame(self.get_data()).select( + @pytest.fixture + def sample_dataframe(self, sample_data): + return self.spark.createDataFrame(sample_data).select( ST_MakePoint("x", "y").alias("arealandmark"), "id" ).repartition(9) @@ -90,62 +94,61 @@ def get_actual_results( return clusters - def test_dbscan_valid_parameters(self): + @pytest.mark.parametrize("epsilon", [0.6, 0.7, 0.8]) + @pytest.mark.parametrize("min_pts", [3, 4, 5]) + def test_dbscan_valid_parameters(self, sample_data, sample_dataframe, epsilon, min_pts): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) - df = self.create_sample_dataframe() - for epsilon in [0.6, 0.7, 0.8]: - for min_pts in [3, 4, 5]: - assert self.get_expected_result( - self.get_data(), epsilon, min_pts - ) == self.get_actual_results(df, epsilon, min_pts) + assert self.get_expected_result( + sample_data, epsilon, min_pts + ) == self.get_actual_results(sample_dataframe, epsilon, min_pts) - def test_dbscan_valid_parameters_default_column_name(self): + def test_dbscan_valid_parameters_default_column_name(self, sample_data, sample_dataframe): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) - df = self.create_sample_dataframe().select( + df = sample_dataframe.select( "id", f.col("arealandmark").alias("geometryFieldName") ) epsilon = 0.6 min_pts = 4 assert self.get_expected_result( - self.get_data(), epsilon, min_pts + sample_data, epsilon, min_pts ) == self.get_actual_results(df, epsilon, min_pts) - def test_dbscan_valid_parameters_polygons(self): + def test_dbscan_valid_parameters_polygons(self, sample_data, sample_dataframe): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) - df = self.create_sample_dataframe().select( + df = sample_dataframe.select( "id", ST_Buffer(f.col("arealandmark"), 0.000001).alias("geometryFieldName") ) epsilon = 0.6 min_pts = 4 assert self.get_expected_result( - self.get_data(), epsilon, min_pts + sample_data, epsilon, min_pts ) == self.get_actual_results(df, epsilon, min_pts) - def test_dbscan_supports_other_distance_function(self): + def test_dbscan_supports_other_distance_function(self, sample_dataframe): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) - df = self.create_sample_dataframe().select( + df = sample_dataframe.select( "id", ST_Buffer(f.col("arealandmark"), 0.000001).alias("geometryFieldName") ) 
epsilon = 0.6 @@ -159,49 +162,34 @@ def test_dbscan_supports_other_distance_function(self): use_spheroid=True, ) - def test_dbscan_invalid_epsilon(self): - df = self.create_sample_dataframe() - - try: - dbscan(df, -0.1, 5, "arealandmark") - assert False - except Exception: - assert True + def test_dbscan_invalid_epsilon(self, sample_dataframe): + with pytest.raises(Exception): + dbscan(sample_dataframe, -0.1, 5, "arealandmark") - def test_dbscan_invalid_min_pts(self): - df = self.create_sample_dataframe() + def test_dbscan_invalid_min_pts(self, sample_dataframe): + with pytest.raises(Exception): + dbscan(sample_dataframe, 0.1, -5, "arealandmark") - try: - dbscan(df, 0.1, -5, "arealandmark") - assert False - except Exception: - assert True + def test_dbscan_invalid_geometry_column(self, sample_dataframe): + with pytest.raises(Exception): + dbscan(sample_dataframe, 0.1, 5, "invalid_column") - def test_dbscan_invalid_geometry_column(self): - df = self.create_sample_dataframe() - try: - dbscan(df, 0.1, 5, "invalid_column") - assert False - except Exception: - assert True - - def test_return_empty_df_when_no_clusters(self): + def test_return_empty_df_when_no_clusters(self, sample_data, sample_dataframe): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) - df = self.create_sample_dataframe() epsilon = 0.1 min_pts = 10000 - assert dbscan(df, epsilon, min_pts, "arealandmark", include_outliers = False).count() == 0 + assert dbscan(sample_dataframe, epsilon, min_pts, "arealandmark", include_outliers = False).count() == 0 # picked some coefficient we know yields clusters and thus hit the happy case assert ( - dbscan(df, epsilon, min_pts, "arealandmark", include_outliers = False).schema - == dbscan(df, 0.6, 3, "arealandmark").schema + dbscan(sample_dataframe, epsilon, min_pts, "arealandmark", include_outliers = False).schema + == dbscan(sample_dataframe, 0.6, 3, "arealandmark").schema ) def test_dbscan_doesnt_duplicate_border_points_in_two_clusters(self): @@ -231,22 +219,18 @@ def test_dbscan_doesnt_duplicate_border_points_in_two_clusters(self): assert output_df.count() == 9 assert output_df.select("cluster").distinct().count() == 2 - def test_return_outliers_false_doesnt_return_outliers(self): + def test_return_outliers_false_doesnt_return_outliers(self, sample_data, sample_dataframe): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references self.spark.conf.set( "sedona.join.autoBroadcastJoinThreshold", -1 ) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) - df = self.create_sample_dataframe() - for epsilon in [0.6]: - for min_pts in [3]: - assert self.get_expected_result( - self.get_data(), epsilon, min_pts, include_outliers=False - ) == self.get_actual_results( - df, epsilon, min_pts, include_outliers=False - ) - + epsilon = 0.6 + min_pts = 3 - while True: - pass # TODO delete + assert self.get_expected_result( + sample_data, epsilon, min_pts, include_outliers=False + ) == self.get_actual_results( + sample_dataframe, epsilon, min_pts, include_outliers=False + ) diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/Util.scala b/spark/common/src/main/scala/org/apache/sedona/stats/Util.scala index 408a4e1272..f9bfe9bc5c 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/Util.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/Util.scala @@ 
-24,18 +24,20 @@ import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
 private[stats] object Util {
   def getGeometryColumnName(dataframe: DataFrame): String = {
     val geomFields = dataframe.schema.fields.filter(_.dataType == GeometryUDT)
-    if (geomFields.length > 1) {
-      if (geomFields.exists(_.name == "geometry")) {
-        "geometry"
-      } else {
-        throw new IllegalArgumentException(
-          "Multiple GeometryType columns found. Provide the column name as an argument.")
-      }
-    } else if (geomFields.length == 1) {
+
+    if (geomFields.isEmpty) {
+      throw new IllegalArgumentException("No GeometryType column found. Provide a dataframe containing a geometry column.")
+    }
+
+    if (geomFields.length == 1) {
       geomFields.head.name
-    } else {
+    }
+
+    if (geomFields.length > 1 && !geomFields.exists(_.name == "geometry")) {
       throw new IllegalArgumentException(
-        "No GeometryType column found. Provide a dataframe containing a geometry column.")
+        "Multiple GeometryType columns found. Provide the column name as an argument.")
     }
+
+    "geometry"
   }
 }

From 6efed7e8f4c9365a3553673b8d6576a9c40516f2 Mon Sep 17 00:00:00 2001
From: jameswillis
Date: Mon, 30 Sep 2024 18:25:08 -0700
Subject: [PATCH 08/11] add documentation

---
 docs/api/stats/sql.md                    | 23 ++++++++
 docs/tutorial/sql.md                     | 52 +++++++++++++++++++
 python/tests/stats/test_dbscan.py        |  5 ++
 .../sedona/stats/clustering/DBSCAN.scala |  6 +--
 4 files changed, 83 insertions(+), 3 deletions(-)
 create mode 100644 docs/api/stats/sql.md

diff --git a/docs/api/stats/sql.md b/docs/api/stats/sql.md
new file mode 100644
index 0000000000..3355a28482
--- /dev/null
+++ b/docs/api/stats/sql.md
@@ -0,0 +1,23 @@
+## Overview
+The stats module of Sedona implements Scala and Python functions that can be called on dataframes with spatial columns to perform geospatial statistical analysis. The stats module is built on top of the core module and provides a set of functions that can be used to perform spatial analysis on dataframes. The stats module is designed to be used with the core module and the viz module to provide a complete set of geospatial analysis tools.
+
+## Using DBSCAN
+The DBSCAN function is provided at `org.apache.sedona.stats.clustering.DBSCAN.dbscan` in scala/java and `sedona.stats.clustering.dbscan.dbscan` in python.
+
+The function annotates a dataframe with a cluster label for each data record using the DBSCAN algorithm.
+The dataframe should contain at least one GeometryType column. Rows must be unique. If one
+geometry column is present it will be used automatically. If two are present, the one named
+'geometry' will be used. If more than one are present and none are named 'geometry', the
+column name must be provided. The new column will be named 'cluster'.
+
+#### Parameters
+names in parentheses are python variable names
+- dataframe - dataframe to cluster. Must contain at least one GeometryType column
+- epsilon - minimum distance parameter of DBSCAN algorithm
+- minPts (min_pts) - minimum number of points parameter of DBSCAN algorithm
+- geometry - name of the geometry column
+- includeOutliers (include_outliers) - whether to include outliers in the output. Default is true
+- useSpheroid (use_spheroid) - whether to use a cartesian or spheroidal distance calculation. Default is false
+
+
+The output is the input DataFrame with the cluster label added to each row. Outliers will have a cluster value of -1 if included.
diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index 0bb5979fe9..4a74baab89 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -739,6 +739,58 @@ The coordinates of polygons have been changed. The output will be like this:
 ```
 
+## Cluster with DBSCAN
+Sedona provides an implementation of the [DBSCAN](https://en.wikipedia.org/wiki/Dbscan) algorithm to cluster spatial data.
+
+The algorithm is available as a Scala and Python function called on a spatial dataframe. The returned dataframe has an additional column added containing the unique identifier of the cluster that record is a member of and a boolean column indicating if the record is a core point.
+
+The first parameter is the dataframe, the next two are the epsilon and min_points parameters of the DBSCAN algorithm.
+
+=== "Scala"
+
+    ```scala
+    import org.apache.sedona.stats.clustering.DBSCAN.dbscan
+
+    dbscan(df, 0.1, 5).show()
+    ```
+
+=== "Java"
+
+    ```java
+    import org.apache.sedona.stats.clustering.DBSCAN;
+
+    DBSCAN.dbscan(df, 0.1, 5).show();
+    ```
+
+=== "Python"
+
+    ```python
+    from sedona.stats.clustering.dbscan import dbscan
+
+    dbscan(df, 0.1, 5).show()
+    ```
+
+The output will look like this:
+```
++----------------+---+------+-------+
+|        geometry| id|isCore|cluster|
++----------------+---+------+-------+
+|   POINT (2.5 4)|  3| false|      1|
+|     POINT (3 4)|  2| false|      1|
+|     POINT (3 5)|  5| false|      1|
+|     POINT (1 3)|  9|  true|      0|
+| POINT (2.5 4.5)|  7|  true|      1|
+|     POINT (1 2)|  1|  true|      0|
+| POINT (1.5 2.5)|  4|  true|      0|
+| POINT (1.2 2.5)|  8|  true|      0|
+|   POINT (1 2.5)| 11|  true|      0|
+|     POINT (1 5)| 10| false|     -1|
+|     POINT (5 6)| 12| false|     -1|
+|POINT (12.8 4.5)|  6| false|     -1|
+|     POINT (4 3)| 13| false|     -1|
++----------------+---+------+-------+
+```
+
 ## Run spatial queries
 
 After creating a Geometry type column, you are able to run spatial queries.
diff --git a/python/tests/stats/test_dbscan.py b/python/tests/stats/test_dbscan.py
index ae911ba50b..e63b702409 100644
--- a/python/tests/stats/test_dbscan.py
+++ b/python/tests/stats/test_dbscan.py
@@ -78,6 +78,9 @@ def get_actual_results(
         result = dbscan(
             input_data, epsilon, min_pts, geometry, include_outliers=include_outliers
         )
+
+        result.show()
+
         id = id or "id"
         clusters_members = [
             (x[id], x.cluster)
@@ -107,6 +110,8 @@ def test_dbscan_valid_parameters(self, sample_data, sample_dataframe, epsilon, m
             sample_data, epsilon, min_pts
         ) == self.get_actual_results(sample_dataframe, epsilon, min_pts)
 
+        assert False
+
     def test_dbscan_valid_parameters_default_column_name(self, sample_data, sample_dataframe):
diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala b/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala
index 9f3666cf4e..e06ee5397e 100644
--- a/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala
+++ b/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala
@@ -37,10 +37,10 @@ object DBSCAN {
    * column name must be provided. The new column will be named 'cluster'.
    *
    * @param dataframe
-   *   apache sedona idDataframe containing the point geometries
+   *   dataframe to cluster. Must contain at least one GeometryType column
    * @param epsilon
    *   minimum distance parameter of DBSCAN algorithm
-   * @param min_pts
+   * @param minPts
    *   minimum number of points parameter of DBSCAN algorithm
    * @param geometry
@@ -55,7 +55,7 @@ object DBSCAN {
   def dbscan(
       dataframe: DataFrame,
       epsilon: Double,
-      min_pts: Int,
+      minPts: Int,
       geometry: String = null,
       includeOutliers: Boolean = true,
       useSpheroid: Boolean = false): DataFrame = {

From 8788144683f7c559930d058173c80dbd21e9d8e7 Mon Sep 17 00:00:00 2001
From: James Willis
Date: Tue, 1 Oct 2024 11:08:11 -0700
Subject: [PATCH 09/11] styling in docs

Co-authored-by: Kelly-Ann Dolor
---
 docs/api/stats/sql.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/api/stats/sql.md b/docs/api/stats/sql.md
index 3355a28482..c54e623bd2 100644
--- a/docs/api/stats/sql.md
+++ b/docs/api/stats/sql.md
@@ -5,7 +5,7 @@ The stats module of Sedona implements Scala and Python functions that can be cal
 The DBSCAN function is provided at `org.apache.sedona.stats.clustering.DBSCAN.dbscan` in scala/java and `sedona.stats.clustering.dbscan.dbscan` in python.
 
 The function annotates a dataframe with a cluster label for each data record using the DBSCAN algorithm.
-The dataframe should contain at least one GeometryType column. Rows must be unique. If one
+The dataframe should contain at least one `GeometryType` column. Rows must be unique. If one
 geometry column is present it will be used automatically. If two are present, the one named
 'geometry' will be used. If more than one are present and none are named 'geometry', the
 column name must be provided. The new column will be named 'cluster'.

From 5796b69aae92bc8d49fd1a1a95ee3e463b16cec7 Mon Sep 17 00:00:00 2001
From: James Willis
Date: Tue, 1 Oct 2024 11:08:30 -0700
Subject: [PATCH 10/11] reword stats documentation

Co-authored-by: Kelly-Ann Dolor
---
 docs/api/stats/sql.md | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/docs/api/stats/sql.md b/docs/api/stats/sql.md
index c54e623bd2..8b2841e96a 100644
--- a/docs/api/stats/sql.md
+++ b/docs/api/stats/sql.md
@@ -1,5 +1,10 @@
 ## Overview
-The stats module of Sedona implements Scala and Python functions that can be called on dataframes with spatial columns to perform geospatial statistical analysis. The stats module is built on top of the core module and provides a set of functions that can be used to perform spatial analysis on dataframes. The stats module is designed to be used with the core module and the viz module to provide a complete set of geospatial analysis tools.
+Sedona's stats module provides Scala and Python functions for conducting geospatial
+statistical analysis on dataframes with spatial columns. 
+The stats module is built on top of the core module and provides a set of functions
+that can be used to perform spatial analysis on these dataframes. The stats module
+is designed to be used with the core module and the viz module to provide a
+complete set of geospatial analysis tools.
 
 ## Using DBSCAN
 The DBSCAN function is provided at `org.apache.sedona.stats.clustering.DBSCAN.dbscan` in scala/java and `sedona.stats.clustering.dbscan.dbscan` in python.
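For illustration only, here is a minimal end-to-end Scala sketch of the API these patches document. It is not part of the series: the `sedona` session, the generated points, and the checkpoint path are hypothetical, and it assumes GraphFrames is on the classpath and Sedona's SQL functions are registered, as the POM and test changes above arrange.

```scala
import org.apache.sedona.stats.clustering.DBSCAN.dbscan
import org.apache.spark.sql.functions.expr

// The connected-components step checkpoints, so a checkpoint directory must be
// set first, mirroring the setCheckpointDir(mkdtemp()) call added to TestBase.
sedona.sparkContext.setCheckpointDir("/tmp/sedona-checkpoint")

// Hypothetical input: unique rows with a single geometry column, which is
// picked up automatically since no column name is passed.
val points = sedona
  .range(0, 100)
  .withColumn("geometry", expr("ST_Point(rand() * 10, rand() * 10)"))

// epsilon = 0.6, minPts = 4; outliers are kept and labeled -1 by default.
val clustered = dbscan(points, 0.6, 4)
clustered.select("id", "isCore", "cluster").show()
```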
From 4203753bc78ef0ccfb34a5f5ada1f1f54d0bd6a9 Mon Sep 17 00:00:00 2001
From: jameswillis
Date: Mon, 7 Oct 2024 12:37:43 -0700
Subject: [PATCH 11/11] clean up

---
 docs/api/stats/sql.md                        |  9 +-
 docs/tutorial/sql.md                         |  2 +
 mkdocs.yml                                   |  2 +
 python/sedona/stats/utils/__init__.py        | 36 --------
 python/tests/stats/test_dbscan.py            | 86 ++++++++++---------
 .../scala/org/apache/sedona/stats/Util.scala | 14 ++-
 .../sedona/stats/clustering/DBSCAN.scala     | 11 +--
 7 files changed, 68 insertions(+), 92 deletions(-)
 delete mode 100644 python/sedona/stats/utils/__init__.py

diff --git a/docs/api/stats/sql.md b/docs/api/stats/sql.md
index 8b2841e96a..fe7c0e90ef 100644
--- a/docs/api/stats/sql.md
+++ b/docs/api/stats/sql.md
@@ -1,12 +1,14 @@
 ## Overview
+
 Sedona's stats module provides Scala and Python functions for conducting geospatial
-statistical analysis on dataframes with spatial columns. 
+statistical analysis on dataframes with spatial columns.
 The stats module is built on top of the core module and provides a set of functions
 that can be used to perform spatial analysis on these dataframes. The stats module
 is designed to be used with the core module and the viz module to provide a
 complete set of geospatial analysis tools.
 
 ## Using DBSCAN
+
 The DBSCAN function is provided at `org.apache.sedona.stats.clustering.DBSCAN.dbscan` in scala/java and `sedona.stats.clustering.dbscan.dbscan` in python.
 
 The function annotates a dataframe with a cluster label for each data record using the DBSCAN algorithm.
@@ -15,8 +17,10 @@ geometry column is present it will be used automatically. If two are present, th
 'geometry' will be used. If more than one are present and none are named 'geometry', the
 column name must be provided. The new column will be named 'cluster'.
 
-#### Parameters
+### Parameters
+
 names in parentheses are python variable names
+
 - dataframe - dataframe to cluster. Must contain at least one GeometryType column
 - epsilon - minimum distance parameter of DBSCAN algorithm
 - minPts (min_pts) - minimum number of points parameter of DBSCAN algorithm
 - geometry - name of the geometry column
 - includeOutliers (include_outliers) - whether to include outliers in the output. Default is true
 - useSpheroid (use_spheroid) - whether to use a cartesian or spheroidal distance calculation. Default is false
-
 The output is the input DataFrame with the cluster label added to each row. Outliers will have a cluster value of -1 if included.
diff --git a/docs/tutorial/sql.md b/docs/tutorial/sql.md
index 8cdb44bc51..ae71b10e9c 100644
--- a/docs/tutorial/sql.md
+++ b/docs/tutorial/sql.md
@@ -740,6 +740,7 @@ The coordinates of polygons have been changed. The output will be like this:
 ```
 
 ## Cluster with DBSCAN
+
 Sedona provides an implementation of the [DBSCAN](https://en.wikipedia.org/wiki/Dbscan) algorithm to cluster spatial data.
 
 The algorithm is available as a Scala and Python function called on a spatial dataframe. The returned dataframe has an additional column added containing the unique identifier of the cluster that record is a member of and a boolean column indicating if the record is a core point.
@@ -771,6 +772,7 @@ The first parameter is the dataframe, the next two are the epsilon and min_point ``` The output will look like this: + ``` +----------------+---+------+-------+ | geometry| id|isCore|cluster| diff --git a/mkdocs.yml b/mkdocs.yml index 149742f6ca..68ce12073c 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -83,6 +83,8 @@ nav: - Parameter: api/sql/Parameter.md - RDD (core): - Scala/Java doc: api/java-api.md + - Stats: + - DataFrame: api/stats/sql.md - Viz: - DataFrame/SQL: api/viz/sql.md - RDD: api/viz/java-api.md diff --git a/python/sedona/stats/utils/__init__.py b/python/sedona/stats/utils/__init__.py deleted file mode 100644 index 5556ff653d..0000000000 --- a/python/sedona/stats/utils/__init__.py +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -from pyspark.sql import DataFrame, Column, SparkSession -from sedona.sql.types import GeometryType - - -def get_geometry_column_name(df: DataFrame) -> Column: - geom_fields = [ - field.name for field in df.schema.fields if field.dataType == GeometryType() - ] - - if len(geom_fields) == 0: - raise ValueError("No GeometryType column found. Provide a dataframe containing a geometry column.") - - if len(geom_fields) == 1: - return geom_fields[0] - - if len(geom_fields) > 1 and "geometry" not in geom_fields: - raise ValueError("Multiple GeometryType columns found. 
Provide the column name as an argument.") - - return "geometry" diff --git a/python/tests/stats/test_dbscan.py b/python/tests/stats/test_dbscan.py index e63b702409..60cc8a991e 100644 --- a/python/tests/stats/test_dbscan.py +++ b/python/tests/stats/test_dbscan.py @@ -49,9 +49,11 @@ def sample_data(self): @pytest.fixture def sample_dataframe(self, sample_data): - return self.spark.createDataFrame(sample_data).select( - ST_MakePoint("x", "y").alias("arealandmark"), "id" - ).repartition(9) + return ( + self.spark.createDataFrame(sample_data) + .select(ST_MakePoint("x", "y").alias("arealandmark"), "id") + .repartition(9) + ) def get_expected_result(self, input_data, epsilon, min_pts, include_outliers=True): labels = ( @@ -67,13 +69,13 @@ def get_expected_result(self, input_data, epsilon, min_pts, include_outliers=Tru return cluster_members def get_actual_results( - self, - input_data, - epsilon, - min_pts, - geometry=None, - id=None, - include_outliers=True, + self, + input_data, + epsilon, + min_pts, + geometry=None, + id=None, + include_outliers=True, ): result = dbscan( input_data, epsilon, min_pts, geometry, include_outliers=include_outliers @@ -99,24 +101,22 @@ def get_actual_results( @pytest.mark.parametrize("epsilon", [0.6, 0.7, 0.8]) @pytest.mark.parametrize("min_pts", [3, 4, 5]) - def test_dbscan_valid_parameters(self, sample_data, sample_dataframe, epsilon, min_pts): + def test_dbscan_valid_parameters( + self, sample_data, sample_dataframe, epsilon, min_pts + ): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references - self.spark.conf.set( - "sedona.join.autoBroadcastJoinThreshold", -1 - ) + self.spark.conf.set("sedona.join.autoBroadcastJoinThreshold", -1) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) assert self.get_expected_result( sample_data, epsilon, min_pts ) == self.get_actual_results(sample_dataframe, epsilon, min_pts) - assert False - - def test_dbscan_valid_parameters_default_column_name(self, sample_data, sample_dataframe): + def test_dbscan_valid_parameters_default_column_name( + self, sample_data, sample_dataframe + ): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references - self.spark.conf.set( - "sedona.join.autoBroadcastJoinThreshold", -1 - ) + self.spark.conf.set("sedona.join.autoBroadcastJoinThreshold", -1) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) df = sample_dataframe.select( @@ -131,9 +131,7 @@ def test_dbscan_valid_parameters_default_column_name(self, sample_data, sample_d def test_dbscan_valid_parameters_polygons(self, sample_data, sample_dataframe): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references - self.spark.conf.set( - "sedona.join.autoBroadcastJoinThreshold", -1 - ) + self.spark.conf.set("sedona.join.autoBroadcastJoinThreshold", -1) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) df = sample_dataframe.select( @@ -148,9 +146,7 @@ def test_dbscan_valid_parameters_polygons(self, sample_data, sample_dataframe): def test_dbscan_supports_other_distance_function(self, sample_dataframe): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references - self.spark.conf.set( - "sedona.join.autoBroadcastJoinThreshold", -1 - ) + self.spark.conf.set("sedona.join.autoBroadcastJoinThreshold", -1) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) df = sample_dataframe.select( @@ -179,29 +175,39 @@ def test_dbscan_invalid_geometry_column(self, 
sample_dataframe): with pytest.raises(Exception): dbscan(sample_dataframe, 0.1, 5, "invalid_column") - def test_return_empty_df_when_no_clusters(self, sample_data, sample_dataframe): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references - self.spark.conf.set( - "sedona.join.autoBroadcastJoinThreshold", -1 - ) + self.spark.conf.set("sedona.join.autoBroadcastJoinThreshold", -1) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) epsilon = 0.1 min_pts = 10000 - assert dbscan(sample_dataframe, epsilon, min_pts, "arealandmark", include_outliers = False).count() == 0 + assert ( + dbscan( + sample_dataframe, + epsilon, + min_pts, + "arealandmark", + include_outliers=False, + ).count() + == 0 + ) # picked some coefficient we know yields clusters and thus hit the happy case assert ( - dbscan(sample_dataframe, epsilon, min_pts, "arealandmark", include_outliers = False).schema - == dbscan(sample_dataframe, 0.6, 3, "arealandmark").schema + dbscan( + sample_dataframe, + epsilon, + min_pts, + "arealandmark", + include_outliers=False, + ).schema + == dbscan(sample_dataframe, 0.6, 3, "arealandmark").schema ) def test_dbscan_doesnt_duplicate_border_points_in_two_clusters(self): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references - self.spark.conf.set( - "sedona.join.autoBroadcastJoinThreshold", -1 - ) + self.spark.conf.set("sedona.join.autoBroadcastJoinThreshold", -1) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) input_df = self.spark.createDataFrame( @@ -224,11 +230,11 @@ def test_dbscan_doesnt_duplicate_border_points_in_two_clusters(self): assert output_df.count() == 9 assert output_df.select("cluster").distinct().count() == 2 - def test_return_outliers_false_doesnt_return_outliers(self, sample_data, sample_dataframe): + def test_return_outliers_false_doesnt_return_outliers( + self, sample_data, sample_dataframe + ): # repeated broadcast joins with this small data size use a lot of RAM on broadcast references - self.spark.conf.set( - "sedona.join.autoBroadcastJoinThreshold", -1 - ) + self.spark.conf.set("sedona.join.autoBroadcastJoinThreshold", -1) self.spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1) epsilon = 0.6 diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/Util.scala b/spark/common/src/main/scala/org/apache/sedona/stats/Util.scala index f9bfe9bc5c..cdfe5fca23 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/Util.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/Util.scala @@ -25,18 +25,16 @@ private[stats] object Util { def getGeometryColumnName(dataframe: DataFrame): String = { val geomFields = dataframe.schema.fields.filter(_.dataType == GeometryUDT) - if (geomFields.isEmpty) { - throw new IllegalArgumentException("No GeometryType column found. Provide a dataframe containing a geometry column.") - } + if (geomFields.isEmpty) + throw new IllegalArgumentException( + "No GeometryType column found. Provide a dataframe containing a geometry column.") - if (geomFields.length == 1) { - geomFields.head.name - } + if (geomFields.length == 1) + return geomFields.head.name - if (geomFields.length > 1 && !geomFields.exists(_.name == "geometry")) { + if (geomFields.length > 1 && !geomFields.exists(_.name == "geometry")) throw new IllegalArgumentException( "Multiple GeometryType columns found. 
Provide the column name as an argument.") - } "geometry" } diff --git a/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala b/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala index e06ee5397e..5bc691c2dd 100644 --- a/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala +++ b/spark/common/src/main/scala/org/apache/sedona/stats/clustering/DBSCAN.scala @@ -67,7 +67,8 @@ object DBSCAN { case null => getGeometryColumnName(dataframe) case _ => geometry } - validateInputs(dataframe, epsilon, min_pts, geometryCol) + + validateInputs(dataframe, epsilon, minPts, geometryCol) val distanceFunction: (Column, Column) => Column = if (useSpheroid) ST_DistanceSpheroid else ST_Distance @@ -91,7 +92,7 @@ object DBSCAN { first(struct("left.*")).alias("leftContents"), count(col(s"right.id")).alias("neighbors_count"), collect_list(col(s"right.id")).alias("neighbors")) - .withColumn("isCore", col("neighbors_count") >= lit(min_pts)) + .withColumn("isCore", col("neighbors_count") >= lit(minPts)) .select("leftContents.*", "neighbors", "isCore") .checkpoint() @@ -141,11 +142,11 @@ object DBSCAN { private def validateInputs( geo_df: DataFrame, epsilon: Double, - min_pts: Int, + minPts: Int, geometry: String): Unit = { require(epsilon > 0, "epsilon must be greater than 0") - require(min_pts > 0, "min_pts must be greater than 0") - require(geo_df.columns.contains(geometry), "geometry column not found in idDataframe") + require(minPts > 0, "minPts must be greater than 0") + require(geo_df.columns.contains(geometry), "geometry column not found in dataframe") require( geo_df.schema.fields(geo_df.schema.fieldIndex(geometry)).dataType == GeometryUDT, "geometry column must be of type GeometryType")
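
A minimal usage sketch of the API as it stands after this patch, for reviewers. It assumes an active `SparkSession` (`spark`) with Sedona's SQL functions registered and GraphFrames on the classpath; the sample data, column names, and checkpoint path are illustrative, not part of the patch.

```scala
import org.apache.sedona.stats.clustering.DBSCAN.dbscan
import org.apache.spark.sql.functions.expr

// The algorithm eagerly checkpoints an intermediate DataFrame, so a
// checkpoint directory must be configured before calling dbscan.
spark.sparkContext.setCheckpointDir("/tmp/sedona-checkpoints")

// Mirrors the tests above: disable broadcast joins, since repeated broadcasts
// of the self-join reference can hold a lot of driver memory on small inputs.
spark.conf.set("sedona.join.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

import spark.implicits._

// Three nearby points form a cluster; the last point is an outlier.
val points = Seq((0, 1.0, 1.8), (1, 1.0, 2.0), (2, 1.1, 2.0), (3, 9.0, 9.0))
  .toDF("id", "x", "y")
  .withColumn("geometry", expr("ST_Point(x, y)"))
  .drop("x", "y")

// epsilon = 1.0, minPts = 3; the geometry column is resolved automatically.
val clustered = dbscan(points, 1.0, 3)
clustered.select("id", "cluster").show()
```

The two conf settings match the comment repeated throughout the tests; they are a workaround for driver memory pressure on small inputs, not a requirement of the algorithm itself.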
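
The Util.getGeometryColumnName refactor above keeps the same resolution rules while flattening the control flow. A sketch, continuing the session from the example above, of how those rules surface through dbscan (all column names here are hypothetical):

```scala
import org.apache.sedona.stats.clustering.DBSCAN.dbscan
import org.apache.spark.sql.functions.expr

// Two geometry columns, one of them named "geometry": the resolution rules
// prefer "geometry", so no explicit column argument is needed.
val df = spark.range(5)
  .withColumn("geometry", expr("ST_Point(CAST(id AS DOUBLE), 0.0)"))
  .withColumn("centroid", expr("ST_Point(CAST(id AS DOUBLE), 1.0)"))

dbscan(df, 1.5, 2)             // resolves to "geometry"
dbscan(df, 1.5, 2, "centroid") // an explicit name overrides the resolution

// With several geometry columns and none named "geometry", or with no
// geometry column at all, an IllegalArgumentException is thrown instead
// of guessing a column.
```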