Skip to content

Commit

Permalink
[SEDONA-303] Port all Sedona Spark function to Sedona Flink -- ST_Int…
Browse files Browse the repository at this point in the history
…ersection_Aggr (#905)
  • Loading branch information
yyy1000 authored Jul 17, 2023
1 parent af3aac6 commit 8d7ca37
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 0 deletions.
14 changes: 14 additions & 0 deletions docs/api/flink/Aggregator.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ SELECT ST_Envelope_Aggr(pointdf.arealandmark)
FROM pointdf
```

## ST_Intersection_Aggr

Introduction: Return the polygon intersection of all polygons in A

Format: `ST_Intersection_Aggr (A:geometryColumn)`

Since: `v1.5.0`

SQL example:
```sql
SELECT ST_Intersection_Aggr(polygondf.polygonshape)
FROM polygondf
```

## ST_Union_Aggr

Introduction: Return the polygon union of all polygons in A. All inputs must be polygons.
Expand Down
1 change: 1 addition & 0 deletions flink/src/main/java/org/apache/sedona/flink/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class Catalog {
public static UserDefinedFunction[] getFuncs() {
return new UserDefinedFunction[]{
new Aggregators.ST_Envelope_Aggr(),
new Aggregators.ST_Intersection_Aggr(),
new Aggregators.ST_Union_Aggr(),
new Constructors.ST_Point(),
new Constructors.ST_PointZ(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,61 @@ public void resetAccumulator(Accumulators.Envelope acc) {
}
}


// Compute the Union boundary of numbers of geometries
//
@DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
public static class ST_Intersection_Aggr extends AggregateFunction<Geometry, Accumulators.AccGeometry> {

@Override
public Accumulators.AccGeometry createAccumulator() {
return new Accumulators.AccGeometry();
}

@Override
@DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
public Geometry getValue(Accumulators.AccGeometry acc) {
return acc.geom;
}

public void accumulate(Accumulators.AccGeometry acc,
@DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
if (acc.geom == null){
acc.geom = (Geometry) o;
} else {
acc.geom = acc.geom.intersection((Geometry) o);
}
}

/**
* TODO: find an efficient algorithm to incrementally and decrementally update the accumulator
*
* @param acc
* @param o
*/
public void retract(Accumulators.AccGeometry acc,
@DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
Geometry geometry = (Geometry) o;
assert (false);
}

public void merge (Accumulators.AccGeometry acc, Iterable < Accumulators.AccGeometry > it){
for (Accumulators.AccGeometry a : it) {
if (acc.geom == null){
// make accumulate equal to acc
acc.geom = a.geom;
} else {
acc.geom = acc.geom.intersection(a.geom);
}
}
}

public void resetAccumulator (Accumulators.AccGeometry acc){
acc.geom = null;
}
}


// Compute the Union boundary of numbers of geometries
//
@DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
Expand Down
13 changes: 13 additions & 0 deletions flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ public void testKNN() {
assertEquals(5.656854249492381, last(resultTable).getField(0));
}

@Test
public void testIntersection_Aggr(){
Table polygonTable = createPolygonOverlappingTable(testDataSize);
Table result = polygonTable.select(call("ST_Intersection_Aggr", $(polygonColNames[0])));
Row last = last(result);
assertEquals("LINESTRING EMPTY", last.getField(0).toString());

polygonTable = createPolygonOverlappingTable(3);
result = polygonTable.select(call("ST_Intersection_Aggr", $(polygonColNames[0])));
last = last(result);
assertEquals("LINESTRING (1 1, 1 0)", last.getField(0).toString());
}

@Test
public void testUnion_Aggr(){
Table polygonTable = createPolygonOverlappingTable(testDataSize);
Expand Down

0 comments on commit 8d7ca37

Please sign in to comment.