Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1040] rework schema definitions in geo processors #1268

Merged
merged 4 commits into from
Feb 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class LatLngToJtsPointProcessor extends StreamPipesDataProcessor {
private static final String LAT_KEY = "latitude-key";
private static final String LNG_KEY = "longitude-key";
private static final String EPSG_KEY = "epsg-key";
private static final String WKT_RUNTIME = "geomWKT";
private static final String GEOMETRY_RUNTIME = "geometry";
private String latitudeMapper;
private String longitudeMapper;
private String epsgMapper;
Expand Down Expand Up @@ -76,7 +76,7 @@ public DataProcessorDescription declareModel() {
.outputStrategy(
OutputStrategies.append(
PrimitivePropertyBuilder
.create(Datatypes.String, WKT_RUNTIME)
.create(Datatypes.String, GEOMETRY_RUNTIME)
.domainProperty("http://www.opengis.net/ont/geosparql#Geometry")
.build()
)
Expand All @@ -101,7 +101,8 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx
Point geom = SpGeometryBuilder.createSPGeom(lng, lat, epsg);

if (!geom.isEmpty()) {
event.addField(WKT_RUNTIME, geom.toString());
event.addField(GEOMETRY_RUNTIME, geom.toString());

LOG.debug("Created Geometry: " + geom.toString());
collector.collect(event);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
Expand All @@ -49,7 +47,7 @@ public class ReprojectionProcessor extends StreamPipesDataProcessor {
public static final String GEOM_KEY = "geom-key";
public static final String SOURCE_EPSG_KEY = "source-epsg-key";
public static final String TARGET_EPSG_KEY = "target-epsg-key";
public static final String GEOM_RUNTIME = "geomWKT";
public static final String GEOMETRY_RUNTIME = "geometry";
public static final String EPSG_RUNTIME = "epsg";
private String geometryMapper;
private String sourceEpsgMapper;
Expand All @@ -75,8 +73,6 @@ public DataProcessorDescription declareModel() {
.build())
.outputStrategy(OutputStrategies.keep())
.requiredIntegerParameter(Labels.withId(TARGET_EPSG_KEY), 32632)
.supportedFormats(SupportedFormats.jsonFormat())
.supportedProtocols(SupportedProtocols.kafka())
.build();
}

Expand Down Expand Up @@ -134,7 +130,7 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx

if (!reprojected.isEmpty()) {
event.updateFieldBySelector("s0::" + EPSG_RUNTIME, targetEpsg);
event.updateFieldBySelector("s0::" + GEOM_RUNTIME, reprojected.toText());
event.updateFieldBySelector("s0::" + GEOMETRY_RUNTIME, reprojected.toText());

collector.collect(event);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ public class TrajectoryFromPointsProcessor extends StreamPipesDataProcessor {
private static final String SUBPOINTS_KEY = "subpoints-key";
private static final String DESCRIPTION_KEY = "description-key";
private static final String TRAJECTORY_KEY = "trajectory-key";
private static final String TRAJECTORY_RUNTIME = "trajectoryWKT";
private static final String DESCRIPTION_RUNTIME = "trajectoryDescription";
private static final String TRAJECTORY_GEOMETRY_RUNTIME = "trajectory-geometry";
private static final String TRAJECTORY_EPSG_RUNTIME = "trajectory-epsg";
private static final String DESCRIPTION_RUNTIME = "trajectory-description";
private String pointMapper;
private String epsgMapper;
private String mValueMapper;
Expand Down Expand Up @@ -96,8 +97,12 @@ public DataProcessorDescription declareModel() {
SO.TEXT),
EpProperties.stringEp(
Labels.withId(TRAJECTORY_KEY),
TRAJECTORY_RUNTIME,
"http://www.opengis.net/ont/geosparql#Geometry")
TRAJECTORY_GEOMETRY_RUNTIME,
"http://www.opengis.net/ont/geosparql#Geometry"),
EpProperties.integerEp(
Labels.withId(EPSG_KEY),
TRAJECTORY_EPSG_RUNTIME,
"http://data.ign.fr/def/ignf#CartesianCS")
)
)
.build();
Expand Down Expand Up @@ -131,7 +136,8 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx
LineString geom = trajectory.returnAsLineString(eventGeom.getFactory());
// adds to stream
event.addField(DESCRIPTION_RUNTIME, trajectory.getDescription());
event.addField(TRAJECTORY_RUNTIME, geom.toString());
event.addField(TRAJECTORY_GEOMETRY_RUNTIME, geom.toString());
event.addField(TRAJECTORY_EPSG_RUNTIME, epsg);
collector.collect(event);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,30 @@
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.geo.jvm.latlong.helper.HaversineDistanceUtil;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.vocabulary.Geo;
import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.standalone.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

import java.net.URI;

public class HaversineDistanceCalculatorProcessor extends StreamPipesDataProcessor {
private static final String LAT_1_KEY = "lat1";
private static final String LONG_1_KEY = "long1";
private static final String LAT_2_KEY = "lat2";
private static final String LONG_2_KEY = "long2";
private static final String CALCULATED_DISTANCE_KEY = "calculatedDistance";
private static final String DISTANCE_RUNTIME_NAME = "distance";
String lat1FieldMapper;
String long1FieldMapper;
String lat2FieldMapper;
Expand All @@ -69,12 +72,12 @@ public DataProcessorDescription declareModel() {
Labels.withId(LONG_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
.build()
)
.outputStrategy(OutputStrategies
.append(EpProperties.numberEp(
Labels.withId(CALCULATED_DISTANCE_KEY),
"distance",
SO.NUMBER))
)
.outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
.create(Datatypes.Float, DISTANCE_RUNTIME_NAME)
.domainProperty(SO.NUMBER)
.measurementUnit(URI.create("http://qudt.org/vocab/unit#Kilometer"))
.build())
)
.build();
}

Expand All @@ -99,7 +102,7 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx

double resultDist = HaversineDistanceUtil.dist(lat1, long1, lat2, long2);

event.addField("distance", resultDist);
event.addField(DISTANCE_RUNTIME_NAME, resultDist);

collector.collect(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class SpeedCalculatorProcessor extends StreamPipesDataProcessor {
private static final String LATITUDE_KEY = "latitude-key";
private static final String LONGITUDE_KEY = "longitude-key";
private static final String COUNT_WINDOW_KEY = "count-window-key";
private static final String SPEED_KEY = "speed-key";
private static final String SPEED_RUNTIME_NAME = "speed";
private String latitudeFieldMapper;
private String longitudeFieldMapper;
private String timestampFieldMapper;
Expand All @@ -75,7 +75,7 @@ public DataProcessorDescription declareModel() {
.requiredIntegerParameter(Labels.withId(COUNT_WINDOW_KEY))
.outputStrategy(
OutputStrategies.append(PrimitivePropertyBuilder
.create(Datatypes.Float, SPEED_KEY)
.create(Datatypes.Float, SPEED_RUNTIME_NAME)
.domainProperty(SO.NUMBER)
.measurementUnit(URI.create("http://qudt.org/vocab/unit#KilometerPerHour"))
.build())
Expand All @@ -98,7 +98,7 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx
if (this.buffer.isFull()) {
Event firstEvent = (Event) buffer.get();
double speed = calculateSpeed(firstEvent, event);
event.addField(SPEED_KEY, speed);
event.addField(SPEED_RUNTIME_NAME, speed);
collector.collect(event);
}
this.buffer.add(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection.title=Geo CRS Reprojection
org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection.description=Coordinate reprojection from source to target CRS

wkt-key=WKT.title=WKT
wkt-key=WKT.description=Geometry
geom-key.title=WKT
geom-key.description=Geometry

source-epsg-key.title=CRS of Input Geometry
source-epsg-key.description=EPSG-Code of input point

target_epsg-key.title=Target CRS
target_epsg-key.description=EPSG-Code of target CRS
target-epsg-key.title=Target CRS
target-epsg-key.description=EPSG-Code of target CRS