"""
The main module for creating earth observation processes. It aims to make it easy to build complex process chains
that can be evaluated by an openEO backend.
.. data:: THIS
Symbolic reference to the current data cube, to be used as argument in :py:meth:`DataCube.process()` calls
"""
from __future__ import annotations
import datetime
import logging
import pathlib
import typing
import warnings
from builtins import staticmethod
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union
import numpy as np
import requests
import shapely.geometry
import shapely.geometry.base
from shapely.geometry import MultiPolygon, Polygon, mapping
from openeo.api.process import Parameter
from openeo.dates import get_temporal_extent
from openeo.internal.documentation import openeo_process
from openeo.internal.graph_building import PGNode, ReduceNode, _FromNodeMixin
from openeo.internal.jupyter import in_jupyter_context
from openeo.internal.processes.builder import (
ProcessBuilderBase,
convert_callable_to_pgnode,
get_parameter_names,
)
from openeo.internal.warnings import UserDeprecationWarning, deprecated, legacy_alias
from openeo.metadata import (
Band,
BandDimension,
CollectionMetadata,
SpatialDimension,
TemporalDimension,
metadata_from_stac,
)
from openeo.processes import ProcessBuilder
from openeo.rest import BandMathException, OpenEoClientException, OperatorException
from openeo.rest._datacube import (
THIS,
UDF,
_ensure_save_result,
_ProcessGraphAbstraction,
build_child_callback,
)
from openeo.rest.graph_building import CollectionProperty
from openeo.rest.job import BatchJob, RESTJob
from openeo.rest.mlmodel import MlModel
from openeo.rest.service import Service
from openeo.rest.udp import RESTUserDefinedProcess
from openeo.rest.vectorcube import VectorCube
from openeo.util import dict_no_none, guess_format, normalize_crs, rfc3339
if typing.TYPE_CHECKING:
# Imports for type checking only (circular import issue at runtime).
import xarray
from openeo.rest.connection import Connection
from openeo.udf import XarrayDataCube
log = logging.getLogger(__name__)
# Type annotation aliases
InputDate = Union[str, datetime.date, Parameter, PGNode, ProcessBuilderBase, None]
class DataCube(_ProcessGraphAbstraction):
"""
Class representing an openEO (raster) data cube.
The data cube is represented by its corresponding openeo "process graph"
and this process graph can be "grown" to a desired workflow by calling the appropriate methods.
"""
# TODO: set this based on back-end or user preference?
_DEFAULT_RASTER_FORMAT = "GTiff"
def __init__(self, graph: PGNode, connection: Optional[Connection], metadata: Optional[CollectionMetadata] = None):
super().__init__(pgnode=graph, connection=connection)
self.metadata: Optional[CollectionMetadata] = metadata
def process(
self,
process_id: str,
arguments: Optional[dict] = None,
metadata: Optional[CollectionMetadata] = None,
namespace: Optional[str] = None,
**kwargs,
) -> DataCube:
"""
Generic helper to create a new DataCube by applying a process.
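Usage example (a sketch; the process id "my_process" and its ``size`` argument are hypothetical):
>>> cube = cube.process("my_process", arguments={"data": THIS, "size": 42})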
:param process_id: process id of the process.
:param arguments: argument dictionary for the process.
:param metadata: optional: metadata to override original cube metadata (e.g. when reducing dimensions)
:param namespace: optional: process namespace
:return: new DataCube instance
"""
pg = self._build_pgnode(process_id=process_id, arguments=arguments, namespace=namespace, **kwargs)
return DataCube(graph=pg, connection=self._connection, metadata=metadata or self.metadata)
graph_add_node = legacy_alias(process, "graph_add_node", since="0.1.1")
def process_with_node(self, pg: PGNode, metadata: Optional[CollectionMetadata] = None) -> DataCube:
"""
Generic helper to create a new DataCube by applying a process (given as process graph node)
:param pg: process graph node (containing process id and arguments)
:param metadata: optional: metadata to override original cube metadata (e.g. when reducing dimensions)
:return: new DataCube instance
"""
# TODO: deep copy `self.metadata` instead of using same instance?
# TODO: cover more cases where metadata has to be altered
# TODO: deprecate ``process_with_node``: little added value over just calling DataCube() directly
return DataCube(graph=pg, connection=self._connection, metadata=metadata or self.metadata)
def _do_metadata_normalization(self) -> bool:
"""Do metadata-based normalization/validation of dimension names, band names, ..."""
return isinstance(self.metadata, CollectionMetadata)
def _assert_valid_dimension_name(self, name: str) -> str:
if self._do_metadata_normalization():
self.metadata.assert_valid_dimension(name)
return name
@classmethod
@openeo_process
def load_collection(
cls,
collection_id: Union[str, Parameter],
connection: Optional[Connection] = None,
spatial_extent: Union[Dict[str, float], Parameter, None] = None,
temporal_extent: Union[Sequence[InputDate], Parameter, str, None] = None,
bands: Union[None, List[str], Parameter] = None,
fetch_metadata: bool = True,
properties: Union[
None, Dict[str, Union[str, PGNode, typing.Callable]], List[CollectionProperty], CollectionProperty
] = None,
max_cloud_cover: Optional[float] = None,
) -> DataCube:
"""
Create a new raster data cube.
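Usage example (a minimal sketch; assumes a backend connection ``connection``, and the collection and band ids are hypothetical):
>>> cube = DataCube.load_collection(
...     "SENTINEL2_L2A",
...     connection=connection,
...     spatial_extent={"west": 3.0, "south": 51.0, "east": 4.0, "north": 52.0},
...     temporal_extent=["2022-05-01", "2022-06-01"],
...     bands=["B04", "B08"],
... )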
:param collection_id: image collection identifier
:param connection: The backend connection to use.
Can be ``None`` to work without connection and collection metadata.
:param spatial_extent: limit data to specified bounding box or polygons
:param temporal_extent: limit data to specified temporal interval.
Typically, just a two-item list or tuple containing start and end date.
See :ref:`filtering-on-temporal-extent-section` for more details on temporal extent handling and shorthand notation.
:param bands: only add the specified bands.
:param properties: limit data by metadata property predicates.
See :py:func:`~openeo.rest.graph_building.collection_property` for easy construction of such predicates.
:param max_cloud_cover: shortcut to set maximum cloud cover ("eo:cloud_cover" collection property)
:return: new DataCube containing the collection
.. versionchanged:: 0.13.0
added the ``max_cloud_cover`` argument.
.. versionchanged:: 0.23.0
Argument ``temporal_extent``: add support for year/month shorthand notation
as discussed at :ref:`date-shorthand-handling`.
.. versionchanged:: 0.26.0
Add :py:func:`~openeo.rest.graph_building.collection_property` support to ``properties`` argument.
"""
if temporal_extent:
temporal_extent = cls._get_temporal_extent(extent=temporal_extent)
if isinstance(spatial_extent, Parameter):
if spatial_extent.schema.get("type") != "object":
warnings.warn(
"Unexpected parameterized `spatial_extent` in `load_collection`:"
f" expected schema with type 'object' but got {spatial_extent.schema!r}."
)
arguments = {
'id': collection_id,
# TODO: spatial_extent could also be a "geojson" subtype object, so we might want to allow (and convert) shapely shapes as well here.
'spatial_extent': spatial_extent,
'temporal_extent': temporal_extent,
}
if isinstance(collection_id, Parameter):
fetch_metadata = False
metadata: Optional[CollectionMetadata] = (
connection.collection_metadata(collection_id) if connection and fetch_metadata else None
)
if bands:
if isinstance(bands, str):
bands = [bands]
elif isinstance(bands, Parameter):
metadata = None
if metadata:
bands = [b if isinstance(b, str) else metadata.band_dimension.band_name(b) for b in bands]
metadata = metadata.filter_bands(bands)
arguments['bands'] = bands
if isinstance(properties, list):
# TODO: warn about items that are not CollectionProperty objects instead of silently dropping them.
properties = {p.name: p.from_node() for p in properties if isinstance(p, CollectionProperty)}
if isinstance(properties, CollectionProperty):
properties = {properties.name: properties.from_node()}
elif properties is None:
properties = {}
if max_cloud_cover:
properties["eo:cloud_cover"] = lambda v: v <= max_cloud_cover
if properties:
summaries = metadata and metadata.get("summaries") or {}
undefined_properties = set(properties.keys()).difference(summaries.keys())
if undefined_properties:
warnings.warn(
f"{collection_id} property filtering with properties that are undefined "
f"in the collection metadata (summaries): {', '.join(undefined_properties)}.",
stacklevel=2,
)
arguments["properties"] = {
prop: build_child_callback(pred, parent_parameters=["value"]) for prop, pred in properties.items()
}
pg = PGNode(
process_id='load_collection',
arguments=arguments
)
return cls(graph=pg, connection=connection, metadata=metadata)
create_collection = legacy_alias(
load_collection, name="create_collection", since="0.4.6"
)
@classmethod
@deprecated(reason="Depends on non-standard process, replace with :py:meth:`openeo.rest.connection.Connection.load_stac` where possible.", version="0.25.0")
def load_disk_collection(cls, connection: Connection, file_format: str, glob_pattern: str, **options) -> DataCube:
"""
Loads image data from disk as a DataCube.
This is backed by a non-standard process ('load_disk_data'). This will eventually be replaced by standard options such as
:py:meth:`openeo.rest.connection.Connection.load_stac` or https://processes.openeo.org/#load_uploaded_files
:param connection: The connection to use to connect with the backend.
:param file_format: the file format, e.g. 'GTiff'
:param glob_pattern: a glob pattern that matches the files to load from disk
:param options: options specific to the file format
:return: the data as a DataCube
"""
pg = PGNode(
process_id='load_disk_data',
arguments={
'format': file_format,
'glob_pattern': glob_pattern,
'options': options
}
)
return cls(graph=pg, connection=connection)
@classmethod
def load_stac(
cls,
url: str,
spatial_extent: Union[Dict[str, float], Parameter, None] = None,
temporal_extent: Union[Sequence[InputDate], Parameter, str, None] = None,
bands: Optional[List[str]] = None,
properties: Optional[Dict[str, Union[str, PGNode, Callable]]] = None,
connection: Optional[Connection] = None,
) -> DataCube:
"""
Loads data from a static STAC catalog or a STAC API Collection and returns the data as a processable :py:class:`DataCube`.
A batch job result can be loaded by providing a reference to it.
If supported by the underlying metadata and file format, the data that is added to the data cube can be
restricted with the parameters ``spatial_extent``, ``temporal_extent`` and ``bands``.
If no data is available for the given extents, a ``NoDataAvailable`` error is thrown.
Remarks:
* The bands (and all dimensions that specify nominal dimension labels) are expected to be ordered as
specified in the metadata if the ``bands`` parameter is set to ``null``.
* If no additional parameter is specified this would imply that the whole data set is expected to be loaded.
Due to the large size of many data sets, this is not recommended and may be optimized by back-ends to only
load the data that is actually required after evaluating subsequent processes such as filters.
This means that the values should be processed only after the data has been limited to the required extent
and as a consequence also to a manageable size.
:param url: The URL to a static STAC catalog (STAC Item, STAC Collection, or STAC Catalog)
or a specific STAC API Collection that allows to filter items and to download assets.
This includes batch job results, which itself are compliant to STAC.
For external URLs, authentication details such as API keys or tokens may need to be included in the URL.
Batch job results can be specified in two ways:
- For Batch job results at the same back-end, a URL pointing to the corresponding batch job results
endpoint should be provided. The URL usually ends with ``/jobs/{id}/results`` and ``{id}``
is the corresponding batch job ID.
- For external results, a signed URL must be provided. Not all back-ends support signed URLs,
which are provided as a link with the link relation `canonical` in the batch job result metadata.
:param spatial_extent:
Limits the data to load to the specified bounding box or polygons.
For raster data, the process loads the pixel into the data cube if the point at the pixel center intersects
with the bounding box or any of the polygons (as defined in the Simple Features standard by the OGC).
For vector data, the process loads the geometry into the data cube if the geometry is fully within the
bounding box or any of the polygons (as defined in the Simple Features standard by the OGC).
Empty geometries may only be in the data cube if no spatial extent has been provided.
The GeoJSON can be one of the following feature types:
* A ``Polygon`` or ``MultiPolygon`` geometry,
* a ``Feature`` with a ``Polygon`` or ``MultiPolygon`` geometry, or
* a ``FeatureCollection`` containing at least one ``Feature`` with ``Polygon`` or ``MultiPolygon`` geometries.
Set this parameter to ``None`` to set no limit for the spatial extent.
Be careful with this when loading large datasets. It is recommended to use this parameter instead of
using ``filter_bbox()`` or ``filter_spatial()`` directly after loading unbounded data.
:param temporal_extent:
Limits the data to load to the specified left-closed temporal interval.
Applies to all temporal dimensions.
The interval has to be specified as an array with exactly two elements:
1. The first element is the start of the temporal interval.
The specified instance in time is **included** in the interval.
2. The second element is the end of the temporal interval.
The specified instance in time is **excluded** from the interval.
The second element must always be greater/later than the first element.
Otherwise, a `TemporalExtentEmpty` exception is thrown.
Also supports open intervals by setting one of the boundaries to ``None``, but never both.
Set this parameter to ``None`` to set no limit for the temporal extent.
Be careful with this when loading large datasets. It is recommended to use this parameter instead of
using ``filter_temporal()`` directly after loading unbounded data.
:param bands:
Only adds the specified bands into the data cube so that bands that don't match the list
of band names are not available. Applies to all dimensions of type `bands`.
Either the unique band name (metadata field ``name`` in bands) or one of the common band names
(metadata field ``common_name`` in bands) can be specified.
If the unique band name and the common name conflict, the unique band name has a higher priority.
The order of the specified array defines the order of the bands in the data cube.
If multiple bands match a common name, all matched bands are included in the original order.
It is recommended to use this parameter instead of using ``filter_bands()`` directly after loading unbounded data.
:param properties:
Limits the data by metadata properties: only data for which all given conditions
return ``True`` is included in the data cube (AND operation).
Specify key-value-pairs with the key being the name of the metadata property,
which can be retrieved with the openEO Data Discovery for Collections.
The value must be a condition (user-defined process) to be evaluated against a STAC API.
This parameter is not supported for static STAC.
:param connection: The connection to use to connect with the backend.
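Usage example (a sketch; the STAC URL is hypothetical):
>>> cube = DataCube.load_stac(
...     "https://example.com/stac/collections/my-collection",
...     temporal_extent=["2023-01-01", "2023-06-01"],
...     bands=["B04", "B08"],
... )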
.. versionadded:: 0.33.0
"""
arguments = {"url": url}
# TODO #425 more normalization/validation of extent/band parameters
if spatial_extent:
arguments["spatial_extent"] = spatial_extent
if temporal_extent:
arguments["temporal_extent"] = DataCube._get_temporal_extent(extent=temporal_extent)
if bands:
arguments["bands"] = bands
if properties:
arguments["properties"] = {
prop: build_child_callback(pred, parent_parameters=["value"]) for prop, pred in properties.items()
}
graph = PGNode("load_stac", arguments=arguments)
try:
metadata = metadata_from_stac(url)
except Exception:
log.warning(f"Failed to extract cube metadata from STAC URL {url}", exc_info=True)
metadata = None
return cls(graph=graph, connection=connection, metadata=metadata)
@classmethod
def _get_temporal_extent(
cls,
*args,
start_date: InputDate = None,
end_date: InputDate = None,
extent: Union[Sequence[InputDate], Parameter, str, None] = None,
) -> Union[List[Union[str, Parameter, PGNode, None]], Parameter]:
"""Parameter aware temporal_extent normalizer"""
# TODO: move this outside of DataCube class
# TODO: return extent as tuple instead of list
if len(args) == 1 and isinstance(args[0], Parameter):
assert start_date is None and end_date is None and extent is None
return args[0]
elif len(args) == 0 and isinstance(extent, Parameter):
assert start_date is None and end_date is None
# TODO: warn about unexpected parameter schema
return extent
else:
def convertor(d: Any) -> Any:
# TODO: can this be generalized through _FromNodeMixin?
if isinstance(d, Parameter) or isinstance(d, PGNode):
# TODO: warn about unexpected parameter schema
return d
elif isinstance(d, ProcessBuilderBase):
return d.pgnode
else:
return rfc3339.normalize(d)
return list(
get_temporal_extent(*args, start_date=start_date, end_date=end_date, extent=extent, convertor=convertor)
)
@openeo_process
def filter_temporal(
self,
*args,
start_date: InputDate = None,
end_date: InputDate = None,
extent: Union[Sequence[InputDate], Parameter, str, None] = None,
) -> DataCube:
"""
Limit the DataCube to a certain date range, which can be specified in several ways:
>>> cube.filter_temporal("2019-07-01", "2019-08-01")
>>> cube.filter_temporal(["2019-07-01", "2019-08-01"])
>>> cube.filter_temporal(extent=["2019-07-01", "2019-08-01"])
>>> cube.filter_temporal(start_date="2019-07-01", end_date="2019-08-01")
See :ref:`filtering-on-temporal-extent-section` for more details on temporal extent handling and shorthand notation.
:param start_date: start date of the filter (inclusive), as a string or date object
:param end_date: end date of the filter (exclusive), as a string or date object
:param extent: temporal extent.
Typically, specified as a two-item list or tuple containing start and end date.
.. versionchanged:: 0.23.0
Arguments ``start_date``, ``end_date`` and ``extent``:
add support for year/month shorthand notation as discussed at :ref:`date-shorthand-handling`.
"""
if len(args) == 1 and isinstance(args[0], str):
raise OpenEoClientException(
f"filter_temporal() with a single string argument ({args[0]!r}) is ambiguous."
f" If you want a half-unbounded interval, use something like filter_temporal({args[0]!r}, None) or use explicit keyword arguments."
f" If you want the full interval covering all of {args[0]!r}, use something like filter_temporal(extent={args[0]!r})."
)
return self.process(
process_id='filter_temporal',
arguments={
'data': THIS,
'extent': self._get_temporal_extent(*args, start_date=start_date, end_date=end_date, extent=extent)
}
)
@openeo_process
def filter_bbox(
self,
*args,
west: Optional[float] = None,
south: Optional[float] = None,
east: Optional[float] = None,
north: Optional[float] = None,
crs: Optional[Union[int, str]] = None,
base: Optional[float] = None,
height: Optional[float] = None,
bbox: Optional[Sequence[float]] = None,
) -> DataCube:
"""
Limits the data cube to the specified bounding box.
The bounding box can be specified in multiple ways.
- With keyword arguments::
>>> cube.filter_bbox(west=3, south=51, east=4, north=52, crs=4326)
- With a (west, south, east, north) list or tuple
(note that EPSG:4326 is the default CRS, so it's not necessary to specify it explicitly)::
>>> cube.filter_bbox([3, 51, 4, 52])
>>> cube.filter_bbox(bbox=[3, 51, 4, 52])
- With a bbox dictionary::
>>> bbox = {"west": 3, "south": 51, "east": 4, "north": 52, "crs": 4326}
>>> cube.filter_bbox(bbox)
>>> cube.filter_bbox(bbox=bbox)
>>> cube.filter_bbox(**bbox)
- With a shapely geometry (of which the bounding box will be used)::
>>> cube.filter_bbox(geometry)
>>> cube.filter_bbox(bbox=geometry)
- Passing a parameter::
>>> bbox_param = Parameter(name="my_bbox", schema="object")
>>> cube.filter_bbox(bbox_param)
>>> cube.filter_bbox(bbox=bbox_param)
- With a CRS other than EPSG 4326::
>>> cube.filter_bbox(
... west=652000, east=672000, north=5161000, south=5181000,
... crs=32632
... )
- Deprecated: positional arguments are also supported,
but follow a non-standard order for legacy reasons::
>>> west, east, north, south = 3, 4, 52, 51
>>> cube.filter_bbox(west, east, north, south)
:param crs: value describing the coordinate reference system.
Typically just an int (interpreted as EPSG code, e.g. ``4326``)
or a string (handled as authority string, e.g. ``"EPSG:4326"``).
See :py:func:`openeo.util.normalize_crs` for more details about additional normalization that is applied to this argument.
"""
if args and any(k is not None for k in (west, south, east, north, bbox)):
raise ValueError("Don't mix positional arguments with keyword arguments.")
if bbox and any(k is not None for k in (west, south, east, north)):
raise ValueError("Don't mix `bbox` with `west`/`south`/`east`/`north` keyword arguments.")
if args:
if 4 <= len(args) <= 5:
# Handle old-style west-east-north-south order
# TODO remove handling of this legacy order?
warnings.warn("Deprecated argument order usage: `filter_bbox(west, east, north, south)`."
" Use keyword arguments or tuple/list argument instead.")
west, east, north, south = args[:4]
if len(args) > 4:
crs = normalize_crs(args[4])
elif len(args) == 1 and (isinstance(args[0], (list, tuple)) and len(args[0]) == 4
or isinstance(args[0], (dict, shapely.geometry.base.BaseGeometry, Parameter))):
bbox = args[0]
else:
raise ValueError(args)
if isinstance(bbox, Parameter):
if bbox.schema.get("type") != "object":
warnings.warn(
"Unexpected parameterized `extent` in `filter_bbox`:"
f" expected schema with type 'object' but got {bbox.schema!r}."
)
extent = bbox
else:
if bbox:
if isinstance(bbox, shapely.geometry.base.BaseGeometry):
west, south, east, north = bbox.bounds
elif isinstance(bbox, (list, tuple)) and len(bbox) == 4:
west, south, east, north = bbox[:4]
elif isinstance(bbox, dict):
west, south, east, north = (bbox[k] for k in ["west", "south", "east", "north"])
if "crs" in bbox:
crs = bbox["crs"]
else:
raise ValueError(bbox)
extent = {'west': west, 'east': east, 'north': north, 'south': south}
extent.update(dict_no_none(crs=crs, base=base, height=height))
return self.process(
process_id='filter_bbox',
arguments={
'data': THIS,
'extent': extent
}
)
@openeo_process
def filter_spatial(self, geometries) -> DataCube:
"""
Limits the data cube over the spatial dimensions to the specified geometries.
- For polygons, the filter retains a pixel in the data cube if the point at the pixel center intersects with
at least one of the polygons (as defined in the Simple Features standard by the OGC).
- For points, the process considers the closest pixel center.
- For lines (line strings), the process considers all the pixels whose centers are closest to at least one
point on the line.
More specifically, pixels outside of the bounding box of the given geometry will not be available after filtering.
All pixels inside the bounding box that are not retained will be set to null (no data).
:param geometries: One or more geometries used for filtering, specified as GeoJSON in EPSG:4326.
:return: A data cube restricted to the specified geometries. The dimensions and dimension properties (name,
type, labels, reference system and resolution) remain unchanged, except that the spatial dimensions have less
(or the same) dimension labels.
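Usage example (a sketch, passing a shapely ``Polygon``, which the client converts to GeoJSON; coordinates are hypothetical):
>>> from shapely.geometry import Polygon
>>> aoi = Polygon([(3.0, 51.0), (4.0, 51.0), (4.0, 52.0), (3.0, 52.0)])
>>> cube = cube.filter_spatial(aoi)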
"""
valid_geojson_types = [
"Point", "MultiPoint", "LineString", "MultiLineString",
"Polygon", "MultiPolygon", "GeometryCollection", "FeatureCollection"
]
geometries = self._get_geometry_argument(geometries, valid_geojson_types=valid_geojson_types, crs=None)
return self.process(
process_id='filter_spatial',
arguments={
'data': THIS,
'geometries': geometries
}
)
@openeo_process
def filter_bands(self, bands: Union[List[Union[str, int]], str]) -> DataCube:
"""
Filter the data cube by the given bands.
:param bands: list of band names, common names or band indices. A single band name can also be given as a string.
:return: a DataCube instance
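Usage example (band names are hypothetical):
>>> cube = cube.filter_bands(["B04", "B08"])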
"""
if isinstance(bands, str):
bands = [bands]
if self._do_metadata_normalization():
bands = [self.metadata.band_dimension.band_name(b) for b in bands]
cube = self.process(
process_id="filter_bands",
arguments={"data": THIS, "bands": bands},
metadata=self.metadata.filter_bands(bands) if self.metadata else None,
)
return cube
@openeo_process
def filter_labels(
self, condition: Union[PGNode, Callable], dimension: str, context: Optional[dict] = None
) -> DataCube:
"""
Filters the dimension labels in the data cube for the given dimension.
Only the dimension labels that match the specified condition are preserved,
all other labels with their corresponding data get removed.
:param condition: the "child callback" which will be given a single label value (number or string)
and returns a boolean expressing if the label should be preserved.
Also see :ref:`callbackfunctions`.
:param dimension: The name of the dimension to filter on.
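Usage example (a sketch, keeping only band labels that start with "B";
assumes a ``text_begins`` helper is available in :py:mod:`openeo.processes`):
>>> from openeo.processes import text_begins
>>> cube = cube.filter_labels(condition=lambda v: text_begins(v, pattern="B"), dimension="bands")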
.. versionadded:: 0.27.0
"""
condition = build_child_callback(condition, parent_parameters=["value"])
return self.process(
process_id="filter_labels",
arguments=dict_no_none(data=THIS, condition=condition, dimension=dimension, context=context),
)
band_filter = legacy_alias(filter_bands, "band_filter", since="0.1.0")
def band(self, band: Union[str, int]) -> DataCube:
"""
Select a single band from the data cube.
:param band: band name, band common name or band index.
:return: a DataCube instance
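Usage example (a sketch, computing NDVI in band math mode; band names are hypothetical):
>>> red = cube.band("B04")
>>> nir = cube.band("B08")
>>> ndvi = (nir - red) / (nir + red)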
"""
if self._do_metadata_normalization():
band = self.metadata.band_dimension.band_index(band)
arguments = {"data": {"from_parameter": "data"}}
if isinstance(band, int):
arguments["index"] = band
else:
arguments["label"] = band
return self.reduce_bands(reducer=PGNode(process_id="array_element", arguments=arguments))
@openeo_process
def resample_spatial(
self, resolution: Union[float, Tuple[float, float]], projection: Optional[Union[int, str]] = None,
method: str = 'near', align: str = 'upper-left'
) -> DataCube:
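"""
Resamples the spatial dimensions (x,y) of the data cube to a specified resolution
and/or warps the data cube to a target projection, following the openEO ``resample_spatial`` process.
:param resolution: the new resolution, as a single number (same for x and y) or as an (x, y) tuple,
in the units of the target projection; ``0`` keeps the source resolution.
:param projection: target projection, e.g. an EPSG code (int), or ``None`` to keep the source projection.
:param method: resampling method, e.g. "near" (nearest neighbor, the default), "bilinear" or "cubic".
:param align: corner of the spatial extent to align the resampled data to, e.g. "upper-left".
Usage example (a sketch; the EPSG code is hypothetical):
>>> cube = cube.resample_spatial(resolution=10, projection=32632, method="bilinear")
"""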
return self.process('resample_spatial', {
'data': THIS,
'resolution': resolution,
'projection': projection,
'method': method,
'align': align
})
@openeo_process
def resample_cube_spatial(self, target: DataCube, method: str = "near") -> DataCube:
"""
Resamples the spatial dimensions (x,y) from a source data cube to align with the corresponding
dimensions of the given target data cube.
Returns a new data cube with the resampled dimensions.
To resample a data cube to a specific resolution or projection regardless of an existing target
data cube, refer to :py:meth:`resample_spatial`.
:param target: A data cube that describes the spatial target resolution.
:param method: Resampling method to use.
:return:
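Usage example (a sketch, assuming another cube ``target_cube`` on the desired grid):
>>> cube = cube.resample_cube_spatial(target=target_cube, method="bilinear")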
"""
return self.process("resample_cube_spatial", {"data": self, "target": target, "method": method})
@openeo_process
def resample_cube_temporal(
self, target: DataCube, dimension: Optional[str] = None, valid_within: Optional[int] = None
) -> DataCube:
"""
Resamples one or more given temporal dimensions from a source data cube to align with the corresponding
dimensions of the given target data cube using the nearest neighbor method.
Returns a new data cube with the resampled dimensions.
By default, this process simply takes the nearest neighbor independent of the value (including values such as
no-data / ``null``). Depending on the data cubes this may lead to values being assigned to two target timestamps.
To only consider valid values in a specific range around the target timestamps, use the parameter ``valid_within``.
The rare case of ties is resolved by choosing the earlier timestamps.
:param target: A data cube that describes the temporal target resolution.
:param dimension: The name of the temporal dimension to resample.
:param valid_within: optional setting to only consider valid values
within the given number of days before and after each target timestamp.
:return:
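Usage example (a sketch, assuming a cube ``target_cube`` with the desired timestamps):
>>> cube = cube.resample_cube_temporal(target=target_cube, valid_within=3)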
.. versionadded:: 0.10.0
"""
return self.process(
"resample_cube_temporal",
dict_no_none({"data": self, "target": target, "dimension": dimension, "valid_within": valid_within})
)
def _operator_binary(self, operator: str, other: Union[DataCube, int, float], reverse=False) -> DataCube:
"""Generic handling of (mathematical) binary operator"""
band_math_mode = self._in_bandmath_mode()
if band_math_mode:
if isinstance(other, (int, float)):
return self._bandmath_operator_binary_scalar(operator, other, reverse=reverse)
elif isinstance(other, DataCube):
return self._bandmath_operator_binary_cubes(operator, other)
else:
if isinstance(other, DataCube):
return self._merge_operator_binary_cubes(operator, other)
elif isinstance(other, (int, float)):
# "`apply` math" mode
return self._apply_operator(
operator=operator, other=other, reverse=reverse
)
raise OperatorException(
f"Unsupported operator {operator!r} with `other` type {type(other)!r} (band math mode={band_math_mode})"
)
def _operator_unary(self, operator: str, **kwargs) -> DataCube:
band_math_mode = self._in_bandmath_mode()
if band_math_mode:
return self._bandmath_operator_unary(operator, **kwargs)
else:
return self._apply_operator(operator=operator, extra_arguments=kwargs)
def _apply_operator(
self,
operator: str,
other: Optional[Union[int, float]] = None,
reverse: Optional[bool] = None,
extra_arguments: Optional[dict] = None,
) -> DataCube:
"""
Apply a unary or binary operator/process,
by appending to existing `apply` node, or starting a new one.
:param operator: process id of operator
:param other: for binary operators: "other" argument
:param reverse: for binary operators: "self" and "other" should be swapped (reflected operator mode)
:param extra_arguments: additional named arguments for the operator process (e.g. ``base`` for ``log``)
"""
if self.result_node().process_id == "apply":
# Append to existing `apply` node
orig_apply = self.result_node()
data = orig_apply.arguments["data"]
x = {"from_node": orig_apply.arguments["process"]["process_graph"]}
context = orig_apply.arguments.get("context")
else:
# Start new `apply` node.
data = self
x = {"from_parameter": "x"}
context = None
# Build args for child callback.
args = {"x": x, **(extra_arguments or {})}
if other is not None:
# Binary operator mode
args["y"] = other
if reverse:
args["x"], args["y"] = args["y"], args["x"]
child_pg = PGNode(process_id=operator, arguments=args)
return self.process_with_node(
PGNode(
process_id="apply",
arguments=dict_no_none(
data=data,
process={"process_graph": child_pg},
context=context,
),
)
)
@openeo_process(mode="operator")
def add(self, other: Union[DataCube, int, float], reverse=False) -> DataCube:
return self._operator_binary("add", other, reverse=reverse)
@openeo_process(mode="operator")
def subtract(self, other: Union[DataCube, int, float], reverse=False) -> DataCube:
return self._operator_binary("subtract", other, reverse=reverse)
@openeo_process(mode="operator")
def divide(self, other: Union[DataCube, int, float], reverse=False) -> DataCube:
return self._operator_binary("divide", other, reverse=reverse)
@openeo_process(mode="operator")
def multiply(self, other: Union[DataCube, int, float], reverse=False) -> DataCube:
return self._operator_binary("multiply", other, reverse=reverse)
@openeo_process
def normalized_difference(self, other: DataCube) -> DataCube:
# This DataCube method is only a convenience function when in band math mode
assert self._in_bandmath_mode()
assert other._in_bandmath_mode()
return self._operator_binary("normalized_difference", other)
@openeo_process(process_id="or", mode="operator")
def logical_or(self, other: DataCube) -> DataCube:
"""
Apply element-wise logical `or` operation
:param other:
:return: logical_or(this, other)
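Usage example (a sketch, assuming boolean band-math cubes ``is_cloud`` and ``is_shadow``;
the ``|`` operator maps to this method):
>>> mask = is_cloud | is_shadow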
"""
return self._operator_binary("or", other)
@openeo_process(process_id="and", mode="operator")
def logical_and(self, other: DataCube) -> DataCube:
"""
Apply element-wise logical `and` operation
:param other:
:return: logical_and(this, other)
"""
return self._operator_binary("and", other)
@openeo_process(process_id="not", mode="operator")
def __invert__(self) -> DataCube:
return self._operator_unary("not")
@openeo_process(process_id="neq", mode="operator")
def __ne__(self, other: Union[DataCube, int, float]) -> DataCube:
return self._operator_binary("neq", other)
@openeo_process(process_id="eq", mode="operator")
def __eq__(self, other: Union[DataCube, int, float]) -> DataCube:
"""
Pixelwise comparison of this data cube with another cube or constant.
:param other: Another data cube, or a constant
:return:
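Usage example (a sketch, building a pixelwise mask in band math mode; the band name is hypothetical):
>>> mask = cube.band("SCL") == 4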
"""
return self._operator_binary("eq", other)
@openeo_process(process_id="gt", mode="operator")
def __gt__(self, other: Union[DataCube, int, float]) -> DataCube:
"""
Pairwise comparison of the bands in this data cube with the bands in the 'other' data cube.
:param other:
:return: this > other
"""
return self._operator_binary("gt", other)
@openeo_process(process_id="ge", mode="operator")
def __ge__(self, other: Union[DataCube, int, float]) -> DataCube:
return self._operator_binary("gte", other)
@openeo_process(process_id="lt", mode="operator")
def __lt__(self, other: Union[DataCube, int, float]) -> DataCube:
"""
Pairwise comparison of the bands in this data cube with the bands in the 'other' data cube.
The number of bands in both data cubes has to be the same.
:param other:
:return: this < other
"""
return self._operator_binary("lt", other)
@openeo_process(process_id="le", mode="operator")
def __le__(self, other: Union[DataCube, int, float]) -> DataCube:
return self._operator_binary("lte", other)
@openeo_process(process_id="add", mode="operator")
def __add__(self, other) -> DataCube:
return self.add(other)
@openeo_process(process_id="add", mode="operator")
def __radd__(self, other) -> DataCube:
return self.add(other, reverse=True)
@openeo_process(process_id="subtract", mode="operator")
def __sub__(self, other) -> DataCube:
return self.subtract(other)
@openeo_process(process_id="subtract", mode="operator")
def __rsub__(self, other) -> DataCube:
return self.subtract(other, reverse=True)
@openeo_process(process_id="multiply", mode="operator")
def __neg__(self) -> DataCube:
return self.multiply(-1)
@openeo_process(process_id="multiply", mode="operator")
def __mul__(self, other) -> DataCube:
return self.multiply(other)
@openeo_process(process_id="multiply", mode="operator")
def __rmul__(self, other) -> DataCube:
return self.multiply(other, reverse=True)
@openeo_process(process_id="divide", mode="operator")
def __truediv__(self, other) -> DataCube:
return self.divide(other)
@openeo_process(process_id="divide", mode="operator")
def __rtruediv__(self, other) -> DataCube:
return self.divide(other, reverse=True)
@openeo_process(process_id="power", mode="operator")
def __rpow__(self, other) -> DataCube:
return self._power(other, reverse=True)
@openeo_process(process_id="power", mode="operator")
def __pow__(self, other) -> DataCube:
return self._power(other, reverse=False)
def _power(self, other, reverse=False):
node = self._get_bandmath_node()
x = node.reducer_process_graph()
y = other
if reverse:
x, y = y, x
return self.process_with_node(node.clone_with_new_reducer(
PGNode(process_id="power", base=x, p=y)
))
@openeo_process(process_id="power", mode="operator")
def power(self, p: float) -> DataCube:
"""Raise the pixel values of this data cube to the power ``p``."""
return self._power(other=p, reverse=False)
@openeo_process(process_id="ln", mode="operator")
def ln(self) -> DataCube:
return self._operator_unary("ln")
@openeo_process(process_id="log", mode="operator")
def logarithm(self, base: float) -> DataCube:
return self._operator_unary("log", base=base)
@openeo_process(process_id="log", mode="operator")
def log2(self) -> DataCube:
return self.logarithm(base=2)
@openeo_process(process_id="log", mode="operator")
def log10(self) -> DataCube:
return self.logarithm(base=10)
@openeo_process(process_id="or", mode="operator")
def __or__(self, other) -> DataCube:
return self.logical_or(other)
@openeo_process(process_id="and", mode="operator")
def __and__(self, other):
return self.logical_and(other)
def _bandmath_operator_binary_cubes(
self, operator, other: DataCube, left_arg_name="x", right_arg_name="y"
) -> DataCube:
"""Band math binary operator with cube as right hand side argument"""
left = self._get_bandmath_node()
right = other._get_bandmath_node()
if left.arguments["data"] != right.arguments["data"]:
raise BandMathException("'Band math' between bands of different data cubes is not supported yet.")
# Build reducer's sub-processgraph
merged = PGNode(
process_id=operator,
arguments={
left_arg_name: {"from_node": left.reducer_process_graph()},
right_arg_name: {"from_node": right.reducer_process_graph()},
},
)
return self.process_with_node(left.clone_with_new_reducer(merged))
def _bandmath_operator_binary_scalar(self, operator: str, other: Union[int, float], reverse=False) -> DataCube:
"""Band math binary operator with scalar value (int or float) as right hand side argument"""
node = self._get_bandmath_node()
x = {'from_node': node.reducer_process_graph()}
y = other
if reverse:
x, y = y, x
return self.process_with_node(node.clone_with_new_reducer(
PGNode(operator, x=x, y=y)
))
def _bandmath_operator_unary(self, operator: str, **kwargs) -> DataCube: