1import base64
- 2from collections import defaultdict
- 3from pathlib import Path
- 4from typing import Iterable, List, Dict
- 5
- 6from otlmow_model.OtlmowModel.BaseClasses.OTLObject import OTLObject, create_dict_from_asset, get_attribute_by_name, \
- 7 dynamic_create_instance_from_ns_and_name
- 8from otlmow_model.OtlmowModel.Helpers.GenericHelper import validate_guid
- 9from otlmow_model.OtlmowModel.Helpers.generated_lists import get_hardcoded_relation_dict
- 10
+ 2import warnings
+ 3from collections import defaultdict
+ 4from pathlib import Path
+ 5from typing import Iterable, List, Dict
+ 6
+ 7from otlmow_model.OtlmowModel.BaseClasses.OTLObject import OTLObject, create_dict_from_asset, get_attribute_by_name, \
+ 8 dynamic_create_instance_from_ns_and_name
+ 9from otlmow_model.OtlmowModel.Helpers.GenericHelper import validate_guid
+ 10from otlmow_model.OtlmowModel.Helpers.generated_lists import get_hardcoded_relation_dict
11
- 12def count_assets_by_type(objects: Iterable[OTLObject]) -> defaultdict:
- 13 d = defaultdict(int)
- 14 for i in objects:
- 15 d[i.typeURI] += 1
- 16 return d
- 17
+ 12
+ 13def count_assets_by_type(objects: Iterable[OTLObject]) -> defaultdict:
+ 14 d = defaultdict(int)
+ 15 for i in objects:
+ 16 d[i.typeURI] += 1
+ 17 return d
18
- 19def print_overview_assets(objects: Iterable[OTLObject]) -> None:
- 20 for k, v in count_assets_by_type(objects).items():
- 21 print(f'counting {str(v)} assets of type {k}')
- 22
+ 19
+ 20def print_overview_assets(objects: Iterable[OTLObject]) -> None:
+ 21 for k, v in count_assets_by_type(objects).items():
+ 22 print(f'counting {str(v)} assets of type {k}')
23
- 24# TODO move to converter (this is using a dotnotation)
- 25def remove_duplicates_in_iterable_based_on_property(iterable: Iterable[OTLObject], property_name: str) -> []:
- 26 d = {}
- 27 for asset in iterable:
- 28 item = asset
- 29 last = property_name
- 30 while '.' in last:
- 31 first = last.split('.')[0]
- 32 last = last.split('.', 1)[1]
- 33 if hasattr(item, first):
- 34 item = getattr(item, first)
- 35 if isinstance(item, list):
- 36 raise NotImplementedError(
- 37 "can't use remove_duplicates_in_iterable_based_on_property() when the property value is a list")
- 38 if hasattr(item, last):
- 39 item_prop = getattr(item, last)
- 40 if isinstance(item_prop, list):
- 41 raise NotImplementedError(
- 42 "can't use remove_duplicates_in_iterable_based_on_property() when the property value is a list")
- 43 item_prop_str = str(item_prop)
- 44 if item_prop_str not in d:
- 45 d[item_prop_str] = asset
- 46 return list(d.values())
- 47
+ 24
+ 25# TODO move to converter (this is using a dotnotation)
+ 26def remove_duplicates_in_iterable_based_on_property(iterable: Iterable[OTLObject], property_name: str) -> []:
+ 27 d = {}
+ 28 for asset in iterable:
+ 29 item = asset
+ 30 last = property_name
+ 31 while '.' in last:
+ 32 first = last.split('.')[0]
+ 33 last = last.split('.', 1)[1]
+ 34 if hasattr(item, first):
+ 35 item = getattr(item, first)
+ 36 if isinstance(item, list):
+ 37 raise NotImplementedError(
+ 38 "can't use remove_duplicates_in_iterable_based_on_property() when the property value is a list")
+ 39 if hasattr(item, last):
+ 40 item_prop = getattr(item, last)
+ 41 if isinstance(item_prop, list):
+ 42 raise NotImplementedError(
+ 43 "can't use remove_duplicates_in_iterable_based_on_property() when the property value is a list")
+ 44 item_prop_str = str(item_prop)
+ 45 if item_prop_str not in d:
+ 46 d[item_prop_str] = asset
+ 47 return list(d.values())
48
- 49def compare_two_lists_of_objects_object_level(first_list: List[OTLObject], second_list: List[OTLObject],
- 50 model_directory=None) -> List:
- 51 """Given two lists of objects return the differences from the second list compared to the first list.
- 52 Returns full objects from the second list when unmatched with the first list. """
- 53 l1 = list(map(lambda x: create_dict_from_asset(x), first_list))
- 54 l2 = list(map(lambda x: create_dict_from_asset(x), second_list))
- 55 diff_list = [d for d in l2 if d not in l1]
- 56 return list(map(lambda x: OTLObject.from_dict(x, model_directory), diff_list))
- 57
+ 49
+ 50def compare_two_lists_of_objects_object_level(first_list: List[OTLObject], second_list: List[OTLObject],
+ 51 model_directory=None) -> List:
+ 52 """Given two lists of objects return the differences from the second list compared to the first list.
+ 53 Returns full objects from the second list when unmatched with the first list. """
+ 54 l1 = list(map(lambda x: create_dict_from_asset(x), first_list))
+ 55 l2 = list(map(lambda x: create_dict_from_asset(x), second_list))
+ 56 diff_list = [d for d in l2 if d not in l1]
+ 57 return list(map(lambda x: OTLObject.from_dict(x, model_directory), diff_list))
58
- 59def custom_dict_diff(first_dict, second_dict):
- 60 diff_dict = {}
- 61 for k, v in second_dict.items():
- 62 orig_v = first_dict.get(k)
- 63 if orig_v is None:
- 64 diff_dict[k] = v
- 65 continue
- 66 if orig_v != v:
- 67 if isinstance(v, dict) and isinstance(orig_v, dict):
- 68 result_dict = custom_dict_diff(orig_v, v)
- 69 if result_dict != {}:
- 70 diff_dict[k] = custom_dict_diff(orig_v, v)
- 71 else:
- 72 diff_dict[k] = v
- 73 return diff_dict
- 74
+ 59
+ 60def custom_dict_diff(first_dict, second_dict):
+ 61 diff_dict = {}
+ 62 for k, v in second_dict.items():
+ 63 orig_v = first_dict.get(k)
+ 64 if orig_v is None:
+ 65 diff_dict[k] = v
+ 66 continue
+ 67 if orig_v != v:
+ 68 if isinstance(v, dict) and isinstance(orig_v, dict):
+ 69 result_dict = custom_dict_diff(orig_v, v)
+ 70 if result_dict != {}:
+ 71 diff_dict[k] = custom_dict_diff(orig_v, v)
+ 72 else:
+ 73 diff_dict[k] = v
+ 74 return diff_dict
75
- 76def compare_two_lists_of_objects_attribute_level(first_list: List[OTLObject], second_list: List[OTLObject],
- 77 model_directory: Path = None) -> List:
- 78 """
- 79 Given two lists of objects return the differences from the second list compared to the first list.
- 80 Assumes both lists have objects with a unique assetId. Returns partial objects (on attribute level)
- 81 from the second list when unmatched with the first list. """
- 82 if model_directory is None:
- 83 current_file_path = Path(__file__)
- 84 model_directory = current_file_path.parent.parent.parent
- 85
- 86 l1 = list(map(lambda x: create_dict_from_asset(x), first_list))
- 87 verify_asset_id_is_unique_within_list(l1)
- 88
- 89 l2 = list(map(lambda x: create_dict_from_asset(x), second_list))
- 90 verify_asset_id_is_unique_within_list(l2)
- 91
- 92 l1_dict_list = {dict_asset['assetId']['identificator']: dict_asset for dict_asset in l1}
- 93 l1_dict_list_keys = list(l1_dict_list.keys())
- 94
- 95 diff_list = []
- 96 for d in l2:
- 97 asset_id = d['assetId']['identificator']
- 98 if asset_id not in l1_dict_list_keys:
- 99 diff_list.append(d)
- 100 continue
- 101 orig_dict = l1_dict_list[asset_id]
- 102 if orig_dict == d:
- 103 continue
- 104 diff_dict = custom_dict_diff(orig_dict, d)
- 105 if diff_dict == {}:
- 106 continue
- 107 diff_dict['assetId'] = {'identificator': asset_id}
- 108 diff_dict['typeURI'] = orig_dict['typeURI']
- 109 diff_list.append(diff_dict)
- 110
- 111 return list(map(lambda x: OTLObject.from_dict(x, model_directory), diff_list))
- 112
+ 76
+ 77def compare_two_lists_of_objects_attribute_level(first_list: List[OTLObject], second_list: List[OTLObject],
+ 78 model_directory: Path = None) -> List:
+ 79 """
+ 80 Given two lists of objects return the differences from the second list compared to the first list.
+ 81 Assumes both lists have objects with a unique assetId. Returns partial objects (on attribute level)
+ 82 from the second list when unmatched with the first list. """
+ 83 if model_directory is None:
+ 84 current_file_path = Path(__file__)
+ 85 model_directory = current_file_path.parent.parent.parent
+ 86
+ 87 l1 = list(map(lambda x: create_dict_from_asset(x), first_list))
+ 88 verify_asset_id_is_unique_within_list(l1)
+ 89
+ 90 l2 = list(map(lambda x: create_dict_from_asset(x), second_list))
+ 91 verify_asset_id_is_unique_within_list(l2)
+ 92
+ 93 l1_dict_list = {dict_asset['assetId']['identificator']: dict_asset for dict_asset in l1}
+ 94 l1_dict_list_keys = list(l1_dict_list.keys())
+ 95
+ 96 diff_list = []
+ 97 for d in l2:
+ 98 asset_id = d['assetId']['identificator']
+ 99 if asset_id not in l1_dict_list_keys:
+ 100 diff_list.append(d)
+ 101 continue
+ 102 orig_dict = l1_dict_list[asset_id]
+ 103 if orig_dict == d:
+ 104 continue
+ 105 diff_dict = custom_dict_diff(orig_dict, d)
+ 106 if diff_dict == {}:
+ 107 continue
+ 108 diff_dict['assetId'] = {'identificator': asset_id}
+ 109 diff_dict['typeURI'] = orig_dict['typeURI']
+ 110 diff_list.append(diff_dict)
+ 111
+ 112 return list(map(lambda x: OTLObject.from_dict(x, model_directory), diff_list))
113
- 114def verify_asset_id_is_unique_within_list(dict_list: List[Dict]) -> bool:
- 115 d = {}
- 116 for asset_dict in dict_list:
- 117 asset_id = asset_dict['assetId']['identificator']
- 118 if asset_id is None:
- 119 raise ValueError(f'This list has a None value for assetId for at least one asset in this list:\n'
- 120 f'{asset_dict}')
- 121 asset_d = d.get(asset_id)
- 122 if asset_d is not None:
- 123 raise ValueError(f"There are non-unique assetId's in assets in this list: {asset_id}")
- 124 d[asset_dict['assetId']['identificator']] = asset_dict
- 125 return True
- 126
+ 114
+ 115def verify_asset_id_is_unique_within_list(dict_list: List[Dict]) -> bool:
+ 116 d = {}
+ 117 for asset_dict in dict_list:
+ 118 asset_id = asset_dict['assetId']['identificator']
+ 119 if asset_id is None:
+ 120 raise ValueError(f'This list has a None value for assetId for at least one asset in this list:\n'
+ 121 f'{asset_dict}')
+ 122 asset_d = d.get(asset_id)
+ 123 if asset_d is not None:
+ 124 raise ValueError(f"There are non-unique assetId's in assets in this list: {asset_id}")
+ 125 d[asset_dict['assetId']['identificator']] = asset_dict
+ 126 return True
127
- 128def is_relation(otl_object: OTLObject, model_directory=Path(__file__).parent.parent.parent) -> bool:
- 129 type_uri = otl_object.typeURI
- 130 relation_dict = get_hardcoded_relation_dict(model_directory=model_directory)
- 131 if type_uri in relation_dict:
- 132 return True
- 133
+ 128
+ 129def is_relation(otl_object: OTLObject, model_directory=Path(__file__).parent.parent.parent) -> bool:
+ 130 type_uri = otl_object.typeURI
+ 131 relation_dict = get_hardcoded_relation_dict(model_directory=model_directory)
+ 132 if type_uri in relation_dict:
+ 133 return True
134
- 135def is_directional_relation(otl_object: OTLObject, model_directory=Path(__file__).parent.parent.parent) -> bool:
- 136 type_uri = otl_object.typeURI
- 137 relation_dict = get_hardcoded_relation_dict(model_directory=model_directory)
- 138 relation_info = relation_dict.get(type_uri)
- 139 if relation_info is None:
- 140 return False
- 141 return relation_dict[type_uri]['directional']
- 142
+ 135
+ 136def is_directional_relation(otl_object: OTLObject, model_directory=Path(__file__).parent.parent.parent) -> bool:
+ 137 type_uri = otl_object.typeURI
+ 138 relation_dict = get_hardcoded_relation_dict(model_directory=model_directory)
+ 139 relation_info = relation_dict.get(type_uri)
+ 140 if relation_info is None:
+ 141 return False
+ 142 return relation_dict[type_uri]['directional']
143
- 144def combine_two_asset_instances(asset1: OTLObject, asset2: OTLObject, allow_attribute_overrides: bool = True) -> OTLObject:
- 145 if asset1 is None:
- 146 raise ValueError('asset1 is None')
- 147 if asset2 is None:
- 148 raise ValueError('asset2 is None')
- 149 if asset1.assetId.identificator is None:
- 150 raise ValueError('asset1 has no assetId.identificator')
- 151 if asset2.assetId.identificator is None:
- 152 raise ValueError('asset2 has no assetId.identificator')
- 153 if asset1.assetId.identificator != asset2.assetId.identificator:
- 154 raise ValueError('asset1 and asset2 have different assetId.identificator values')
- 155
- 156 for attr in asset2:
- 157 attribute = get_attribute_by_name(asset1, attr.naam)
- 158 if attribute is None:
- 159 raise ValueError(f'attribute {attr.naam} does not exist in asset1')
- 160 if attribute.objectUri == 'https://wegenenverkeer.data.vlaanderen.be/ns/implementatieelement#AIMObject.assetId':
- 161 continue
- 162 if attribute.waarde is not None and not allow_attribute_overrides and attribute.waarde != attr.waarde:
- 163 raise ValueError(f'attribute {attr.naam} already has a value in asset1 that is different {attribute.waarde}')
- 164 if attr.waarde is None:
- 165 continue
- 166 attribute.waarde = attr.waarde
- 167 return asset1
- 168
- 169def combine_assets(asset_list: list[OTLObject], allow_attribute_overrides: bool = True) -> list[OTLObject]:
- 170 if len(asset_list) < 2:
- 171 raise ValueError('asset_list has less than 2 assets')
- 172
- 173 grouped_list = {}
- 174 for asset in asset_list:
- 175 if asset.assetId.identificator not in grouped_list:
- 176 grouped_list[asset.assetId.identificator] = []
- 177 grouped_list[asset.assetId.identificator].append(asset)
- 178
- 179 combined_list = []
- 180 for assets in grouped_list.values():
- 181 asset1 = assets[0]
- 182 for asset in assets[1:]:
- 183 asset1 = combine_two_asset_instances(asset1, asset, allow_attribute_overrides)
- 184 combined_list.append(asset1)
- 185
- 186 return combined_list
- 187
+ 144
+ 145def combine_two_asset_instances(asset1: OTLObject, asset2: OTLObject, allow_attribute_overrides: bool = True) -> OTLObject:
+ 146 if asset1 is None:
+ 147 raise ValueError('asset1 is None')
+ 148 if asset2 is None:
+ 149 raise ValueError('asset2 is None')
+ 150 if asset1.assetId.identificator is None:
+ 151 raise ValueError('asset1 has no assetId.identificator')
+ 152 if asset2.assetId.identificator is None:
+ 153 raise ValueError('asset2 has no assetId.identificator')
+ 154 if asset1.assetId.identificator != asset2.assetId.identificator:
+ 155 raise ValueError('asset1 and asset2 have different assetId.identificator values')
+ 156
+ 157 for attr in asset2:
+ 158 attribute = get_attribute_by_name(asset1, attr.naam)
+ 159 if attribute is None:
+ 160 raise ValueError(f'attribute {attr.naam} does not exist in asset1')
+ 161 if attribute.objectUri == 'https://wegenenverkeer.data.vlaanderen.be/ns/implementatieelement#AIMObject.assetId':
+ 162 continue
+ 163 if attribute.waarde is not None and not allow_attribute_overrides and attribute.waarde != attr.waarde:
+ 164 raise ValueError(f'attribute {attr.naam} already has a value in asset1 that is different {attribute.waarde}')
+ 165 if attr.waarde is None:
+ 166 continue
+ 167 attribute.waarde = attr.waarde
+ 168 return asset1
+ 169
+ 170def combine_assets(asset_list: list[OTLObject], allow_attribute_overrides: bool = True) -> list[OTLObject]:
+ 171 if len(asset_list) < 2:
+ 172 raise ValueError('asset_list has less than 2 assets')
+ 173
+ 174 grouped_list = {}
+ 175 for asset in asset_list:
+ 176 if asset.assetId.identificator not in grouped_list:
+ 177 grouped_list[asset.assetId.identificator] = []
+ 178 grouped_list[asset.assetId.identificator].append(asset)
+ 179
+ 180 combined_list = []
+ 181 for assets in grouped_list.values():
+ 182 asset1 = assets[0]
+ 183 for asset in assets[1:]:
+ 184 asset1 = combine_two_asset_instances(asset1, asset, allow_attribute_overrides)
+ 185 combined_list.append(asset1)
+ 186
+ 187 return combined_list
188
- 189def is_aim_id(aim_id: str, model_directory: Path = None) -> bool:
- 190 """
- 191 Checks if the aim_id is valid. An aim_id is valid if it is a valid guid followed by a base64 encoded shortened uri.
- 192 :param aim_id: the aim_id to be validated
- 193 :type aim_id: str
- 194 :param model_directory: directory where the model is located, defaults to otlmow_model's own model
- 195 :type model_directory: Path
- 196 :return: 'True' if the aim_id is valid, 'False' otherwise
- 197 :rtype: bool
- 198 """
- 199 if not aim_id:
- 200 return False
- 201 if len(aim_id) < 38:
- 202 return False
- 203
- 204 uuid = aim_id[:36]
- 205 short_uri_encoded = aim_id[37:]
- 206
- 207 if not validate_guid(uuid):
- 208 return False
- 209 if not short_uri_encoded:
- 210 return False
- 211
- 212 if missing_padding := len(short_uri_encoded) % 4:
- 213 short_uri_encoded += '=' * (4 - missing_padding)
- 214
- 215 short_uri_bytes = base64.b64decode(short_uri_encoded)
- 216 try:
- 217 short_uri = short_uri_bytes.decode('ascii')
- 218 except UnicodeDecodeError:
- 219 return False
- 220 try:
- 221 if short_uri == 'purl:Agent':
- 222 return True
- 223 ns, name = short_uri.split('#')
- 224 instance = dynamic_create_instance_from_ns_and_name(ns, name, model_directory=model_directory)
- 225 return instance is not None
- 226 except ValueError:
- 227 return False
+ 189
+ 190def is_aim_id(aim_id: str, model_directory: Path = None) -> bool:
+ 191 """
+ 192 Checks if the aim_id is valid. An aim_id is valid if it is a valid guid followed by a base64 encoded shortened uri.
+ 193 :param aim_id: the aim_id to be validated
+ 194 :type aim_id: str
+ 195 :param model_directory: directory where the model is located, defaults to otlmow_model's own model
+ 196 :type model_directory: Path
+ 197 :return: 'True' if the aim_id is valid, 'False' otherwise
+ 198 :rtype: bool
+ 199 """
+ 200 if not aim_id:
+ 201 return False
+ 202 if len(aim_id) < 38:
+ 203 return False
+ 204
+ 205 uuid = aim_id[:36]
+ 206 short_uri_encoded = aim_id[37:]
+ 207
+ 208 if not validate_guid(uuid):
+ 209 return False
+ 210 if not short_uri_encoded:
+ 211 return False
+ 212
+ 213 if missing_padding := len(short_uri_encoded) % 4:
+ 214 short_uri_encoded += '=' * (4 - missing_padding)
+ 215
+ 216 short_uri_bytes = base64.b64decode(short_uri_encoded)
+ 217 try:
+ 218 short_uri = short_uri_bytes.decode('ascii')
+ 219 except UnicodeDecodeError:
+ 220 return False
+ 221 try:
+ 222 if short_uri == 'purl:Agent':
+ 223 return True
+ 224 ns, name = short_uri.split('#')
+ 225 instance = dynamic_create_instance_from_ns_and_name(ns, name, model_directory=model_directory)
+ 226 return instance is not None
+ 227 except ModuleNotFoundError:
+ 228 warnings.warn('Could not import the module for the given aim_id, did you forget the model_directory?', category=ImportWarning)
+ 229 return False
+ 230 except ValueError:
+ 231 return False