Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SEDONA-303] Port all Sedona Spark functions to Sedona Flink -- Step 5 #890

Merged
merged 2 commits into from
Jul 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions docs/api/flink/Function.md
Original file line number Diff line number Diff line change
Expand Up @@ -1588,6 +1588,110 @@ SELECT ST_SRID(polygondf.countyshape)
FROM polygondf
```

## ST_SimplifyPreserveTopology

Introduction: Simplifies a geometry and ensures that the result is a valid geometry having the same dimension and number of components as the input,
and with the components having the same topological relationship.

Since: `v1.5.0`

Format: `ST_SimplifyPreserveTopology (A:geometry, distanceTolerance: Double)`

Example:
```sql
SELECT ST_SimplifyPreserveTopology(polygondf.countyshape, 10.0)
FROM polygondf
```

## ST_StartPoint

Introduction: Returns first point of given linestring.

Format: `ST_StartPoint(geom: geometry)`

Since: `v1.5.0`

Example:
```sql
SELECT ST_StartPoint(ST_GeomFromText('LINESTRING(100 150,50 60, 70 80, 160 170)'))
```

Output: `POINT(100 150)`

## ST_SubDivide

Introduction: Returns list of geometries divided based of given maximum number of vertices.

Format: `ST_SubDivide(geom: geometry, maxVertices: int)`

Since: `v1.5.0`

Example:
```sql
SELECT ST_SubDivide(ST_GeomFromText("POLYGON((35 10, 45 45, 15 40, 10 20, 35 10), (20 30, 35 35, 30 20, 20 30))"), 5)

