From db859a0f310d4538638154ea19dad5fbf3a164b2 Mon Sep 17 00:00:00 2001 From: Igor Davidyuk Date: Wed, 6 Mar 2024 12:31:39 +0100 Subject: [PATCH] converters without otx Signed-off-by: Igor Davidyuk --- geti_sdk/data_models/label.py | 20 + geti_sdk/data_models/label_group.py | 51 ++ geti_sdk/data_models/label_schema.py | 65 +++ geti_sdk/data_models/shapes.py | 11 + geti_sdk/deployment/deployed_model.py | 134 ++--- geti_sdk/deployment/deployment.py | 2 +- .../postprocessing.py | 22 +- .../services/__init__.py | 17 + .../prediction_to_annotation_converter.py | 498 ++++++++++++++++++ .../utils/__init__.py | 17 + .../utils/detection_utils.py | 87 +++ .../utils/segmentation_utils.py | 247 +++++++++ geti_sdk/deployment/utils.py | 16 + 13 files changed, 1112 insertions(+), 75 deletions(-) create mode 100644 geti_sdk/data_models/label_group.py create mode 100644 geti_sdk/data_models/label_schema.py create mode 100644 geti_sdk/deployment/predictions_postprocessing/services/__init__.py create mode 100644 geti_sdk/deployment/predictions_postprocessing/services/prediction_to_annotation_converter.py create mode 100644 geti_sdk/deployment/predictions_postprocessing/utils/__init__.py create mode 100644 geti_sdk/deployment/predictions_postprocessing/utils/detection_utils.py create mode 100644 geti_sdk/deployment/predictions_postprocessing/utils/segmentation_utils.py diff --git a/geti_sdk/data_models/label.py b/geti_sdk/data_models/label.py index cd9af192..15f08993 100644 --- a/geti_sdk/data_models/label.py +++ b/geti_sdk/data_models/label.py @@ -23,6 +23,7 @@ from otx.api.entities.scored_label import ScoredLabel as OteScoredLabel from geti_sdk.data_models.enums import TaskType +from geti_sdk.data_models.enums.domain import Domain @attr.define @@ -64,10 +65,29 @@ class Label: group: str is_empty: bool hotkey: str = "" + domain: Optional[Domain] = None id: Optional[str] = None parent_id: Optional[str] = None is_anomalous: Optional[bool] = None + def __key(self) -> Tuple[str, str]: + """ + Return a tuple representing the key of the label. + + The key is a tuple containing the name and color of the label. + + :return: A tuple representing the key of the label. + """ + return (self.name, self.color) + + def __hash__(self) -> int: + """ + Calculate the hash value of the object. + + :return: The hash value of the object. + """ + return hash(self.__key()) + def to_ote(self, task_type: TaskType) -> LabelEntity: """ Convert the `Label` instance to an OTE SDK LabelEntity object. diff --git a/geti_sdk/data_models/label_group.py b/geti_sdk/data_models/label_group.py new file mode 100644 index 00000000..fc996aed --- /dev/null +++ b/geti_sdk/data_models/label_group.py @@ -0,0 +1,51 @@ +# Copyright (C) 2024 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions +# and limitations under the License. + +from enum import Enum +from typing import List, Optional + +from geti_sdk.data_models.label import Label + + +class LabelGroupType(Enum): + """Enum to indicate the LabelGroupType.""" + + EXCLUSIVE = 1 + EMPTY_LABEL = 2 + + +class LabelGroup: + """ + Representation of a group of labels. + """ + + def __init__( + self, + name: str, + labels: List[Label], + group_type: LabelGroupType = LabelGroupType.EXCLUSIVE, + id: Optional[str] = None, + ) -> None: + """ + Initialize a LabelGroup object. + + :param name: The name of the label group. + :param labels: A list of Label objects associated with the group. + :param group_type: The type of the label group. Defaults to LabelGroupType.EXCLUSIVE. + :param id: The ID of the label group. Defaults to None. + """ + self.id = id + self.name = name + self.group_type = group_type + self.labels = sorted(labels, key=lambda label: label.id) diff --git a/geti_sdk/data_models/label_schema.py b/geti_sdk/data_models/label_schema.py new file mode 100644 index 00000000..f7bba3eb --- /dev/null +++ b/geti_sdk/data_models/label_schema.py @@ -0,0 +1,65 @@ +# Copyright (C) 2024 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions +# and limitations under the License. + +from typing import List, Optional + +from geti_sdk.data_models.label import Label +from geti_sdk.data_models.label_group import LabelGroup, LabelGroupType + + +class LabelSchema: + """ + The `LabelSchema` class defines the structure and properties of labels and label groups. + + :param label_groups: Optional list of `LabelGroup` objects representing the label groups in the schema + """ + + def __init__(self, label_groups: Optional[List[LabelGroup]] = None) -> None: + """ + Initialize a new instance of the `LabelSchema` class. + + :param label_groups: Optional list of `LabelGroup` objects representing the label groups in the schema + """ + self._groups = label_groups + + def get_labels(self, include_empty: bool = False) -> List[Label]: + """ + Get the labels in the label schema. + + :param include_empty: Flag determining whether to include empty labels + :return: List of all labels in the label schema + """ + labels = { + label + for group in self._groups + for label in group.labels + if include_empty or not label.is_empty + } + return sorted(list(labels), key=lambda label: label.id) + + def get_groups(self, include_empty: bool = False) -> List[LabelGroup]: + """ + Get the label groups in the label schema. + + :param include_empty: Flag determining whether to include empty label groups + :return: List of all label groups in the label schema + """ + if include_empty: + return self._groups + + return [ + group + for group in self._groups + if group.group_type != LabelGroupType.EMPTY_LABEL + ] diff --git a/geti_sdk/data_models/shapes.py b/geti_sdk/data_models/shapes.py index 081d427a..ddf7bd40 100644 --- a/geti_sdk/data_models/shapes.py +++ b/geti_sdk/data_models/shapes.py @@ -278,6 +278,17 @@ def y_max(self) -> int: """ return self.y + self.height + @classmethod + def generate_full_box(cls, image_width: int, image_height: int) -> "Rectangle": + """ + Return a rectangle that fully encapsulates the image. + + :param image_width: Width of the image to which the rectangle applies (in pixels) + :param image_height: Height of the image to which the rectangle applies (in pixels) + :return: Rectangle: A rectangle that fully encapsulates the image. + """ + return cls(x=0, y=0, width=image_width, height=image_height) + @attr.define(slots=False) class Ellipse(Shape): diff --git a/geti_sdk/deployment/deployed_model.py b/geti_sdk/deployment/deployed_model.py index 8cb92505..902f7919 100644 --- a/geti_sdk/deployment/deployed_model.py +++ b/geti_sdk/deployment/deployed_model.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions # and limitations under the License. -import datetime import importlib.util import json import logging @@ -26,32 +25,43 @@ import attr import numpy as np -from otx.algorithms.classification.utils import get_cls_inferencer_configuration -from otx.api.entities.color import Color -from otx.api.entities.label import Domain as OTEDomain -from otx.api.entities.label import LabelEntity -from otx.api.entities.label_schema import LabelGroup, LabelGroupType, LabelSchemaEntity +from openvino.model_api.adapters import OpenvinoAdapter, OVMSAdapter +from openvino.model_api.models import Model as model_api_Model +from openvino.runtime import Core from geti_sdk.data_models import OptimizedModel, Project, TaskConfiguration +from geti_sdk.data_models.enums.domain import Domain # from geti_sdk.data_models.annotations import Annotation -# from geti_sdk.data_models.label import ScoredLabel +from geti_sdk.data_models.label import Label +from geti_sdk.data_models.label_group import LabelGroup +from geti_sdk.data_models.label_schema import LabelSchema from geti_sdk.data_models.predictions import Prediction, ResultMedium -from geti_sdk.data_models.task import Task # from geti_sdk.deployment.legacy_converters import ( # AnomalyClassificationToAnnotationConverter, # ) from geti_sdk.deployment.predictions_postprocessing.postprocessing import Postprocessor +from geti_sdk.deployment.predictions_postprocessing.services.prediction_to_annotation_converter import ( + ConverterFactory, +) from geti_sdk.http_session import GetiSession from geti_sdk.rest_converters import ConfigurationRESTConverter, ModelRESTConverter from .utils import ( generate_ovms_model_address, generate_ovms_model_name, + rgb_to_hex, target_device_is_ovms, ) +# from otx.algorithms.classification.utils import get_cls_inferencer_configuration +# from otx.api.entities.color import Color +# from otx.api.entities.label import Domain as OTEDomain +# from otx.api.entities.label import LabelEntity +# from otx.api.entities.label_schema import LabelGroup, LabelGroupType, LabelSchemaEntity + + MODEL_DIR_NAME = "model" PYTHON_DIR_NAME = "python" WRAPPER_DIR_NAME = "model_wrappers" @@ -90,7 +100,7 @@ def __attrs_post_init__(self): self._needs_tempdir_deletion: bool = False self._tempdir_path: Optional[str] = None self._has_custom_model_wrappers: bool = False - self._label_schema: Optional[LabelSchemaEntity] = None + # self._label_schema: Optional[LabelSchemaEntity] = None # Attributes related to model explainability self._saliency_key: Optional[str] = None @@ -232,24 +242,10 @@ def load_inference_model( :return: OpenVino inference engine model that can be used to make predictions on images """ - try: - from openvino.model_api.adapters import ( - OpenvinoAdapter, - OVMSAdapter, - create_core, - ) - from openvino.model_api.models import Model as OMZModel - except ImportError as error: - raise ValueError( - f"Unable to load inference model for {self}. Relevant OpenVINO " - f"packages were not found. Please make sure that OpenVINO is installed " - f"correctly." - ) from error - if not target_device_is_ovms(device=device): # Run the model locally model_adapter = OpenvinoAdapter( - create_core(), + Core(), model=os.path.join(self._model_data_path, "model.xml"), weights_path=os.path.join(self._model_data_path, "model.bin"), device=device, @@ -284,22 +280,26 @@ def load_inference_model( # Load model configuration config_path = os.path.join(self._model_data_path, "config.json") - if os.path.isfile(config_path): - with open(config_path, "r") as config_file: - configuration_json = json.load(config_file) - model_type = configuration_json.get("type_of_model") - parameters = configuration_json.get("model_parameters") - label_dictionary = parameters.pop(LABELS_CONFIG_KEY, None) - if configuration is not None: - configuration.update(parameters) - else: - configuration = parameters - else: + if not os.path.isfile(config_path): raise ValueError( f"Missing configuration file `config.json` for deployed model `{self}`," f" unable to load inference model." ) + with open(config_path, "r") as config_file: + configuration_json = json.load(config_file) + model_type = configuration_json.get("type_of_model") + + # Update model parameters + parameters = configuration_json.get("model_parameters") + if configuration is not None: + configuration.update(parameters) + else: + configuration = parameters + # Parse label schema + label_dictionary = configuration_json.get("model_parameters").pop( + LABELS_CONFIG_KEY, None + ) self._parse_label_schema_from_dict(label_dictionary) # Create model wrapper with the loaded configuration @@ -322,19 +322,19 @@ def load_inference_model( f"is required, but could not be found at path " f"{wrapper_module_path}." ) from ex - - if model_type == "otx_classification": - configuration = get_cls_inferencer_configuration(self.ote_label_schema) - - model = OMZModel.create_model( + model = model_api_Model.create_model( model=model_adapter, model_type=model_type, - configuration=configuration, preload=True, ) - self.openvino_model_parameters = configuration + # self.openvino_model_parameters = configuration self._inference_model = model + # Load results to Prediction converter + self._converter = ConverterFactory.create_converter( + self.label_schema, configuration + ) + # TODO: This is a workaround to fix the issue that causes the output blob name # to be unset. Remove this once it has been fixed on OTX/ModelAPI side output_names = list(self._inference_model.outputs.keys()) @@ -580,7 +580,7 @@ def _postprocess_explain_outputs( repr_vector = None return saliency_map, repr_vector - def infer(self, image: np.ndarray, task: Task, explain: bool = False) -> Prediction: + def infer(self, image: np.ndarray, explain: bool = False) -> Prediction: """ Run inference on an already preprocessed image. @@ -589,19 +589,23 @@ def infer(self, image: np.ndarray, task: Task, explain: bool = False) -> Predict :return: Dictionary containing the model outputs """ preprocessed_image, metadata = self._preprocess(image) + # metadata is a dict with keys 'original_shape' and 'resized_shape' inference_results: Dict[str, np.ndarray] = self._inference_model.infer_sync( preprocessed_image ) postprocessing_results = self._postprocess(inference_results, metadata=metadata) # Create a postprocessor - if self._postprocessor is None: - self._postprocessor = Postprocessor( - labels=self.ote_label_schema, - configuration=self.openvino_model_parameters, - task=task, - ) - prediction = self._postprocessor(postprocessing_results, image, metadata) + # if self._postprocessor is None: + # self._postprocessor = Postprocessor( + # label_schema=self.ote_label_schema, + # configuration=self.openvino_model_parameters, + # task=task, + # ) + # prediction = self._postprocessor(postprocessing_results, image, metadata) + prediction = self._converter.convert_to_prediction( + postprocessing_results, image_shape=metadata["original_shape"] + ) # Add optional explainability outputs if explain: @@ -616,14 +620,14 @@ def infer(self, image: np.ndarray, task: Task, explain: bool = False) -> Predict return prediction @property - def ote_label_schema(self) -> LabelSchemaEntity: + def label_schema(self) -> LabelSchema: """ - Return the OTE LabelSchema for the model. + Return the LabelSchema for the model. This requires the inference model to be loaded, getting this property while inference models are not loaded will raise a ValueError - :return: LabelSchemaEntity containing the OTE SDK label schema for the model + :return: LabelSchema containing the SDK label schema for the model """ if self._label_schema is None: raise ValueError( @@ -637,7 +641,7 @@ def _parse_label_schema_from_dict( ) -> None: """ Parse the dictionary contained in the model `config.json` file, and - generate an OTE LabelSchemaEntity from it. + generate an LabelSchema from it. :param label_schema_dict: Dictionary containing the label schema information to parse @@ -645,27 +649,31 @@ def _parse_label_schema_from_dict( label_groups_list = label_schema_dict[LABEL_GROUPS_KEY] labels_dict = label_schema_dict[ALL_LABELS_KEY] for key, value in labels_dict.items(): - label_entity = LabelEntity( - id=value["_id"], + color_tuple = tuple( + int(value["color"][key]) for key in ["red", "green", "blue"] + ) + label_entity = Label( name=value["name"], - hotkey=value["hotkey"], - domain=OTEDomain[value["domain"]], - color=Color(**value["color"]), + color=rgb_to_hex(color_tuple), + group=None, is_empty=value.get("is_empty", False), - creation_date=datetime.datetime.fromisoformat(value["creation_date"]), + hotkey=value["hotkey"], + domain=Domain[value["domain"]], + id=value["_id"], + is_anomalous=value.get("is_anomalous", False), ) labels_dict[key] = label_entity label_groups: List[LabelGroup] = [] for group_dict in label_groups_list: - labels: List[LabelEntity] = [ + labels: List[Label] = [ labels_dict[label_id] for label_id in group_dict["label_ids"] ] label_groups.append( LabelGroup( id=group_dict["_id"], name=group_dict["name"], - group_type=LabelGroupType[group_dict["relation_type"]], + group_type=group_dict["relation_type"], labels=labels, ) ) - self._label_schema = LabelSchemaEntity(label_groups=label_groups) + self._label_schema = LabelSchema(label_groups=label_groups) diff --git a/geti_sdk/deployment/deployment.py b/geti_sdk/deployment/deployment.py index 4c584210..88dfdb0a 100644 --- a/geti_sdk/deployment/deployment.py +++ b/geti_sdk/deployment/deployment.py @@ -231,7 +231,7 @@ def _infer_task( :return: Inference result """ model = self._get_model_for_task(task) - return model.infer(image, task, explain) + return model.infer(image, explain) def _infer_pipeline(self, image: np.ndarray, explain: bool = False) -> Prediction: """ diff --git a/geti_sdk/deployment/predictions_postprocessing/postprocessing.py b/geti_sdk/deployment/predictions_postprocessing/postprocessing.py index b6c1630d..79e78a86 100644 --- a/geti_sdk/deployment/predictions_postprocessing/postprocessing.py +++ b/geti_sdk/deployment/predictions_postprocessing/postprocessing.py @@ -68,12 +68,12 @@ class Postprocessor: :param task: Task object containing the task metadata. """ - def __init__(self, labels, configuration, task: Task) -> None: + def __init__(self, label_schema, configuration, task: Task) -> None: self.task = task - self.ote_label_schema = labels + self.ote_label_schema = label_schema # Create OTX converter - converter_args = {"labels": labels} + converter_args = {"labels": self.ote_label_schema} if otx.__version__ > "1.2.0": if "use_ellipse_shapes" not in configuration.keys(): configuration.update({"use_ellipse_shapes": False}) @@ -152,14 +152,14 @@ def __call__( else: prediction = Prediction(annotations=[]) - print( - "pre-converter", - postprocessing_results, - "metadata", - metadata, - ) - print("width", width, "height", height) - print("post-converter", prediction) + # print( + # "pre-converter", + # postprocessing_results, + # "metadata", + # metadata, + # ) + # print("width", width, "height", height) + # print("post-converter", prediction) # Empty label is not generated by OTE correctly, append it here if there are # no other predictions diff --git a/geti_sdk/deployment/predictions_postprocessing/services/__init__.py b/geti_sdk/deployment/predictions_postprocessing/services/__init__.py new file mode 100644 index 00000000..5ecbbdac --- /dev/null +++ b/geti_sdk/deployment/predictions_postprocessing/services/__init__.py @@ -0,0 +1,17 @@ +# INTEL CONFIDENTIAL +# +# Copyright (C) 2023 Intel Corporation +# +# This software and the related documents are Intel copyrighted materials, and +# your use of them is governed by the express license under which they were provided to +# you ("License"). Unless the License provides otherwise, you may not use, modify, copy, +# publish, distribute, disclose or transmit this software or the related documents +# without Intel's prior written permission. +# +# This software and the related documents are provided as is, +# with no express or implied warranties, other than those that are expressly stated +# in the License. + +""" +The module contains services for predictions post-processing. +""" diff --git a/geti_sdk/deployment/predictions_postprocessing/services/prediction_to_annotation_converter.py b/geti_sdk/deployment/predictions_postprocessing/services/prediction_to_annotation_converter.py new file mode 100644 index 00000000..f22650ab --- /dev/null +++ b/geti_sdk/deployment/predictions_postprocessing/services/prediction_to_annotation_converter.py @@ -0,0 +1,498 @@ +# INTEL CONFIDENTIAL +# +# Copyright (C) 2024 Intel Corporation +# +# This software and the related documents are Intel copyrighted materials, and +# your use of them is governed by the express license under which they were provided to +# you ("License"). Unless the License provides otherwise, you may not use, modify, copy, +# publish, distribute, disclose or transmit this software or the related documents +# without Intel's prior written permission. +# +# This software and the related documents are provided as is, +# with no express or implied warranties, other than those that are expressly stated +# in the License. + +"""Module implements the InferenceResultsToPredictionConverter class.""" + +import abc +from typing import Any, Dict, NamedTuple, Optional, Tuple, Union + +import cv2 +import numpy as np +from openvino.model_api.models.utils import ( + AnomalyResult, + ClassificationResult, + DetectionResult, + ImageResultWithSoftPrediction, + InstanceSegmentationResult, +) + +from geti_sdk.data_models.annotations import Annotation + +# from otx.api.entities.annotation import Annotation +from geti_sdk.data_models.enums.domain import Domain + +# from otx.api.entities.label import Domain +# from otx.api.entities.scored_label import ScoredLabel +from geti_sdk.data_models.label import ScoredLabel +from geti_sdk.data_models.label_schema import LabelSchema +from geti_sdk.data_models.predictions import Prediction +from geti_sdk.data_models.shapes import ( + Ellipse, + Point, + Polygon, + Rectangle, + RotatedRectangle, +) + +# from otx.api.entities.shapes.ellipse import Ellipse +# from otx.api.entities.shapes.polygon import Point, Polygon +# from otx.api.entities.shapes.rectangle import Rectangle +from geti_sdk.deployment.predictions_postprocessing.utils.detection_utils import ( + detection2array, +) +from geti_sdk.deployment.predictions_postprocessing.utils.segmentation_utils import ( + create_annotation_from_segmentation_map, +) + + +class InferenceResultsToPredictionConverter(metaclass=abc.ABCMeta): + """Interface for the converter""" + + @abc.abstractmethod + def convert_to_prediction(self, predictions: NamedTuple, **kwargs) -> Prediction: + """ + Convert raw predictions to Annotation format. + + :param predictions: raw predictions from inference + :return: lisf of annotation objects containing the shapes obtained from the raw predictions. + """ + raise NotImplementedError + + +class ClassificationToPredictionConverter(InferenceResultsToPredictionConverter): + """ + Converts ModelAPI Classification predictions to Annotations. + + :param label_schema: LabelSchema containing the label info of the task + """ + + def __init__(self, label_schema: LabelSchema): + all_labels = label_schema.get_labels(include_empty=True) + # add empty labels if only one non-empty label exits + non_empty_labels = [label for label in all_labels if not label.is_empty] + self.labels = all_labels if len(non_empty_labels) == 1 else non_empty_labels + # get the first empty label + self.empty_label = next((label for label in all_labels if label.is_empty), None) + multilabel = len(label_schema.get_groups(False)) > 1 + multilabel = multilabel and len(label_schema.get_groups(False)) == len( + label_schema.get_labels(include_empty=False) + ) + self.hierarchical = not multilabel and len(label_schema.get_groups(False)) > 1 + + self.label_schema = label_schema + + def convert_to_prediction( + self, predictions: ClassificationResult, image_shape: Tuple[int], **kwargs + ) -> Prediction: # noqa: ARG003 + """ + Convert ModelAPI ClassificationResult predictions to sc_sdk annotations. + + :param predictions: classification labels represented in ModelAPI format (label_index, label_name, confidence) + :return: list of full box annotations objects with corresponding label + """ + labels = [] + for label in predictions.top_labels: + labels.append( + ScoredLabel.from_label(self.labels[label[0]], float(label[-1])) + ) + + if not labels and self.empty_label: + labels = [ScoredLabel.from_label(self.empty_label, probability=1.0)] + + annotations = Annotation( + shape=Rectangle.generate_full_box(image_shape[1], image_shape[0]), + labels=labels, + ) + return Prediction(annotations) + + +class DetectionToPredictionConverter(InferenceResultsToPredictionConverter): + """ + Converts ModelAPI Detection objects to Prediction. + + :param label_schema: LabelSchema containing the label info of the task + :param configuration: optional model configuration setting + """ + + def __init__( + self, label_schema: LabelSchema, configuration: Optional[dict[str, Any]] = None + ): + self.labels = label_schema.get_labels(include_empty=False) + self.label_map = dict(enumerate(self.labels)) + self.use_ellipse_shapes = False + self.confidence_threshold = 0.0 + if configuration is not None: + if "use_ellipse_shapes" in configuration: + self.use_ellipse_shapes = configuration["use_ellipse_shapes"] + if "confidence_threshold" in configuration: + self.confidence_threshold = configuration["confidence_threshold"] + + def convert_to_prediction( + self, predictions: DetectionResult, **kwargs + ) -> Prediction: + """ + Convert ModelAPI DetectionResult predictions to Prediction. + + :param predictions: detection represented in ModelAPI format (label, confidence, x1, y1, x2, y2). + + _Note: + - `label` can be any integer that can be mapped to `self.labels` + - `confidence` should be a value between 0 and 1 + - `x1`, `x2`, `y1` and `y2` are expected to be in pixel + :return: list of annotations object containing the boxes obtained from the prediction + """ + detections = detection2array(predictions.objects) + + annotations = [] + if ( + len(detections) + and detections.shape[1:] < (6,) + or detections.shape[1:] > (7,) + ): + raise ValueError( + f"Shape of prediction is not expected, expected (n, 7) or (n, 6) but got {detections.shape}" + ) + + for detection in detections: + # Some OpenVINO models use an output shape of [7,] + # If this is the case, skip the first value as it is not used + _detection = detection[1:] if detection.shape == (7,) else detection + + label = int(_detection[0]) + confidence = _detection[1] + scored_label = ScoredLabel.from_label(self.label_map[label], confidence) + coords = _detection[2:] + shape: Ellipse | Rectangle + + if confidence < self.confidence_threshold: + continue + + bbox_width = coords[2] - coords[0] + bbox_height = coords[3] - coords[1] + if self.use_ellipse_shapes: + shape = Ellipse(coords[0], coords[1], bbox_width, bbox_height) + else: + shape = Rectangle(coords[0], coords[1], bbox_width, bbox_height) + + annotation = Annotation(shape=shape, labels=[scored_label]) + annotations.append(annotation) + return Prediction(annotations) + + +class RotatedRectToPredictionConverter(DetectionToPredictionConverter): + """ + Converts ModelAPI Rotated Detection objects to Prediction. + + :param label_schema: LabelSchema containing the label info of the task + """ + + def convert_to_prediction( + self, predictions: InstanceSegmentationResult, **kwargs + ) -> Prediction: + """ + Convert ModelAPI instance segmentation predictions to a rotated bounding box annotation format. + + :param predictions: segmentation represented in ModelAPI format + :return: list of annotations containing the rotated boxes obtained from the segmentation contours + :raises ValueError: if metadata is missing from the preprocess step + """ + annotations = [] + if hasattr(predictions, "segmentedObjects"): + predictions = predictions.segmentedObjects + shape: Union[RotatedRectangle, Ellipse] + # for obj in predictions: + for score, class_idx, box, mask in zip(*predictions): + if score < self.confidence_threshold: + continue + if self.use_ellipse_shapes: + shape = Ellipse(box[0], box[1], box[2] - box[0], box[3] - box[1]) + annotations.append( + Annotation( + shape, + labels=[ + ScoredLabel.from_label( + self.labels[int(class_idx) - 1], float(score) + ) + ], + ) + ) + else: + mask = mask.astype(np.uint8) + contours, hierarchies = cv2.findContours( + mask, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE + ) + if hierarchies is None: + continue + for contour, hierarchy in zip(contours, hierarchies[0]): + if hierarchy[3] != -1: + continue + if len(contour) <= 2 or cv2.contourArea(contour) < 1.0: + continue + points = [ + Point( + x=point[0], + y=point[1], + ) + for point in cv2.boxPoints(cv2.minAreaRect(contour)) + ] + shape = Polygon(points=points) + annotations.append( + Annotation( + shape=RotatedRectangle.from_polygon(shape), + labels=[ + ScoredLabel.from_label( + self.labels[int(class_idx) - 1], float(score) + ) + ], + ) + ) + return Prediction(annotations) + + +class MaskToAnnotationConverter(InferenceResultsToPredictionConverter): + """Converts DetectionBox Predictions ModelAPI to Annotations.""" + + def __init__( + self, label_schema: LabelSchema, configuration: Optional[Dict[str, Any]] = None + ): + self.labels = label_schema.get_labels(include_empty=False) + self.use_ellipse_shapes = False + self.confidence_threshold = 0.0 + if configuration is not None: + if "use_ellipse_shapes" in configuration: + self.use_ellipse_shapes = configuration["use_ellipse_shapes"] + if "confidence_threshold" in configuration: + self.confidence_threshold = configuration["confidence_threshold"] + + def convert_to_prediction( + self, predictions: tuple, **kwargs: Dict[str, Any] + ) -> Prediction: + """ + Convert predictions to Annotation Scene using the metadata. + + :param predictions: Raw predictions from the model. + :return: Prediction object. + """ + annotations = [] + shape: Union[Polygon, Ellipse] + for score, class_idx, box, mask in zip(*predictions): + if score < self.confidence_threshold: + continue + if self.use_ellipse_shapes: + shape = shape = Ellipse( + box[0], box[1], box[2] - box[0], box[3] - box[1] + ) + annotations.append( + Annotation( + shape=shape, + labels=[ + ScoredLabel.from_label( + self.labels[int(class_idx) - 1], float(score) + ) + ], + ) + ) + else: + mask = mask.astype(np.uint8) + contours, hierarchies = cv2.findContours( + mask, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_SIMPLE + ) + if hierarchies is None: + continue + for contour, hierarchy in zip(contours, hierarchies[0]): + if hierarchy[3] != -1: + continue + if len(contour) <= 2 or cv2.contourArea(contour) < 1.0: + continue + contour = list(contour) + points = [ + Point( + x=point[0][0], + y=point[0][1], + ) + for point in contour + ] + shape = Polygon(points=points) + annotations.append( + Annotation( + shape=shape, + labels=[ + ScoredLabel.from_label( + self.labels[int(class_idx) - 1], float(score) + ) + ], + ) + ) + return Prediction(annotations) + + +class SegmentationToPredictionConverter(InferenceResultsToPredictionConverter): + """ + Converts ModelAPI Segmentation objects to Annotations. + + :param label_schema: LabelSchema containing the label info of the task + """ + + def __init__(self, label_schema: LabelSchema): + self.labels = label_schema.get_labels(include_empty=False) + # NB: index=0 is reserved for the background label + self.label_map = dict(enumerate(self.labels, 1)) + + def convert_to_prediction( + self, predictions: ImageResultWithSoftPrediction, **kwargs # noqa: ARG002 + ) -> Prediction: + """ + Convert ModelAPI instance segmentation predictions to sc_sdk annotations. + + :param predictions: segmentation represented in ModelAPI format + :return: list of annotations object containing the contour polygon obtained from the segmentation + """ + annotations = create_annotation_from_segmentation_map( + hard_prediction=predictions.resultImage, + soft_prediction=predictions.soft_prediction, + label_map=self.label_map, + ) + return Prediction(annotations) + + +class AnomalyToPredictionConverter(InferenceResultsToPredictionConverter): + """ + Convert ModelAPI AnomalyResult predictions to Annotations. + + :param label_schema: LabelSchema containing the label info of the task + """ + + def __init__(self, label_schema: LabelSchema): + self.labels = label_schema.get_labels(include_empty=False) + self.normal_label = next( + label for label in self.labels if not label.is_anomalous + ) + self.anomalous_label = next( + label for label in self.labels if label.is_anomalous + ) + self.domain = self.anomalous_label.domain + + def convert_to_prediction( + self, predictions: AnomalyResult, image_shape: Tuple[int], **kwargs + ) -> Prediction: # noqa: ARG002 + """ + Convert ModelAPI AnomalyResult predictions to sc_sdk annotations. + + :param predictions: anomaly result represented in ModelAPI format (same for all anomaly tasks) + :return: list of annotation objects based on the specific anomaly task: + - Classification: single label (normal or anomalous). + - Segmentation: contour polygon representing the segmentation. + - Detection: predicted bounding boxes. + """ + pred_label = predictions.pred_label + label = self.anomalous_label if pred_label == "Anomalous" else self.normal_label + annotations: list[Annotation] = [] + match self.domain: + case Domain.ANOMALY_CLASSIFICATION: + scored_label = ScoredLabel.from_label( + label=label, probability=float(predictions.pred_score) + ) + annotations = [ + Annotation( + Rectangle.generate_full_box(*image_shape[1::-1]), + labels=[scored_label], + ) + ] + case Domain.ANOMALY_SEGMENTATION: + annotations = create_annotation_from_segmentation_map( + hard_prediction=predictions.pred_mask, + soft_prediction=predictions.anomaly_map.squeeze(), + label_map={0: self.normal_label, 1: self.anomalous_label}, + ) + case Domain.ANOMALY_DETECTION: + for box in predictions.pred_boxes: + annotations.append( + Annotation( + Rectangle(box[0], box[1], box[2] - box[0], box[3] - box[1]), + labels=[ + ScoredLabel.from_label( + label=self.anomalous_label, + probability=predictions.pred_score, + ) + ], + ) + ) + case _: + raise ValueError( + f"Cannot convert predictions for task '{self.domain.name}'. Only Anomaly tasks are supported." + ) + if not annotations: + scored_label = ScoredLabel.from_label( + label=self.normal_label, probability=1.0 + ) + annotations = [ + Annotation( + Rectangle.generate_full_box(*image_shape[1::-1]), + labels=[scored_label], + ) + ] + return Prediction(annotations) + + +class ConverterFactory: + """ + Factory class for creating inference result to prediction converters based on the model's task. + """ + + @staticmethod + def create_converter( + label_schema: LabelSchema, configuration: dict[str, Any] | None = None + ) -> InferenceResultsToPredictionConverter: + """ + Create the appropriate inferencer object according to the model's task. + + :param label_schema: The label schema containing the label info of the task. + :param configuration: Optional configuration for the converter. Defaults to None. + :return: The created inference result to prediction converter. + :raises ValueError: If the task type cannot be determined from the label schema. + """ + domain = ConverterFactory._get_labels_domain(label_schema) + if domain == Domain.CLASSIFICATION: + return ClassificationToPredictionConverter(label_schema) + if domain == Domain.DETECTION: + return DetectionToPredictionConverter(label_schema, configuration) + if domain == Domain.SEGMENTATION: + return SegmentationToPredictionConverter(label_schema) + if domain == Domain.ROTATED_DETECTION: + return RotatedRectToPredictionConverter(label_schema, configuration) + if domain == Domain.INSTANCE_SEGMENTATION: + return MaskToAnnotationConverter(label_schema, configuration) + if domain in ( + Domain.ANOMALY_CLASSIFICATION, + Domain.ANOMALY_SEGMENTATION, + Domain.ANOMALY_DETECTION, + ): + return AnomalyToPredictionConverter(label_schema) + raise ValueError(f"Cannot create inferencer for task type '{domain.name}'.") + + @staticmethod + def _get_labels_domain(label_schema: LabelSchema) -> Domain: + """ + Return the domain (task type) associated with the model's labels. + + :param label_schema: The label schema containing the label info of the task. + :return: The domain of the task. + :raises ValueError: If the task type cannot be determined from the label schema. + """ + try: + return next( + label.domain for label in label_schema.get_labels(include_empty=False) + ) + except StopIteration: + raise ValueError("Cannot determine the task for the model") diff --git a/geti_sdk/deployment/predictions_postprocessing/utils/__init__.py b/geti_sdk/deployment/predictions_postprocessing/utils/__init__.py new file mode 100644 index 00000000..8857252c --- /dev/null +++ b/geti_sdk/deployment/predictions_postprocessing/utils/__init__.py @@ -0,0 +1,17 @@ +# INTEL CONFIDENTIAL +# +# Copyright (C) 2024 Intel Corporation +# +# This software and the related documents are Intel copyrighted materials, and +# your use of them is governed by the express license under which they were provided to +# you ("License"). Unless the License provides otherwise, you may not use, modify, copy, +# publish, distribute, disclose or transmit this software or the related documents +# without Intel's prior written permission. +# +# This software and the related documents are provided as is, +# with no express or implied warranties, other than those that are expressly stated +# in the License. + +""" +The module contains utility functions for post-processing predictions. +""" diff --git a/geti_sdk/deployment/predictions_postprocessing/utils/detection_utils.py b/geti_sdk/deployment/predictions_postprocessing/utils/detection_utils.py new file mode 100644 index 00000000..768541dd --- /dev/null +++ b/geti_sdk/deployment/predictions_postprocessing/utils/detection_utils.py @@ -0,0 +1,87 @@ +# INTEL CONFIDENTIAL +# +# Copyright (C) 2024 Intel Corporation +# +# This software and the related documents are Intel copyrighted materials, and +# your use of them is governed by the express license under which they were provided to +# you ("License"). Unless the License provides otherwise, you may not use, modify, copy, +# publish, distribute, disclose or transmit this software or the related documents +# without Intel's prior written permission. +# +# This software and the related documents are provided as is, +# with no express or implied warranties, other than those that are expressly stated +# in the License. + +import json +import logging + +import numpy as np +from openvino.model_api.models.utils import Detection + +from geti_sdk.data_models.model import Model + +# from sc_sdk.entities.model import Model + +logger = logging.getLogger(__name__) + + +def detection2array(detections: list[Detection]) -> np.ndarray: + """ + Convert list of OpenVINO Detection to a numpy array. + + :param detections: list of OpenVINO Detection containing [score, id, xmin, ymin, xmax, ymax] + :return: numpy array with [label, confidence, x1, y1, x2, y2] + """ + scores = np.empty((0, 1), dtype=np.float32) + labels = np.empty((0, 1), dtype=np.uint32) + boxes = np.empty((0, 4), dtype=np.float32) + for det in detections: + if (det.xmax - det.xmin) * (det.ymax - det.ymin) < 1.0: + continue + scores = np.append(scores, [[det.score]], axis=0) + labels = np.append(labels, [[det.id]], axis=0) + boxes = np.append( + boxes, + [[float(det.xmin), float(det.ymin), float(det.xmax), float(det.ymax)]], + axis=0, + ) + return np.concatenate((labels, scores, boxes), -1) + + +def get_detection_inferencer_configuration(model: Model) -> dict: + """ + Get detection configuration from the model. + + :param model: (Geti) Model to get the detection configuration from + :return: dict representing the detection configuration + """ + config = json.loads(model.get_data("config.json")) + _flatten_config_values(config) + + configuration = {} + if config["postprocessing"].get("result_based_confidence_threshold", False): + configuration["confidence_threshold"] = float( + np.frombuffer(model.get_data("confidence_threshold"), dtype=np.float32)[0] + ) + configuration["use_ellipse_shapes"] = config["postprocessing"].get( + "use_ellipse_shapes", False + ) + + logger.info(f"Detection inferencer configuration: {configuration}") + return configuration + + +def _flatten_config_values(config: dict) -> None: + """ + Extract the "value" field from any nested config. + + Flattening the structure of the config dictionary. The original config dictionary is modified in-place. + + :param config: config dictionary + """ + for key, value in config.items(): + if isinstance(value, dict): + if "value" in value: + config[key] = value["value"] + else: + _flatten_config_values(value) diff --git a/geti_sdk/deployment/predictions_postprocessing/utils/segmentation_utils.py b/geti_sdk/deployment/predictions_postprocessing/utils/segmentation_utils.py new file mode 100644 index 00000000..d0fbeb76 --- /dev/null +++ b/geti_sdk/deployment/predictions_postprocessing/utils/segmentation_utils.py @@ -0,0 +1,247 @@ +# INTEL CONFIDENTIAL +# +# Copyright (C) 2024 Intel Corporation +# +# This software and the related documents are Intel copyrighted materials, and +# your use of them is governed by the express license under which they were provided to +# you ("License"). Unless the License provides otherwise, you may not use, modify, copy, +# publish, distribute, disclose or transmit this software or the related documents +# without Intel's prior written permission. +# +# This software and the related documents are provided as is, +# with no express or implied warranties, other than those that are expressly stated +# in the License. +import logging +from copy import copy +from typing import cast + +import cv2 +import numpy as np + +from geti_sdk.data_models.annotations import Annotation +from geti_sdk.data_models.label import ScoredLabel +from geti_sdk.data_models.shapes import Point, Polygon + +# from bson import ObjectId + + +# from otx.api.entities.annotation import Annotation +# from otx.api.entities.id import ID +# from otx.api.entities.scored_label import ScoredLabel +# from otx.api.entities.shapes.polygon import Point, Polygon +# from otx.api.utils.shape_factory import ShapeFactory + +# from sc_sdk.entities.dataset_item import DatasetItem +# from sc_sdk.entities.label import Label + +logger = logging.getLogger(__name__) + +Contour = list[tuple[float, float]] +ContourInternal = list[tuple[float, float] | None] + + +def create_hard_prediction_from_soft_prediction( + soft_prediction: np.ndarray, soft_threshold: float, blur_strength: int = 5 +) -> np.ndarray: + """ + Create a hard prediction containing the final label index per pixel. + + :param soft_prediction: Output from segmentation network. Assumes floating point values, between 0.0 and 1.0. + Can be a 2d-array of shape (height, width) or per-class segmentation logits of shape (height, width, n_classes) + :param soft_threshold: minimum class confidence for each pixel. + The higher the value, the more strict the segmentation is (usually set to 0.5) + :param blur_strength: The higher the value, the smoother the segmentation output will be, but less accurate + :return: numpy array of the hard prediction + """ + soft_prediction_blurred = cv2.blur(soft_prediction, (blur_strength, blur_strength)) + if len(soft_prediction.shape) == 3: + # Apply threshold to filter out `unconfident` predictions, then get max along + # class dimension + soft_prediction_blurred[soft_prediction_blurred < soft_threshold] = 0 + hard_prediction = np.argmax(soft_prediction_blurred, axis=2) + elif len(soft_prediction.shape) == 2: + # In the binary case, simply apply threshold + hard_prediction = soft_prediction_blurred > soft_threshold + else: + raise ValueError( + f"Invalid prediction input of shape {soft_prediction.shape}. " + f"Expected either a 2D or 3D array." + ) + return hard_prediction + + +def get_subcontours(contour: Contour) -> list[Contour]: + """ + Split contour into sub-contours that do not have self intersections. + + :param contour: the contour to split + :return: list of sub-contours + """ + + def find_loops(points: ContourInternal) -> list: + """For each consecutive pair of equivalent rows in the input matrix returns their indices.""" + _, inverse, count = np.unique(points, axis=0, return_inverse=True, return_counts=True) # type: ignore + duplicates = np.where(count > 1)[0] + indices = [] + for x in duplicates: + y = np.nonzero(inverse == x)[0] + for i, _ in enumerate(y[:-1]): + indices.append(y[i : i + 2]) + return indices + + base_contour = cast(ContourInternal, copy(contour)) + + # Make sure that contour is closed. + if not np.array_equal(base_contour[0], base_contour[-1]): # type: ignore + base_contour.append(base_contour[0]) + + subcontours: list[Contour] = [] + loops = sorted(find_loops(base_contour), key=lambda x: x[0], reverse=True) + for loop in loops: + i, j = loop + subcontour = base_contour[i:j] + subcontour = [x for x in subcontour if x is not None] + subcontours.append(cast(Contour, subcontour)) + base_contour[i:j] = [None] * (j - i) + + return [i for i in subcontours if len(i) > 2] + + +def create_annotation_from_segmentation_map( + hard_prediction: np.ndarray, soft_prediction: np.ndarray, label_map: dict +) -> list[Annotation]: + """ + Create polygons from the soft predictions. + + Note: background label will be ignored and not be converted to polygons. + + :param hard_prediction: hard prediction containing the final label index per pixel. + See function `create_hard_prediction_from_soft_prediction`. + :param soft_prediction: soft prediction with shape H x W x N_labels, + where soft_prediction[:, :, 0] is the soft prediction for + background. If soft_prediction is of H x W shape, it is + assumed that this soft prediction will be applied for all + labels. + :param label_map: dictionary mapping labels to an index. It is assumed + that the first item in the dictionary corresponds to the + background label and will therefore be ignored. + :return: list of annotations with polygons + """ + # pylint: disable=too-many-locals + height, width = hard_prediction.shape[:2] + img_class = hard_prediction.swapaxes(0, 1) + + # pylint: disable=too-many-nested-blocks + annotations: list[Annotation] = [] + for label_index, label in label_map.items(): + # Skip background + if label_index == 0: + continue + + # obtain current label soft prediction + if len(soft_prediction.shape) == 3: + current_label_soft_prediction = soft_prediction[:, :, label_index] + else: + current_label_soft_prediction = soft_prediction + + obj_group = img_class == label_index + label_index_map = (obj_group.T.astype(int) * 255).astype(np.uint8) + + # Contour retrieval mode CCOMP (Connected components) creates a two-level + # hierarchy of contours + contours, hierarchies = cv2.findContours( + label_index_map, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_NONE + ) + + if hierarchies is not None: + for contour, hierarchy in zip(contours, hierarchies[0]): + if len(contour) <= 2 or cv2.contourArea(contour) < 1.0: + continue + + if hierarchy[3] == -1: + # In this case a contour does not represent a hole + _contour = [(point[0][0], point[0][1]) for point in contour] + + # Split contour into subcontours that do not have self intersections. + subcontours = get_subcontours(_contour) + for subcontour in subcontours: + # compute probability of the shape + mask = np.zeros(hard_prediction.shape, dtype=np.uint8) + cv2.drawContours( + mask, + np.asarray([[[x, y]] for x, y in subcontour]), + contourIdx=-1, + color=1, + thickness=-1, + ) + probability = cv2.mean(current_label_soft_prediction, mask)[0] + + # convert the list of points to a closed polygon + points = [Point(x=x, y=y) for x, y in subcontour] + polygon = Polygon(points=points) + + if polygon.area > 0: + # Contour is a closed polygon with area > 0 + annotations.append( + Annotation( + shape=polygon, + labels=[ScoredLabel.from_label(label, probability)], + # id=ID(ObjectId()), + ) + ) + else: + # Contour is a closed polygon with area == 0 + logger.warning( + "The geometry of the segmentation map you are converting " + "is not fully supported. Polygons with a area of zero " + "will be removed.", + ) + else: + # If contour hierarchy[3] != -1 then contour has a parent and + # therefore is a hole + # Do not allow holes in segmentation masks to be filled silently, + # but trigger warning instead + logger.warning( + "The geometry of the segmentation map you are converting is " + "not fully supported. A hole was found and will be filled.", + ) + return annotations + + +# def mask_from_annotation(annotations: list[Annotation], labels: list[Label], width: int, height: int) -> np.ndarray: +# """ +# Generate a segmentation mask of a numpy image, and a list of shapes. + +# The mask is will be two dimensional and the value of each pixel matches the class +# index with offset 1. The background class index is zero. labels[0] matches pixel +# value 1, etc. The class index is determined based on the order of `labels`: + +# :param annotations: List of annotations to plot in mask +# :param labels: List of labels. The index position of the label determines the class number in the segmentation mask. +# :param width: Width of the mask +# :param height: Height of the mask +# :return: 2d numpy array of mask +# """ + +# mask = np.zeros(shape=(height, width), dtype=np.uint8) +# for annotation in annotations: +# shape = annotation.shape +# if not isinstance(shape, Polygon): +# shape = ShapeFactory.shape_as_polygon(annotation.shape) +# known_labels = [ +# label for label in annotation.get_labels() if isinstance(label, ScoredLabel) and label.get_label() in labels +# ] +# if len(known_labels) == 0: +# # Skip unknown shapes +# continue + +# label_to_compare = known_labels[0].get_label() + +# class_idx = labels.index(label_to_compare) + 1 +# contour = [] +# for point in shape.points: +# contour.append([int(point.x * width), int(point.y * height)]) + +# mask = cv2.drawContours(mask, np.asarray([contour]), 0, (class_idx, class_idx, class_idx), -1) + +# return np.expand_dims(mask, axis=2) diff --git a/geti_sdk/deployment/utils.py b/geti_sdk/deployment/utils.py index 78307e51..458aafc6 100644 --- a/geti_sdk/deployment/utils.py +++ b/geti_sdk/deployment/utils.py @@ -14,6 +14,7 @@ import re from importlib import resources +from typing import Tuple from pathvalidate import sanitize_filepath @@ -86,3 +87,18 @@ def target_device_is_ovms(device: str) -> bool: r"^((https?://)|(www.))(?:([a-zA-Z]+)|(\d+\.\d+\.\d+\.\d+)):\d{1,5}?$" ) return server_pattern.match(device) is not None + + +def rgb_to_hex(rgb: Tuple[int, int, int]) -> str: + """ + Convert an RGB color value to its corresponding hexadecimal representation. + + :param rgb: A tuple representing the RGB color value, where each element is an integer between 0 and 255. + :return: The hexadecimal representation of the RGB color value. + + _Example: + + >>> rgb_to_hex((255, 0, 0)) + '#ff0000' + """ + return "#{:02x}{:02x}{:02x}".format(rgb[0], rgb[1], rgb[2])