diff --git a/docs/api/flink/Aggregator.md b/docs/api/flink/Aggregator.md index 8e4e955417..57e0c7e8c2 100644 --- a/docs/api/flink/Aggregator.md +++ b/docs/api/flink/Aggregator.md @@ -12,6 +12,20 @@ SELECT ST_Envelope_Aggr(pointdf.arealandmark) FROM pointdf ``` +## ST_Intersection_Aggr + +Introduction: Return the polygon intersection of all polygons in A + +Format: `ST_Intersection_Aggr (A:geometryColumn)` + +Since: `v1.5.0` + +SQL example: +```sql +SELECT ST_Intersection_Aggr(polygondf.polygonshape) +FROM polygondf +``` + ## ST_Union_Aggr Introduction: Return the polygon union of all polygons in A. All inputs must be polygons. diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java index 3087ec23e4..d779b1ef37 100644 --- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java +++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java @@ -22,6 +22,7 @@ public class Catalog { public static UserDefinedFunction[] getFuncs() { return new UserDefinedFunction[]{ new Aggregators.ST_Envelope_Aggr(), + new Aggregators.ST_Intersection_Aggr(), new Aggregators.ST_Union_Aggr(), new Constructors.ST_Point(), new Constructors.ST_PointZ(), diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java index 3b019907f6..c69a98b32f 100644 --- a/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java +++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java @@ -82,6 +82,61 @@ public void resetAccumulator(Accumulators.Envelope acc) { } } + + // Compute the Union boundary of numbers of geometries + // + @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) + public static class ST_Intersection_Aggr extends AggregateFunction { + + @Override + public Accumulators.AccGeometry createAccumulator() { + return new Accumulators.AccGeometry(); + } + + @Override + @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) + public Geometry getValue(Accumulators.AccGeometry acc) { + return acc.geom; + } + + public void accumulate(Accumulators.AccGeometry acc, + @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) { + if (acc.geom == null){ + acc.geom = (Geometry) o; + } else { + acc.geom = acc.geom.intersection((Geometry) o); + } + } + + /** + * TODO: find an efficient algorithm to incrementally and decrementally update the accumulator + * + * @param acc + * @param o + */ + public void retract(Accumulators.AccGeometry acc, + @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) { + Geometry geometry = (Geometry) o; + assert (false); + } + + public void merge (Accumulators.AccGeometry acc, Iterable < Accumulators.AccGeometry > it){ + for (Accumulators.AccGeometry a : it) { + if (acc.geom == null){ + // make accumulate equal to acc + acc.geom = a.geom; + } else { + acc.geom = acc.geom.intersection(a.geom); + } + } + } + + public void resetAccumulator (Accumulators.AccGeometry acc){ + acc.geom = null; + } + } + + // Compute the Union boundary of numbers of geometries // @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) diff --git a/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java b/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java index 1e8e3fe30d..9958a33e0b 100644 --- a/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java +++ b/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java @@ -55,6 +55,19 @@ public void testKNN() { assertEquals(5.656854249492381, last(resultTable).getField(0)); } + @Test + public void testIntersection_Aggr(){ + Table polygonTable = createPolygonOverlappingTable(testDataSize); + Table result = polygonTable.select(call("ST_Intersection_Aggr", $(polygonColNames[0]))); + Row last = last(result); + assertEquals("LINESTRING EMPTY", last.getField(0).toString()); + + polygonTable = createPolygonOverlappingTable(3); + result = polygonTable.select(call("ST_Intersection_Aggr", $(polygonColNames[0]))); + last = last(result); + assertEquals("LINESTRING (1 1, 1 0)", last.getField(0).toString()); + } + @Test public void testUnion_Aggr(){ Table polygonTable = createPolygonOverlappingTable(testDataSize);