From d13099f53af7ed8eec058221320119e812078f85 Mon Sep 17 00:00:00 2001 From: Junhao Liu <992364620@qq.com> Date: Thu, 13 Jul 2023 01:44:50 +0800 Subject: [PATCH] [SEDONA-303] Port all Sedona Spark functions to Sedona Flink -- Step 7 (#896) --- docs/api/flink/Constructor.md | 14 +++ docs/api/flink/Predicate.md | 62 ++++++++++++- .../java/org/apache/sedona/flink/Catalog.java | 5 + .../flink/expressions/Constructors.java | 16 +++- .../sedona/flink/expressions/Predicates.java | 92 +++++++++++++++++-- .../apache/sedona/flink/ConstructorTest.java | 26 ++++++ .../apache/sedona/flink/PredicateTest.java | 35 +++++++ 7 files changed, 239 insertions(+), 11 deletions(-) diff --git a/docs/api/flink/Constructor.md b/docs/api/flink/Constructor.md index f24313e86a..226719b6c4 100644 --- a/docs/api/flink/Constructor.md +++ b/docs/api/flink/Constructor.md @@ -176,6 +176,20 @@ SELECT ST_Point(x, y) AS pointshape FROM pointtable ``` +## ST_PointZ + +Introduction: Construct a Point from X, Y and Z and an optional srid. If srid is not set, it defaults to 0 (unknown). + +Format: `ST_PointZ (X:decimal, Y:decimal, Z:decimal)` +Format: `ST_PointZ (X:decimal, Y:decimal, Z:decimal, srid:integer)` + +Since: `v1.5.0` + +SQL example: +```sql +SELECT ST_PointZ(1.0, 2.0, 3.0) AS pointshape +``` + ## ST_PointFromText Introduction: Construct a Point from Text, delimited by Delimiter diff --git a/docs/api/flink/Predicate.md b/docs/api/flink/Predicate.md index 8794ddc6d8..994b7cd15a 100644 --- a/docs/api/flink/Predicate.md +++ b/docs/api/flink/Predicate.md @@ -13,6 +13,21 @@ FROM pointdf WHERE ST_Contains(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), pointdf.arealandmark) ``` +## ST_Crosses + +Introduction: Return true if A crosses B + +Format: `ST_Crosses (A:geometry, B:geometry)` + +Since: `v1.5.0` + +SQL example: +```sql +SELECT * +FROM pointdf +WHERE ST_Crosses(pointdf.arealandmark, ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0)) +``` + ## ST_Disjoint Introduction: Return true if A and B are disjoint @@ -21,13 +36,28 @@ Format: `ST_Disjoint (A:geometry, B:geometry)` Since: `v1.2.1` -Spark SQL example: +SQL example: ```sql SELECT * FROM pointdf WHERE ST_Disjoinnt(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), pointdf.arealandmark) ``` +## ST_Equals + +Introduction: Return true if A equals to B + +Format: `ST_Equals (A:geometry, B:geometry)` + +Since: `v1.5.0` + +SQL example: +```sql +SELECT * +FROM pointdf +WHERE ST_Equals(pointdf.arealandmark, ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0)) +``` + ## ST_Intersects Introduction: Return true if A intersects B @@ -43,6 +73,36 @@ FROM pointdf WHERE ST_Intersects(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), pointdf.arealandmark) ``` +## ST_Overlaps + +Introduction: Return true if A overlaps B + +Format: `ST_Overlaps (A:geometry, B:geometry)` + +Since: `v1.5.0` + +SQL example: +```sql +SELECT * +FROM geom +WHERE ST_Overlaps(geom.geom_a, geom.geom_b) +``` + +## ST_Touches + +Introduction: Return true if A touches B + +Format: `ST_Touches (A:geometry, B:geometry)` + +Since: `v1.5.0` + +SQL example: +```sql +SELECT * +FROM pointdf +WHERE ST_Touches(pointdf.arealandmark, ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0)) +``` + ## ST_Within Introduction: Return true if A is within B diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java index 5d3432275f..5cfe8537b9 100644 --- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java +++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java @@ -24,6 +24,7 @@ public static UserDefinedFunction[] getFuncs() { new Aggregators.ST_Envelope_Aggr(), new Aggregators.ST_Union_Aggr(), new Constructors.ST_Point(), + new Constructors.ST_PointZ(), new Constructors.ST_PointFromText(), new Constructors.ST_LineStringFromText(), new Constructors.ST_LineFromText(), @@ -140,11 +141,15 @@ public static UserDefinedFunction[] getPredicates() { return new UserDefinedFunction[]{ new Predicates.ST_Intersects(), new Predicates.ST_Contains(), + new Predicates.ST_Crosses(), new Predicates.ST_Within(), new Predicates.ST_Covers(), new Predicates.ST_CoveredBy(), new Predicates.ST_Disjoint(), + new Predicates.ST_Equals(), new Predicates.ST_OrderingEquals(), + new Predicates.ST_Overlaps(), + new Predicates.ST_Touches(), }; } } diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java index 36bbeba8d3..fcc426a0a2 100644 --- a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java +++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java @@ -38,9 +38,19 @@ private static Geometry getGeometryByType(String geom, String inputDelimiter, Ge public static class ST_Point extends ScalarFunction { @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) public Geometry eval(@DataTypeHint("Double") Double x, @DataTypeHint("Double") Double y) throws ParseException { - Coordinate coordinates = new Coordinate(x, y); - GeometryFactory geometryFactory = new GeometryFactory(); - return geometryFactory.createPoint(coordinates); + return org.apache.sedona.common.Constructors.point(x, y); + } + } + + public static class ST_PointZ extends ScalarFunction { + @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) + public Geometry eval(@DataTypeHint("Double") Double x, @DataTypeHint("Double") Double y, @DataTypeHint("Double") Double z) throws ParseException { + return eval(x, y, z, 0); + } + + @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) + public Geometry eval(@DataTypeHint("Double") Double x, @DataTypeHint("Double") Double y, @DataTypeHint("Double") Double z, @DataTypeHint("Integer") Integer srid) throws ParseException { + return org.apache.sedona.common.Constructors.pointZ(x, y, z, srid); } } diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java index b29e6ad5fd..f8416d1948 100644 --- a/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java +++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java @@ -33,7 +33,7 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt { Geometry geom1 = (Geometry) o1; Geometry geom2 = (Geometry) o2; - return geom1.intersects(geom2); + return org.apache.sedona.common.Predicates.intersects(geom1, geom2); } } @@ -53,7 +53,7 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt { Geometry geom1 = (Geometry) o1; Geometry geom2 = (Geometry) o2; - return geom1.contains(geom2); + return org.apache.sedona.common.Predicates.contains(geom1, geom2); } } @@ -72,7 +72,7 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt { Geometry geom1 = (Geometry) o1; Geometry geom2 = (Geometry) o2; - return geom1.within(geom2); + return org.apache.sedona.common.Predicates.within(geom1, geom2); } } @@ -92,7 +92,7 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt { Geometry geom1 = (Geometry) o1; Geometry geom2 = (Geometry) o2; - return geom1.covers(geom2); + return org.apache.sedona.common.Predicates.covers(geom1, geom2); } } @@ -112,7 +112,25 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt { Geometry geom1 = (Geometry) o1; Geometry geom2 = (Geometry) o2; - return geom1.coveredBy(geom2); + return org.apache.sedona.common.Predicates.coveredBy(geom1, geom2); + } + } + + public static class ST_Crosses extends ScalarFunction + { + /** + * Constructor for relation checking without duplicate removal + */ + public ST_Crosses() + { + } + + @DataTypeHint("Boolean") + public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) + { + Geometry geom1 = (Geometry) o1; + Geometry geom2 = (Geometry) o2; + return org.apache.sedona.common.Predicates.crosses(geom1, geom2); } } @@ -132,7 +150,27 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt { Geometry geom1 = (Geometry) o1; Geometry geom2 = (Geometry) o2; - return geom1.disjoint(geom2); + return org.apache.sedona.common.Predicates.disjoint(geom1, geom2); + } + } + + public static class ST_Equals + extends ScalarFunction + { + + /** + * Constructor for relation checking without duplicate removal + */ + public ST_Equals() + { + } + + @DataTypeHint("Boolean") + public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) + { + Geometry geom1 = (Geometry) o1; + Geometry geom2 = (Geometry) o2; + return org.apache.sedona.common.Predicates.equals(geom1, geom2); } } @@ -152,7 +190,47 @@ public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jt { Geometry geom1 = (Geometry) o1; Geometry geom2 = (Geometry) o2; - return geom1.equalsExact(geom2); + return org.apache.sedona.common.Predicates.orderingEquals(geom1, geom2); + } + } + + public static class ST_Overlaps + extends ScalarFunction + { + + /** + * Constructor for relation checking without duplicate removal + */ + public ST_Overlaps() + { + } + + @DataTypeHint("Boolean") + public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) + { + Geometry geom1 = (Geometry) o1; + Geometry geom2 = (Geometry) o2; + return org.apache.sedona.common.Predicates.overlaps(geom1, geom2); + } + } + + public static class ST_Touches + extends ScalarFunction + { + + /** + * Constructor for relation checking without duplicate removal + */ + public ST_Touches() + { + } + + @DataTypeHint("Boolean") + public Boolean eval(@DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o1, @DataTypeHint(value = "RAW", bridgedTo = org.locationtech.jts.geom.Geometry.class) Object o2) + { + Geometry geom1 = (Geometry) o1; + Geometry geom2 = (Geometry) o2; + return org.apache.sedona.common.Predicates.touches(geom1, geom2); } } } diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java index 2b0514e282..a39a8a0422 100644 --- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java +++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java @@ -21,6 +21,7 @@ import org.locationtech.jts.geom.Coordinate; import org.locationtech.jts.geom.Geometry; import org.locationtech.jts.geom.GeometryFactory; +import org.locationtech.jts.geom.Point; import org.locationtech.jts.geom.Polygon; import org.wololo.jts2geojson.GeoJSONReader; @@ -73,6 +74,31 @@ public void test2DPoint() { assertEquals(expected, result); } + @Test + public void testPointZ() { + List data = new ArrayList<>(); + data.add(Row.of(2.0, 2.0, 5.0, "point")); + String[] colNames = new String[]{"x", "y", "z", "name_point"}; + + TypeInformation[] colTypes = { + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO, + BasicTypeInfo.STRING_TYPE_INFO}; + RowTypeInfo typeInfo = new RowTypeInfo(colTypes, colNames); + DataStream ds = env.fromCollection(data).returns(typeInfo); + Table pointTable = tableEnv.fromDataStream(ds); + + Table geomTable = pointTable + .select(call(Constructors.ST_PointZ.class.getSimpleName(), $(colNames[0]), $(colNames[1]), $(colNames[2])) + .as(colNames[3])); + + Point result = first(geomTable) + .getFieldAs(colNames[3]); + + assertEquals(5.0, result.getCoordinate().getZ(), 1e-6); + } + @Test public void testPointFromText() { List data = createPointWKT(testDataSize); diff --git a/flink/src/test/java/org/apache/sedona/flink/PredicateTest.java b/flink/src/test/java/org/apache/sedona/flink/PredicateTest.java index 1b991f93c3..237c16d524 100644 --- a/flink/src/test/java/org/apache/sedona/flink/PredicateTest.java +++ b/flink/src/test/java/org/apache/sedona/flink/PredicateTest.java @@ -14,10 +14,13 @@ package org.apache.sedona.flink; import org.apache.flink.table.api.Table; +import org.apache.sedona.flink.expressions.Predicates; import org.junit.BeforeClass; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.api.Expressions.call; public class PredicateTest extends TestBase{ @BeforeClass @@ -79,6 +82,22 @@ public void testCoveredBy() { assertEquals(1, count(result)); } + @Test + public void testCrosses() { + Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('MULTIPOINT((0 0), (2 2))') AS g1, ST_GeomFromWKT('LINESTRING(-1 -1, 1 1)') as g2"); + table = table.select(call(Predicates.ST_Crosses.class.getSimpleName(), $("g1"), $("g2"))); + Boolean actual = (Boolean) first(table).getField(0); + assertEquals(true, actual); + } + + @Test + public void testEquals() { + Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('LINESTRING (0 0, 2 2)') AS g1, ST_GeomFromWKT('LINESTRING (0 0, 1 1, 2 2)') as g2"); + table = table.select(call(Predicates.ST_Equals.class.getSimpleName(), $("g1"), $("g2"))); + Boolean actual = (Boolean) first(table).getField(0); + assertEquals(true, actual); + } + @Test public void testOrderingEquals() { Table lineStringTable = createLineStringTable(testDataSize); @@ -87,4 +106,20 @@ public void testOrderingEquals() { Table result = lineStringTable.filter(expr); assertEquals(1, count(result)); } + + @Test + public void testOverlaps() { + Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('LINESTRING (0 0, 2 2)') AS g1, ST_GeomFromWKT('LINESTRING (1 1, 3 3)') as g2"); + table = table.select(call(Predicates.ST_Overlaps.class.getSimpleName(), $("g1"), $("g2"))); + Boolean actual = (Boolean) first(table).getField(0); + assertEquals(true, actual); + } + + @Test + public void testTouches() { + Table table = tableEnv.sqlQuery("SELECT ST_GeomFromWKT('LINESTRING (0 0, 1 0)') AS g1, ST_GeomFromWKT('LINESTRING (0 0, 1 1)') as g2"); + table = table.select(call(Predicates.ST_Touches.class.getSimpleName(), $("g1"), $("g2"))); + Boolean actual = (Boolean) first(table).getField(0); + assertEquals(true, actual); + } }