```

Output:
```
[
POLYGON((37.857142857142854 20, 35 10, 10 20, 37.857142857142854 20)),
POLYGON((15 20, 10 20, 15 40, 15 20)),
POLYGON((20 20, 15 20, 15 30, 20 30, 20 20)),
POLYGON((26.428571428571427 20, 20 20, 20 30, 26.4285714 23.5714285, 26.4285714 20)),
POLYGON((15 30, 15 40, 20 40, 20 30, 15 30)),
POLYGON((20 40, 26.4285714 40, 26.4285714 32.1428571, 20 30, 20 40)),
POLYGON((37.8571428 20, 30 20, 34.0476190 32.1428571, 37.8571428 32.1428571, 37.8571428 20)),
POLYGON((34.0476190 34.6825396, 26.4285714 32.1428571, 26.4285714 40, 34.0476190 40, 34.0476190 34.6825396)),
POLYGON((34.0476190 32.1428571, 35 35, 37.8571428 35, 37.8571428 32.1428571, 34.0476190 32.1428571)),
POLYGON((35 35, 34.0476190 34.6825396, 34.0476190 35, 35 35)),
POLYGON((34.0476190 35, 34.0476190 40, 37.8571428 40, 37.8571428 35, 34.0476190 35)),
POLYGON((30 20, 26.4285714 20, 26.4285714 23.5714285, 30 20)),
POLYGON((15 40, 37.8571428 43.8095238, 37.8571428 40, 15 40)),
POLYGON((45 45, 37.8571428 20, 37.8571428 43.8095238, 45 45))
]
```

Example:

```sql
SELECT ST_SubDivide(ST_GeomFromText("LINESTRING(0 0, 85 85, 100 100, 120 120, 21 21, 10 10, 5 5)"), 5)
```

Output:
```
[
LINESTRING(0 0, 5 5)
LINESTRING(5 5, 10 10)
LINESTRING(10 10, 21 21)
LINESTRING(21 21, 60 60)
LINESTRING(60 60, 85 85)
LINESTRING(85 85, 100 100)
LINESTRING(100 100, 120 120)
]
```

## ST_SymDifference

Introduction: Return the symmetrical difference between geometry A and B (return parts of geometries which are in either of the sets, but not in their intersection)


Format: `ST_SymDifference (A:geometry, B:geometry)`

Since: `v1.5.0`

Example:

```sql
SELECT ST_SymDifference(ST_GeomFromWKT('POLYGON ((-3 -3, 3 -3, 3 3, -3 3, -3 -3))'), ST_GeomFromWKT('POLYGON ((-2 -3, 4 -3, 4 3, -2 3, -2 -3))'))
```

Result:

```
MULTIPOLYGON (((-2 -3, -3 -3, -3 3, -2 3, -2 -3)), ((3 -3, 3 3, 4 3, 4 -3, 3 -3)))
```

## ST_Transform

Introduction:
Expand Down
4 changes: 4 additions & 0 deletions flink/src/main/java/org/apache/sedona/flink/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ public static UserDefinedFunction[] getFuncs() {
new Functions.ST_MakePolygon(),
new Functions.ST_MakeValid(),
new Functions.ST_Multi(),
new Functions.ST_StartPoint(),
new Functions.ST_SimplifyPreserveTopology(),
new Functions.ST_Split(),
new Functions.ST_Subdivide(),
new Functions.ST_SymDifference(),
new Functions.ST_S2CellIDs(),
new Functions.ST_GeometricMedian(),
new Functions.ST_NumPoints(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,14 @@ public Geometry eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.j
}
}

public static class ST_StartPoint extends ScalarFunction {
@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) {
Geometry geom = (Geometry) o;
return org.apache.sedona.common.Functions.startPoint(geom);
}
}

public static class ST_Split extends ScalarFunction {
@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1,
Expand All @@ -718,6 +726,34 @@ public Long[] eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts
}
}

public static class ST_SimplifyPreserveTopology extends ScalarFunction {
@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o,
@DataTypeHint("Double") Double distanceTolerance) {
Geometry geom = (Geometry) o;
return org.apache.sedona.common.Functions.simplifyPreserveTopology(geom, distanceTolerance);
}
}

public static class ST_Subdivide extends ScalarFunction {
@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry[].class)
public Geometry[] eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o,
@DataTypeHint("INT") Integer maxVertices) {
Geometry geom = (Geometry) o;
return org.apache.sedona.common.Functions.subDivide(geom, maxVertices);
}
}

public static class ST_SymDifference extends ScalarFunction {
@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1,
@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) {
Geometry geom1 = (Geometry) o1;
Geometry geom2 = (Geometry) o2;
return org.apache.sedona.common.Functions.symDifference(geom1, geom2);
}
}

public static class ST_GeometricMedian extends ScalarFunction {
@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class)
public Geometry eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o) throws Exception {
Expand Down
33 changes: 33 additions & 0 deletions flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -783,12 +783,45 @@ public void testMulti() {
assertEquals("MULTIPOINT ((0 0))", result.toString());
}

@Test
public void testStartPoint() {
Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('LINESTRING (0 0, 1 0)') AS geom");
table = table.select(call(Functions.ST_StartPoint.class.getSimpleName(), $("geom")));
Geometry result = (Geometry) first(table).getField(0);
assertEquals("POINT (0 0)", result.toString());
}

@Test
public void testSimplifyPreserveTopology() {
Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('POLYGON ((0 0, 1 0, 1 0.9, 1 1, 0 0))') AS geom");
table = table.select(call(Functions.ST_SimplifyPreserveTopology.class.getSimpleName(), $("geom"), 0.2));
Geometry result = (Geometry) first(table).getField(0);
assertEquals("POLYGON ((0 0, 1 0, 1 1, 0 0))", result.toString());
}

@Test
public void testSplit() {
Table pointTable = tableEnv.sqlQuery("SELECT ST_Split(ST_GeomFromWKT('LINESTRING (0 0, 1.5 1.5, 2 2)'), ST_GeomFromWKT('MULTIPOINT (0.5 0.5, 1 1)'))");
assertEquals("MULTILINESTRING ((0 0, 0.5 0.5), (0.5 0.5, 1 1), (1 1, 1.5 1.5, 2 2))", ((Geometry)first(pointTable).getField(0)).norm().toText());
}

@Test
public void testSubdivide() {
Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('LINESTRING (0 0, 1 0, 2 0, 3 0, 4 0, 5 0)') AS geom");
table = table.select(call(Functions.ST_Subdivide.class.getSimpleName(), $("geom"), 5));
Geometry[] result = (Geometry[]) first(table).getField(0);
assertEquals("LINESTRING (0 0, 2.5 0)", result[0].toString());
assertEquals("LINESTRING (2.5 0, 5 0)", result[1].toString());
}

@Test
public void testSymDifference() {
Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('POLYGON ((-1 -1, 1 -1, 1 1, -1 1, -1 -1))') AS a, ST_GeomFromWKT('POLYGON ((0 -2, 2 -2, 2 0, 0 0, 0 -2))') AS b");
table = table.select(call(Functions.ST_SymDifference.class.getSimpleName(), $("a"), $("b")));
Geometry result = (Geometry) first(table).getField(0);
assertEquals("MULTIPOLYGON (((0 -1, -1 -1, -1 1, 1 1, 1 0, 0 0, 0 -1)), ((0 -1, 1 -1, 1 0, 2 0, 2 -2, 0 -2, 0 -1)))", result.toString());
}

@Test
public void testS2CellIDs() {
String initExplodeQuery = "SELECT id, geom, cell_tbl.cell from (VALUES %s) as raw_tbl(id, geom, cells) CROSS JOIN UNNEST(raw_tbl.cells) AS cell_tbl (cell)";
Expand Down