-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutilities.py
132 lines (101 loc) · 4.71 KB
/
utilities.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from datetime import datetime
from datetime import timedelta
from typing import Tuple
from itertools import combinations
import datastore_pb2 as dstore
from fastapi import HTTPException
from google.protobuf.timestamp_pb2 import Timestamp
from grpc_getter import get_ts_ag_request
from pydantic import AwareDatetime
from pydantic import TypeAdapter
def get_datetime_range(datetime_string: str | None) -> Tuple[Timestamp, Timestamp] | None:
    """
    Parse a datetime query value into a (start, end) pair of Timestamps.

    Accepted forms are a single timezone-aware datetime, or an interval
    written as "<start>/<end>" where either side may be ".." to leave that
    side open. An open start maps to datetime.min and an open end maps to
    datetime.max. One second is added to every explicit end, because the
    store returns the half-open range [start, end).

    Returns None when datetime_string is None or empty.

    Raises HTTPException (status 400) when:
    - a value cannot be parsed as a timezone-aware datetime,
    - more than two "/"-separated values are given,
    - the end cannot be extended by one second, or
    - the requested start is later than the requested end.
    """
    if not datetime_string:
        return None

    errors = {}
    start_datetime, end_datetime = Timestamp(), Timestamp()
    aware_datetime_type_adapter = TypeAdapter(AwareDatetime)
    one_second = timedelta(seconds=1)

    try:
        datetimes = tuple(value.strip() for value in datetime_string.split("/"))
        if len(datetimes) > 2:
            # Only "<instant>" and "<start>/<end>" are valid; do not silently
            # ignore trailing segments.
            errors["datetime"] = f"Invalid format: {datetime_string}"
        elif len(datetimes) == 1:
            instant = aware_datetime_type_adapter.validate_python(datetimes[0])
            start_datetime.FromDatetime(instant)
            # HACK: Add one second so we get some data, as the store returns [start, end)
            end_datetime.FromDatetime(instant + one_second)
        else:
            start = None if datetimes[0] == ".." else aware_datetime_type_adapter.validate_python(datetimes[0])
            end = None if datetimes[1] == ".." else aware_datetime_type_adapter.validate_python(datetimes[1])

            # Compare the values the caller actually sent, before the
            # one-second extension, so an inverted range is always caught.
            if start is not None and end is not None and start > end:
                errors["datetime"] = f"Invalid range: {datetimes[0]} > {datetimes[1]}"
            else:
                start_datetime.FromDatetime(datetime.min if start is None else start)
                # HACK add one second so that the end_datetime is included in the interval.
                end_datetime.FromDatetime(datetime.max if end is None else end + one_second)
    except (ValueError, OverflowError):
        # OverflowError: the end is too close to datetime.max to add a second.
        errors["datetime"] = f"Invalid format: {datetime_string}"

    if errors:
        raise HTTPException(status_code=400, detail=errors)

    return start_datetime, end_datetime
# @cached(ttl=600)
async def get_current_parameter_names():
    """
    Fetch the parameter names currently present in the datastore.

    Sends a GetTSAGRequest asking for the "parameter_name" attribute and
    collects combo.parameter_name from every group in the response.

    Returns a set of the parameter names found.
    """
    request = dstore.GetTSAGRequest(attrs=["parameter_name"])
    response = await get_ts_ag_request(request)

    parameter_names = set()
    for group in response.groups:
        parameter_names.add(group.combo.parameter_name)
    return parameter_names
async def verify_parameter_names(parameter_names: list) -> None:
    """
    Verify that every given parameter name is known.

    The known names are fetched once per call through
    get_current_parameter_names() and every requested name is checked
    against that single result. An empty or None parameter_names needs no
    lookup and returns immediately.

    Raises HTTPException (status 400) listing the unknown names, in the
    order they were given, if any are found.
    """
    if not parameter_names:
        return  # Nothing to verify, so skip the datastore round-trip.

    # Fetch once: awaiting this inside the loop would repeat the lookup for
    # every requested name.
    known_parameter_names = await get_current_parameter_names()
    unknown_parameter_names = [name for name in parameter_names if name not in known_parameter_names]
    if unknown_parameter_names:
        raise HTTPException(400, detail=f"Unknown parameter-name {unknown_parameter_names}")
def create_url_from_request(request):
    """
    Build the absolute URL of the "/collections" endpoint for a request.

    The request must expose scope (a mapping), headers (a mapping that
    contains "host") and url.scheme.

    Returns the string "<scheme>://<host><root_path>/collections".
    """
    # The server root_path contains the path added by a reverse proxy.
    # The key is optional in the scope; treat a missing or None value as
    # "no prefix" so the literal text "None" never ends up in the URL.
    base_path = request.scope.get("root_path") or ""
    # The host will (should) be correctly set from X-Forwarded-Host and X-Forwarded-Scheme
    # headers by any proxy in front of it
    host = request.headers["host"]
    scheme = request.url.scheme
    return f"{scheme}://{host}{base_path}/collections"
def split_and_strip(cs_string: str) -> list[str]:
    """
    Split a comma-separated string and trim whitespace around each piece.

    Empty pieces are kept: "a,,b" gives ["a", "", "b"] and "" gives [""].
    """
    pieces = cs_string.split(",")
    return list(map(str.strip, pieces))
def validate_bbox(bbox: str) -> Tuple[float, float, float, float]:
    """
    Parse and validate a "left,bottom,right,top" bounding box string.

    Returns the four values as floats, in the order they were given.

    Raises HTTPException (status 400) whose detail maps a problem key
    ("bbox", "range", "longitude", "latitude") to a message when the string
    does not hold exactly four numbers, the corners are inverted, either
    side spans more than 90 degrees, or a coordinate is out of bounds.
    """
    problems = {}

    try:
        west, south, east, north = (float(token.strip()) for token in bbox.split(","))
    except ValueError:
        problems["bbox"] = f"Invalid format: {bbox}"
    else:
        corners_inverted = west > east or south > north
        span_too_large = abs(west - east) > 90 or abs(south - north) > 90
        # Both findings share the "range" key; the span message wins when
        # both apply.
        if span_too_large:
            problems["range"] = f"Maximum bbox range is 90 degrees: {bbox}"
        elif corners_inverted:
            problems["range"] = f"Invalid bbox range: {bbox}"

        if not all(-180 <= lon <= 180 for lon in (west, east)):
            problems["longitude"] = f"Invalid longitude: {bbox}"
        if not all(-90 <= lat <= 90 for lat in (south, north)):
            problems["latitude"] = f"Invalid latitude: {bbox}"

    if problems:
        raise HTTPException(status_code=400, detail=problems)

    return west, south, east, north
def calculate_largest_postition_deviation(positions: list) -> float:
    """
    Return the largest straight-line distance between any two positions.

    Each position must be indexable, with its two coordinates at [0] and
    [1]. The distance is the plain Euclidean distance in the coordinates'
    own units; no geodesic correction is applied.

    Returns 0.0 when fewer than two positions are given.
    """
    # The square root must cover the whole sum of squares, not only the
    # second term.
    return max(
        (((a[0] - b[0]) ** 2 + (a[1] - b[1]) ** 2) ** 0.5 for a, b in combinations(positions, 2)),
        default=0.0,
    )