diff --git a/packages/libs/utils/utils/convert.py b/packages/libs/utils/utils/convert.py index e0dd56d..ba037a6 100644 --- a/packages/libs/utils/utils/convert.py +++ b/packages/libs/utils/utils/convert.py @@ -2,6 +2,10 @@ def to_float(value): return float(value) if value is not None else None +def to_string(value): + return f"{value}".strip() if value is not None else None + + def dms_to_float(dms_value: str): """ Convert Degrees, Minutes, Seconds coordinate to float value diff --git a/packages/pipe-vms-ingestion/common/transforms/calculate_implied_course.py b/packages/pipe-vms-ingestion/common/transforms/calculate_implied_course.py new file mode 100644 index 0000000..e8daab1 --- /dev/null +++ b/packages/pipe-vms-ingestion/common/transforms/calculate_implied_course.py @@ -0,0 +1,23 @@ +import math + + +def calculate_implied_course(lat1, lon1, lat2, lon2): + if None in [lat1, lon1, lat2, lon2]: + return None + # Calculates forward azimuth - course along a great circle + # from a to b + alat = math.radians(lat1) + alon = math.radians(lon1) + blat = math.radians(lat2) + blon = math.radians(lon2) + dlon = blon - alon + return ( + 360 + + math.degrees( + math.atan2( + math.sin(dlon) * math.cos(blat), + math.cos(alat) * math.sin(blat) + - math.sin(alat) * math.cos(blat) * math.cos(dlon), + ) + ) + ) % 360 diff --git a/packages/pipe-vms-ingestion/common/transforms/calculate_implied_speed.py b/packages/pipe-vms-ingestion/common/transforms/calculate_implied_speed.py new file mode 100644 index 0000000..fead129 --- /dev/null +++ b/packages/pipe-vms-ingestion/common/transforms/calculate_implied_speed.py @@ -0,0 +1,11 @@ +from haversine import Unit, haversine + + +def calculate_implied_speed_kt(ts1, lat1, lon1, ts2, lat2, lon2): + if None in [ts1, lat1, lon1, ts2, lat2, lon2]: + return None + prev = (lat1, lon1) + current = (lat2, lon2) + distance_kt = abs(haversine(current, prev, unit=Unit.NAUTICAL_MILES)) + timedelta_h = abs((ts2 - ts1).total_seconds() / (60 * 60)) + return 0 if timedelta_h == 0 else (distance_kt / timedelta_h) diff --git a/packages/pipe-vms-ingestion/tests/vms_ingestion/common/transforms/data/implied_speed_course.py b/packages/pipe-vms-ingestion/tests/vms_ingestion/common/transforms/data/implied_speed_course.py new file mode 100644 index 0000000..fb5464d --- /dev/null +++ b/packages/pipe-vms-ingestion/tests/vms_ingestion/common/transforms/data/implied_speed_course.py @@ -0,0 +1,269 @@ +import datetime + +MESSAGES_WITH_IMPLIED_SPEED_AND_COURSE = [ + # From Belize id 34 (mmsi: 312891000) + ( + "312891000", + datetime.datetime(2023, 12, 27, 0, 1, 22, tzinfo=datetime.timezone.utc), + 16.252783, + -16.671667, + 174.72, + 4.68, + None, + None, + None, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 1, 1, 22, tzinfo=datetime.timezone.utc), + 16.172967, + -16.672, + 180.23, + 4.77, + datetime.datetime(2023, 12, 27, 0, 1, 22, tzinfo=datetime.timezone.utc), + 16.252783, + -16.671667, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 2, 1, 22, tzinfo=datetime.timezone.utc), + 16.092283, + -16.68325, + 187.63, + 4.86, + datetime.datetime(2023, 12, 27, 1, 1, 22, tzinfo=datetime.timezone.utc), + 16.172967, + -16.672, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 3, 1, 22, tzinfo=datetime.timezone.utc), + 16.078967, + -16.7444, + 257.24, + 3.62, + datetime.datetime(2023, 12, 27, 2, 1, 22, tzinfo=datetime.timezone.utc), + 16.092283, + -16.68325, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 4, 1, 22, tzinfo=datetime.timezone.utc), + 16.094583, + -16.682083, + 75.37, + 3.72, + datetime.datetime(2023, 12, 27, 3, 1, 22, tzinfo=datetime.timezone.utc), + 16.078967, + -16.7444, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 5, 1, 22, tzinfo=datetime.timezone.utc), + 16.1258, + -16.6983, + 333.48, + 2.09, + datetime.datetime(2023, 12, 27, 4, 1, 22, tzinfo=datetime.timezone.utc), + 16.094583, + -16.682083, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 6, 1, 22, tzinfo=datetime.timezone.utc), + 16.067667, + -16.705017, + 186.34, + 3.5, + datetime.datetime(2023, 12, 27, 5, 1, 22, tzinfo=datetime.timezone.utc), + 16.1258, + -16.6983, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 7, 1, 22, tzinfo=datetime.timezone.utc), + 16.068, + -16.78825, + 270.25, + 4.81, + datetime.datetime(2023, 12, 27, 6, 1, 22, tzinfo=datetime.timezone.utc), + 16.067667, + -16.705017, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 8, 1, 22, tzinfo=datetime.timezone.utc), + 16.119367, + -16.827733, + 323.56, + 3.82, + datetime.datetime(2023, 12, 27, 7, 1, 22, tzinfo=datetime.timezone.utc), + 16.068, + -16.78825, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 9, 1, 22, tzinfo=datetime.timezone.utc), + 16.105217, + -16.796133, + 114.99, + 2.01, + datetime.datetime(2023, 12, 27, 8, 1, 22, tzinfo=datetime.timezone.utc), + 16.119367, + -16.827733, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 10, 1, 22, tzinfo=datetime.timezone.utc), + 16.1781, + -16.793033, + 2.34, + 4.36, + datetime.datetime(2023, 12, 27, 9, 1, 22, tzinfo=datetime.timezone.utc), + 16.105217, + -16.796133, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 11, 1, 22, tzinfo=datetime.timezone.utc), + 16.131883, + -16.8035, + 192.27, + 2.83, + datetime.datetime(2023, 12, 27, 10, 1, 22, tzinfo=datetime.timezone.utc), + 16.1781, + -16.793033, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 12, 1, 22, tzinfo=datetime.timezone.utc), + 16.162917, + -16.8503, + 304.63, + 3.28, + datetime.datetime(2023, 12, 27, 11, 1, 22, tzinfo=datetime.timezone.utc), + 16.131883, + -16.8035, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 13, 1, 22, tzinfo=datetime.timezone.utc), + 16.093383, + -16.731867, + 121.42, + 8.0, + datetime.datetime(2023, 12, 27, 12, 1, 22, tzinfo=datetime.timezone.utc), + 16.162917, + -16.8503, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 14, 1, 22, tzinfo=datetime.timezone.utc), + 16.119217, + -16.735867, + 351.54, + 1.56, + datetime.datetime(2023, 12, 27, 13, 1, 22, tzinfo=datetime.timezone.utc), + 16.093383, + -16.731867, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 15, 1, 22, tzinfo=datetime.timezone.utc), + 16.150217, + -16.749867, + 336.55, + 2.02, + datetime.datetime(2023, 12, 27, 14, 1, 22, tzinfo=datetime.timezone.utc), + 16.119217, + -16.735867, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 16, 1, 22, tzinfo=datetime.timezone.utc), + 16.162617, + -16.744583, + 22.26, + 0.8, + datetime.datetime(2023, 12, 27, 15, 1, 22, tzinfo=datetime.timezone.utc), + 16.150217, + -16.749867, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 17, 1, 22, tzinfo=datetime.timezone.utc), + 16.084217, + -16.744283, + 179.79, + 4.68, + datetime.datetime(2023, 12, 27, 16, 1, 22, tzinfo=datetime.timezone.utc), + 16.162617, + -16.744583, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 18, 1, 22, tzinfo=datetime.timezone.utc), + 16.1046, + -16.721633, + 46.87, + 1.79, + datetime.datetime(2023, 12, 27, 17, 1, 22, tzinfo=datetime.timezone.utc), + 16.084217, + -16.744283, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 19, 1, 22, tzinfo=datetime.timezone.utc), + 16.181833, + -16.736117, + 349.79, + 4.69, + datetime.datetime(2023, 12, 27, 18, 1, 22, tzinfo=datetime.timezone.utc), + 16.1046, + -16.721633, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 20, 1, 22, tzinfo=datetime.timezone.utc), + 16.253017, + -16.697233, + 27.67, + 4.81, + datetime.datetime(2023, 12, 27, 19, 1, 22, tzinfo=datetime.timezone.utc), + 16.181833, + -16.736117, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 21, 1, 22, tzinfo=datetime.timezone.utc), + 16.3347, + -16.67925, + 11.93, + 4.99, + datetime.datetime(2023, 12, 27, 20, 1, 22, tzinfo=datetime.timezone.utc), + 16.253017, + -16.697233, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 22, 1, 22, tzinfo=datetime.timezone.utc), + 16.274333, + -16.672617, + 173.98, + 3.63, + datetime.datetime(2023, 12, 27, 21, 1, 22, tzinfo=datetime.timezone.utc), + 16.3347, + -16.67925, + ), + ( + "312891000", + datetime.datetime(2023, 12, 27, 23, 1, 22, tzinfo=datetime.timezone.utc), + 16.194967, + -16.677633, + 183.47, + 4.75, + datetime.datetime(2023, 12, 27, 22, 1, 22, tzinfo=datetime.timezone.utc), + 16.274333, + -16.672617, + ), +] diff --git a/packages/pipe-vms-ingestion/tests/vms_ingestion/common/transforms/test_calculate_implied_course.py b/packages/pipe-vms-ingestion/tests/vms_ingestion/common/transforms/test_calculate_implied_course.py new file mode 100644 index 0000000..2f91219 --- /dev/null +++ b/packages/pipe-vms-ingestion/tests/vms_ingestion/common/transforms/test_calculate_implied_course.py @@ -0,0 +1,32 @@ +import pytest +from common.transforms.calculate_implied_course import calculate_implied_course +from tests.vms_ingestion.common.transforms.data.implied_speed_course import ( + MESSAGES_WITH_IMPLIED_SPEED_AND_COURSE, +) + + +class TestCalculateImpliedCourse: + + @pytest.mark.parametrize( + "id,timestamp,lat,lon,course,speed,prev_timestamp,prev_lat,prev_lon", + MESSAGES_WITH_IMPLIED_SPEED_AND_COURSE, + ) + def test_calculate_implied_course( + self, id, timestamp, lat, lon, course, speed, prev_timestamp, prev_lat, prev_lon + ): + implied_course = calculate_implied_course(prev_lat, prev_lon, lat, lon) + + course_is_not_defined = ( + implied_course is None + and prev_timestamp is None + and prev_lat is None + and prev_lon is None + ) + # when course is defined + # calculated course matches approximatelly (1% tolerance) the expected course + # near the 360º calculated course matches approximatelly (1% tolerance) the expected course + assert ( + course_is_not_defined + or implied_course == pytest.approx(course, abs=360 * 0.01) + or abs(360 - implied_course) == pytest.approx(course, abs=360 * 0.01) + ) diff --git a/packages/pipe-vms-ingestion/tests/vms_ingestion/common/transforms/test_calculate_implied_speed.py b/packages/pipe-vms-ingestion/tests/vms_ingestion/common/transforms/test_calculate_implied_speed.py new file mode 100644 index 0000000..0156c00 --- /dev/null +++ b/packages/pipe-vms-ingestion/tests/vms_ingestion/common/transforms/test_calculate_implied_speed.py @@ -0,0 +1,27 @@ +import pytest +from common.transforms.calculate_implied_speed import calculate_implied_speed_kt +from tests.vms_ingestion.common.transforms.data.implied_speed_course import ( + MESSAGES_WITH_IMPLIED_SPEED_AND_COURSE, +) + + +class TestCalculateImpliedSpeed: + + @pytest.mark.parametrize( + "id,timestamp,lat,lon,course,speed,prev_timestamp,prev_lat,prev_lon", + MESSAGES_WITH_IMPLIED_SPEED_AND_COURSE, + ) + def test_calculate_implied_speed_kt( + self, id, timestamp, lat, lon, course, speed, prev_timestamp, prev_lat, prev_lon + ): + implied_speed = calculate_implied_speed_kt( + prev_timestamp, prev_lat, prev_lon, timestamp, lat, lon + ) + + speed_is_not_defined = ( + implied_speed is None + and prev_timestamp is None + and prev_lat is None + and prev_lon is None + ) + assert speed_is_not_defined or implied_speed == pytest.approx(speed, abs=0.1) diff --git a/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/feeds/data/raw_blz.json b/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/feeds/data/raw_blz.json new file mode 100644 index 0000000..406623f --- /dev/null +++ b/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/feeds/data/raw_blz.json @@ -0,0 +1,54 @@ +[ + { + "receiveDate": "2024-01-01 22:15:19+00:00", + "course": null, + "speed": null, + "id": "1082", + "imo": "789123456", + "callsign": null, + "name": "ERTYUIO", + "mmsi": null, + "lon": 1.840367, + "timestamp": "2024-01-01 21:50:21+00:00", + "lat": -35.561983 + }, + { + "receiveDate": "2024-01-01 13:15:22+00:00", + "course": null, + "speed": null, + "id": "2222", + "imo": null, + "callsign": null, + "name": "ZAIRA", + "mmsi": null, + "lon": -47.32655, + "timestamp": "2024-01-01 12:08:35+00:00", + "lat": 6.4383 + }, + { + "receiveDate": "2024-01-01 13:15:21+00:00", + "course": null, + "speed": null, + "id": "2222", + "imo": null, + "callsign": null, + "name": "ZAIRA", + "mmsi": null, + "lon": -47.3742, + "timestamp": "2024-01-01 12:31:00+00:00", + "lat": 6.451333 + }, + { + "receiveDate": "2024-01-01 18:15:22+00:00", + "course": null, + "speed": null, + "id": "402", + "imo": "9876543", + "callsign": null, + "name": "EUGENIA", + "mmsi": "31231231", + "lon": -21.080833, + "timestamp": "2024-01-01 17:45:54+00:00", + "lat": 13.857133 + } +] diff --git a/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/feeds/test_blz_normalize.py b/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/feeds/test_blz_normalize.py new file mode 100644 index 0000000..1aeb3f9 --- /dev/null +++ b/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/feeds/test_blz_normalize.py @@ -0,0 +1,179 @@ +import os +import unittest +from datetime import date, datetime, timezone + +import apache_beam as beam +from apache_beam import pvalue +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from tests.util import pcol_equal_to, read_json +from vms_ingestion.normalization import build_pipeline_options_with_defaults +from vms_ingestion.normalization.feeds.blz_normalize import BLZNormalize + +script_path = os.path.dirname(os.path.abspath(__file__)) + + +class TestBLZNormalize(unittest.TestCase): + + options = build_pipeline_options_with_defaults( + argv=[ + "--country_code=blz", + '--source=""', + '--destination=""', + '--start_date=""', + '--end_date=""', + ] + ) + + # Our input data, which will make up the initial PCollection. + RECORDS = [ + { + **x, + "receiveDate": datetime.fromisoformat(x["receiveDate"]), + "timestamp": datetime.fromisoformat(x["timestamp"]), + } + for x in read_json(f"{script_path}/data/raw_blz.json") + ] + + # Our output data, which is the expected data that the final PCollection must match. + EXPECTED = [ + { + "callsign": None, + "class_b_cs_flag": None, + "course": None, + "destination": None, + "heading": None, + "imo": None, + "ingested_at": None, + "lat": -35.561983, + "length": None, + "lon": 1.840367, + "msgid": "935f98d55704bc12ef6f645883751dd09a90a25c7eb58fd5babd864c687ea019", + "received_at": datetime(2024, 1, 1, 22, 15, 19, tzinfo=timezone.utc), + "receiver": None, + "receiver_type": None, + "shipname": "ERTYUIO", + "shiptype": None, + "source": "BELIZE_VMS", + "source_fleet": None, + "source_provider": "POLESTAR", + "source_ssvid": None, + "source_tenant": "BLZ", + "source_type": "VMS", + "speed": None, + "ssvid": "5256f6f8aac982288b4bb28c05b04e86f09d71b408b7fb9114752336b1cad83e", + "status": None, + "timestamp": datetime(2024, 1, 1, 21, 50, 21, tzinfo=timezone.utc), + "timestamp_date": date(2024, 1, 1), + "type": "VMS", + "width": None, + }, + { + "callsign": None, + "class_b_cs_flag": None, + "course": None, + "destination": None, + "heading": None, + "imo": None, + "ingested_at": None, + "lat": 6.4383, + "length": None, + "lon": -47.32655, + "msgid": "a74eb2931b88b1d3c074f8c44e2262c9d16761271c7ccaf815e37517ab38f0b3", + "received_at": datetime(2024, 1, 1, 13, 15, 22, tzinfo=timezone.utc), + "receiver": None, + "receiver_type": None, + "shipname": "ZAIRA", + "shiptype": None, + "source": "BELIZE_VMS", + "source_fleet": None, + "source_provider": "POLESTAR", + "source_ssvid": None, + "source_tenant": "BLZ", + "source_type": "VMS", + "speed": None, + "ssvid": "7e351bda344871efa0464ad9187ab35cc37e3f847a310cac5e9a4c9fc7456b49", + "status": None, + "timestamp": datetime(2024, 1, 1, 12, 8, 35, tzinfo=timezone.utc), + "timestamp_date": date(2024, 1, 1), + "type": "VMS", + "width": None, + }, + { + "callsign": None, + "class_b_cs_flag": None, + "course": 285.39249139403387, + "destination": None, + "heading": None, + "imo": None, + "ingested_at": None, + "lat": 6.451333, + "length": None, + "lon": -47.3742, + "msgid": "5435473036df2a6e75c43e60e5a27a4cd1f16916f4ca851bb23b409214ff4b94", + "received_at": datetime(2024, 1, 1, 13, 15, 21, tzinfo=timezone.utc), + "receiver": None, + "receiver_type": None, + "shipname": "ZAIRA", + "shiptype": None, + "source": "BELIZE_VMS", + "source_fleet": None, + "source_provider": "POLESTAR", + "source_ssvid": None, + "source_tenant": "BLZ", + "source_type": "VMS", + "speed": 7.892110928616131, + "ssvid": "7e351bda344871efa0464ad9187ab35cc37e3f847a310cac5e9a4c9fc7456b49", + "status": None, + "timestamp": datetime(2024, 1, 1, 12, 31, tzinfo=timezone.utc), + "timestamp_date": date(2024, 1, 1), + "type": "VMS", + "width": None, + }, + { + "callsign": None, + "class_b_cs_flag": None, + "course": None, + "destination": None, + "heading": None, + "imo": "9876543", + "ingested_at": None, + "lat": 13.857133, + "length": None, + "lon": -21.080833, + "msgid": "70a8ff7081d9836488d278058004742b2bba4b6086e0b62a9d64b8fcfa3b8616", + "received_at": datetime(2024, 1, 1, 18, 15, 22, tzinfo=timezone.utc), + "receiver": None, + "receiver_type": None, + "shipname": "EUGENIA", + "shiptype": None, + "source": "BELIZE_VMS", + "source_fleet": None, + "source_provider": "POLESTAR", + "source_ssvid": None, + "source_tenant": "BLZ", + "source_type": "VMS", + "speed": None, + "ssvid": "a4bffdd073c9a18d9d410a75afc8317b1c03c0dbc1f4b95b22627a02337fa42c", + "status": None, + "timestamp": datetime(2024, 1, 1, 17, 45, 54, tzinfo=timezone.utc), + "timestamp_date": date(2024, 1, 1), + "type": "VMS", + "width": None, + }, + ] + + # Example test that tests the pipeline's transforms. + def test_normalize(self): + with TestPipeline(options=TestBLZNormalize.options) as p: + + # Create a PCollection from the RECORDS static input data. + input = p | beam.Create(TestBLZNormalize.RECORDS) + + # Run ALL the pipeline's transforms (in this case, the Normalize transform). + output: pvalue.PCollection = input | BLZNormalize(feed="blz") + + # Assert that the output PCollection matches the EXPECTED data. + assert_that( + output, pcol_equal_to(TestBLZNormalize.EXPECTED), label="CheckOutput" + ) diff --git a/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/feeds/test_pan_normalize.py b/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/feeds/test_pan_normalize.py index 5ee5230..cec4625 100644 --- a/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/feeds/test_pan_normalize.py +++ b/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/feeds/test_pan_normalize.py @@ -47,7 +47,7 @@ class TestPANNormalize(unittest.TestCase): "lat": 5.0, "length": None, "lon": -135.16, - "mmsi": "None", + "mmsi": None, "msgid": "23189111b24c09f93ed697ee840205c97ca253aef0e726e6037b9bbbaf634885", "received_at": None, "receiver": None, @@ -111,13 +111,13 @@ class TestPANNormalize(unittest.TestCase): "lat": 35.042, "length": None, "lon": 138.511, - "mmsi": "None", + "mmsi": None, "msgid": "d9f8d4229bf70996affd5537b1f1bc7e57475b7dbf747f1c1e6ff931809875af", "received_at": None, "receiver": None, "receiver_type": None, "shipname": "MICHELE", - "shiptype": "NONE", + "shiptype": None, "source": "PANAMA_VMS", "source_fleet": None, "source_provider": "ARAP", diff --git a/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/transforms/test_blz_map_source_message.py b/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/transforms/test_blz_map_source_message.py new file mode 100644 index 0000000..b995c65 --- /dev/null +++ b/packages/pipe-vms-ingestion/tests/vms_ingestion/normalization/transforms/test_blz_map_source_message.py @@ -0,0 +1,74 @@ +import unittest +from datetime import datetime, timezone + +import apache_beam as beam +from apache_beam import pvalue +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from tests.util import pcol_equal_to +from vms_ingestion.normalization import build_pipeline_options_with_defaults +from vms_ingestion.normalization.transforms.blz_map_source_message import ( + BLZMapSourceMessage, +) + + +class TestBLZMapSourceMessage(unittest.TestCase): + options = build_pipeline_options_with_defaults( + argv=[ + "--country_code=blz", + '--source=""', + '--destination=""', + '--start_date=""', + '--end_date=""', + ] + ) + # Our input data, which will make up the initial PCollection. + RECORDS = [ + { + "receiveDate": datetime.fromisoformat("2024-01-01 22:15:19+00:00"), + "course": None, + "speed": None, + "id": "1122", + "imo": "789123456", + "callsign": None, + "name": "ERTYUIO", + "mmsi": None, + "lon": 1.840367, + "timestamp": datetime.fromisoformat("2024-01-01 21:50:21+00:00"), + "lat": -35.561983, + } + ] + + # Our output data, which is the expected data that the final PCollection must match. + EXPECTED = [ + { + "callsign": None, + "course": None, + "imo": "789123456", + "lat": -35.561983, + "lon": 1.840367, + "received_at": datetime(2024, 1, 1, 22, 15, 19, tzinfo=timezone.utc), + "shipname": "ERTYUIO", + "shiptype": None, + "speed": None, + "ssvid": "1122", + "timestamp": datetime(2024, 1, 1, 21, 50, 21, tzinfo=timezone.utc), + }, + ] + + # Tests the transform. + def test_blz_map_source_message(self): + with TestPipeline(options=TestBLZMapSourceMessage.options) as p: + + # Create a PCollection from the RECORDS static input data. + input = p | beam.Create(TestBLZMapSourceMessage.RECORDS) + + # Run ALL the pipeline's transforms (in this case, the Normalize transform). + output: pvalue.PCollection = input | BLZMapSourceMessage() + + # Assert that the output PCollection matches the EXPECTED data. + assert_that( + output, + pcol_equal_to(TestBLZMapSourceMessage.EXPECTED), + label="CheckOutput", + ) diff --git a/packages/pipe-vms-ingestion/vms_ingestion/normalization/feed_normalization_factory.py b/packages/pipe-vms-ingestion/vms_ingestion/normalization/feed_normalization_factory.py index e89fba3..24414b3 100644 --- a/packages/pipe-vms-ingestion/vms_ingestion/normalization/feed_normalization_factory.py +++ b/packages/pipe-vms-ingestion/vms_ingestion/normalization/feed_normalization_factory.py @@ -1,4 +1,5 @@ import apache_beam as beam +from vms_ingestion.normalization.feeds.blz_normalize import BLZNormalize from vms_ingestion.normalization.feeds.bra_normalize import BRANormalize from vms_ingestion.normalization.feeds.chl_normalize import CHLNormalize from vms_ingestion.normalization.feeds.cri_normalize import CRINormalize @@ -8,6 +9,7 @@ from vms_ingestion.normalization.feeds.per_normalize import PERNormalize NORMALIZER_BY_FEED = { + "blz": BLZNormalize, "bra": BRANormalize, "chl": CHLNormalize, "cri": CRINormalize, diff --git a/packages/pipe-vms-ingestion/vms_ingestion/normalization/feeds/blz_normalize.py b/packages/pipe-vms-ingestion/vms_ingestion/normalization/feeds/blz_normalize.py new file mode 100644 index 0000000..42d8bd5 --- /dev/null +++ b/packages/pipe-vms-ingestion/vms_ingestion/normalization/feeds/blz_normalize.py @@ -0,0 +1,27 @@ +import apache_beam as beam +from vms_ingestion.normalization.transforms.blz_map_source_message import ( + BLZMapSourceMessage, +) +from vms_ingestion.normalization.transforms.map_normalized_message import ( + MapNormalizedMessage, +) + + +class BLZNormalize(beam.PTransform): + + def __init__(self, feed) -> None: + self.feed = feed + self.source_provider = "POLESTAR" + self.source_format = "BELIZE_VMS" + + def expand(self, pcoll): + + return ( + pcoll + | BLZMapSourceMessage() + | MapNormalizedMessage( + feed=self.feed, + source_provider=self.source_provider, + source_format=self.source_format, + ) + ) diff --git a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/blz_map_source_message.py b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/blz_map_source_message.py new file mode 100644 index 0000000..038cd15 --- /dev/null +++ b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/blz_map_source_message.py @@ -0,0 +1,138 @@ +import functools +import re + +import apache_beam as beam +from common.transforms.calculate_implied_course import calculate_implied_course +from common.transforms.calculate_implied_speed import calculate_implied_speed_kt +from utils.convert import to_float, to_string + + +def extract_shipname_from_name(name: str) -> str: + if not name: + return None + separator = name.find(" - ") # this was for appearisons of 'ex-' before separator. + if separator > 0: + return name[:separator] + if re.match(r"^\([\w ]+\)[ \w ]+\(\w+\)$", name): + return re.match(r"^\([\w ]+\)([ \w ]+)\(\w+\)$", name).group(1).strip() + shipname_match = re.search(r"^[^\-\(]*", name) + return shipname_match.group(0).strip() if shipname_match else None + + +def extract_short_shiptype_from_name(name: str) -> str: + if not name: + return None + short_shiptype_match = re.search(r" - ([^\(]*)", name) + return short_shiptype_match.group(1).upper() if short_shiptype_match else None + + +def extract_hsfl_from_name(name: str) -> str: + if not name: + return None + if re.match(r"^\([\w ]+\)[ \w ]+\(\w+\)$", name): + return re.match(r"^\(([\w ]+)\)[ \w ]+\(\w+\)$", name).group(1).strip() + hsfl_match = re.search(r"([^\(]+)\)\(?[^\(]*\)$", name) + return hsfl_match.group(1) if hsfl_match else None + + +def extract_callsign_from_name(name: str) -> str: + if not name: + return None + callsign_match = re.search(r"\(([^\(]+)\)$", name) + return callsign_match.group(1) if callsign_match else None + + +def extract_fields_from_name(msg): + name = msg["name"] + return { + **msg, + **dict( + { + "shipname": extract_shipname_from_name(name), + "hsfl": extract_hsfl_from_name(name), + "short_shiptype": extract_short_shiptype_from_name(name), + "callsign": extract_callsign_from_name(name), + } + ), + } + + +def set_previous_attr(group): + k, msgs = group + msgs_list = list(msgs) + msgs_list.sort(key=lambda msg: msg["timestamp"]) + + def set_attr(a, b): + prev = a[-1] if len(a) > 0 else None + b["prev_timestamp"] = prev["timestamp"] if prev else None + b["prev_lat"] = prev["lat"] if prev else None + b["prev_lon"] = prev["lon"] if prev else None + a.append(b) + return a + + return functools.reduce(set_attr, msgs_list, []) + + +def implies_speed_and_course(msg): + res = msg.copy() + speed = res.pop("speed") + course = res.pop("course") + if not speed: + speed = calculate_implied_speed_kt( + msg["prev_timestamp"], + msg["prev_lat"], + msg["prev_lon"], + msg["timestamp"], + msg["lat"], + msg["lon"], + ) + if not course: + course = calculate_implied_course( + msg["prev_lat"], msg["prev_lon"], msg["lat"], msg["lon"] + ) + res["speed"] = speed + res["course"] = course + return res + + +shiptype_description = { + "LL": "longline", + "TRW": "trawler", + "SV": "other_seines", + "PS": "purse_seine", +} + + +def blz_map_source_message(msg): + if "short_shiptype" in msg: + if msg["short_shiptype"] in shiptype_description: + msg["shiptype"] = shiptype_description[msg["short_shiptype"]] + else: + msg["shiptype"] = msg["short_shiptype"] + + return { + "shipname": to_string(msg["shipname"]), + "timestamp": msg["timestamp"], + "received_at": msg["receiveDate"], + "lat": to_float(msg["lat"]), + "lon": to_float(msg["lon"]), + "speed": to_float(msg["speed"]), + "course": to_float(msg["course"]), + "ssvid": to_string(msg["id"]), + "callsign": to_string(msg["callsign"]), + "shiptype": to_string(msg["shiptype"]), + "imo": to_string(msg["imo"]), + } + + +class BLZMapSourceMessage(beam.PTransform): + def expand(self, pcoll): + return ( + pcoll + | "Extract From Name" >> beam.Map(extract_fields_from_name) + | "Group by id" >> beam.GroupBy(id=lambda msg: msg["id"]) + | "Set previous timestamp,lat,lon" >> beam.FlatMap(set_previous_attr) + | "Impling Speed and Course if not coming" + >> beam.Map(implies_speed_and_course) + | "Preliminary source fields mapping" >> beam.Map(blz_map_source_message) + ) diff --git a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/bra_map_source_message.py b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/bra_map_source_message.py index c8bb862..21bd28a 100644 --- a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/bra_map_source_message.py +++ b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/bra_map_source_message.py @@ -3,19 +3,19 @@ import re import apache_beam as beam -from utils.convert import to_float +from utils.convert import to_float, to_string def bra_map_source_message(msg): return { - "shipname": f'{msg["nome"]}'.strip(), + "shipname": to_string(msg["nome"]), "timestamp": msg["datahora"], "lat": to_float(msg["lat"].replace(",", ".")), "lon": to_float(msg["lon"].replace(",", ".")), "speed_kph": to_float(msg["speed"]), "course": to_float(msg["curso"]), - "internal_id": f'{msg["ID"]}' if msg.get("ID") else None, - "msgid": f'{msg["mID"]}' if msg.get("mID") else None, + "internal_id": to_string(msg["ID"]), + "msgid": to_string(msg["mID"]), "shiptype": bra_infer_shiptype(msg["codMarinha"]), "callsign": "", } @@ -26,7 +26,7 @@ def bra_infer_shiptype(cod_marinha): # number, the other: empty, CABOTAGEM, INDEFINIDO, PASSEIO, PESCA VOLUNT, # REBOCADOR, SERVICO, SERVIÇO, TESTE, UNDEFINED are left with no type. p = re.compile("^[A-Z|Ç]+|[ ]*$") - code = f"{cod_marinha}".upper().strip() + code = f"{cod_marinha or ''}".upper().strip() m = p.match(code) if m: diff --git a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/ecu_map_source_message.py b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/ecu_map_source_message.py index 0548d84..d32e49d 100644 --- a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/ecu_map_source_message.py +++ b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/ecu_map_source_message.py @@ -3,7 +3,7 @@ import re import apache_beam as beam -from utils.convert import to_float +from utils.convert import to_float, to_string SHIPTYPE_BY_MATRICULA = { "TI": "international traffic", @@ -17,15 +17,15 @@ def ecu_map_source_message(msg): return { - "shipname": f'{msg["nombrenave"]}'.strip(), + "shipname": to_string(msg["nombrenave"]), "timestamp": msg["utc_time"], "lat": to_float(msg["lat"]), "lon": to_float(msg["lon"]), "speed": to_float(msg["velocidad"]), "course": to_float(msg["rumbo"]), - "internal_id": f'{msg["idnave"]}' if msg.get("idnave") else None, + "internal_id": to_string(msg["idnave"]), "shiptype": ecu_infer_shiptype(msg["matriculanave"]), - "callsign": f'{msg["matriculanave"]}', + "callsign": to_string(msg["matriculanave"]), } @@ -34,7 +34,7 @@ def ecu_infer_shiptype(matriculanave): # set of specific strings prefixes = "|".join(list(SHIPTYPE_BY_MATRICULA.keys())) p = re.compile(f"^({prefixes}).*$") - code = f"{matriculanave}".upper().strip() + code = f"{matriculanave or ''}".upper().strip() m = p.match(code) if m: diff --git a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/map_normalized_message.py b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/map_normalized_message.py index 4b15bad..d39e525 100644 --- a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/map_normalized_message.py +++ b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/map_normalized_message.py @@ -7,6 +7,7 @@ standardize_int_str, standardize_str, ) +from utils.convert import to_string from vms_ingestion.normalization.transforms.calculate_msgid import get_message_id from vms_ingestion.normalization.transforms.calculate_ssvid import get_ssvid @@ -27,7 +28,7 @@ def map_normalized_message(msg, feed, source_provider, source_format): "course": msg.get("course"), "heading": msg.get("heading"), "shipname": standardize_str(msg["shipname"]), - "callsign": standardize_str(msg["callsign"]) if msg["callsign"] else None, + "callsign": to_string(standardize_str(msg["callsign"])), "destination": msg.get("destination"), "imo": standardize_imo(msg.get("imo")), "shiptype": standardize_str(msg.get("shiptype")), diff --git a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/nor_map_source_message.py b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/nor_map_source_message.py index f9b2fd3..8656232 100644 --- a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/nor_map_source_message.py +++ b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/nor_map_source_message.py @@ -1,5 +1,5 @@ import apache_beam as beam -from utils.convert import to_float +from utils.convert import to_float, to_string SHIPTYPE_BY_VESSEL_TYPE = { "FISKEFARTØY": "fishing", @@ -11,8 +11,8 @@ def nor_map_source_message(msg): return { - "shipname": f'{msg["vessel_name"]}'.strip(), - "msgid": f'{msg["message_id"]}'.strip(), + "shipname": to_string(msg["vessel_name"]), + "msgid": to_string(msg["message_id"]), "timestamp": msg["timestamp_utc"], "lat": to_float(msg["lat"]), "length": to_float(msg["length"]), @@ -20,7 +20,7 @@ def nor_map_source_message(msg): "speed": to_float(msg["speed"]), "course": to_float(msg["course"]), "shiptype": nor_infer_shiptype(msg["vessel_type"]), - "callsign": f'{msg["callsign"]}', + "callsign": to_string(msg["callsign"]), } diff --git a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/pan_map_source_message.py b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/pan_map_source_message.py index b9d93e6..d4da0f8 100644 --- a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/pan_map_source_message.py +++ b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/pan_map_source_message.py @@ -1,18 +1,18 @@ import apache_beam as beam from shipdataprocess.normalize import normalize_shipname -from utils.convert import to_float +from utils.convert import to_float, to_string def pan_map_source_message(msg): return { - "callsign": f'{msg["callsign"]}'.strip(), + "callsign": to_string(msg["callsign"]), "course": to_float(msg["course"]), - "imo": f'{msg["imo"]}'.strip(), + "imo": to_string(msg["imo"]), "lat": to_float(msg["lat"]), "lon": to_float(msg["lon"]), - "mmsi": f'{msg["mmsi"]}'.strip(), - "shipname": f'{msg["shipname"]}'.strip(), - "shiptype": f'{msg["shiptype"]}'.strip(), + "mmsi": to_string(msg["mmsi"]), + "shipname": to_string(msg["shipname"]), + "shiptype": to_string(msg["shiptype"]), "speed": to_float(msg["speed"]), "ssvid": normalize_shipname(msg.get("shipname")), "timestamp": msg["timestamp"], diff --git a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/per_map_source_message.py b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/per_map_source_message.py index cc4b844..10a3a8d 100644 --- a/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/per_map_source_message.py +++ b/packages/pipe-vms-ingestion/vms_ingestion/normalization/transforms/per_map_source_message.py @@ -1,7 +1,7 @@ import datetime as dt import apache_beam as beam -from utils.convert import to_float +from utils.convert import to_float, to_string FLEET_BY_REGIMEN_DESCRIPTION = { "ARTESANAL": "artisanal", @@ -27,15 +27,15 @@ def per_map_source_message(msg): return { - "shipname": f'{msg["nickname"]}'.strip(), - "timestamp": msg["DATETRANSMISSION"] - + dt.timedelta(hours=5), # convert timestamp from peru timezone to utc + "shipname": to_string(msg["nickname"]), + # convert timestamp from peru timezone to utc + "timestamp": msg["DATETRANSMISSION"] + dt.timedelta(hours=5), "fleet": per_infer_fleet(msg["DESC_REGIMEN"]), "lat": to_float(msg["LATITUDE"]), "lon": to_float(msg["LONGITUDE"]), "speed": to_float(msg["SPEED"]), "course": to_float(msg["COURSE"]), - "ssvid": f'{msg["PLATE"]}'.strip(), + "ssvid": to_string(msg["PLATE"]), "callsign": None, }