diff --git a/hed/errors/exceptions.py b/hed/errors/exceptions.py index 63e676c5..120b75d6 100644 --- a/hed/errors/exceptions.py +++ b/hed/errors/exceptions.py @@ -40,6 +40,7 @@ class HedExceptions: HED_SCHEMA_NODE_NAME_INVALID = 'HED_SCHEMA_NODE_NAME_INVALID' SCHEMA_DUPLICATE_PREFIX = 'schemaDuplicatePrefix' + SCHEMA_DUPLICATE_LIBRARY = "SCHEMA_LIBRARY_INVALID" BAD_COLUMN_NAMES = 'BAD_COLUMN_NAMES' diff --git a/hed/schema/hed_schema.py b/hed/schema/hed_schema.py index d688f728..cde9ea52 100644 --- a/hed/schema/hed_schema.py +++ b/hed/schema/hed_schema.py @@ -59,10 +59,19 @@ def library(self): Returns: str: Library name if any. - """ return self.header_attributes.get(constants.LIBRARY_ATTRIBUTE, "") + def can_save(self): + """ Returns if it's legal to save this schema. + + You cannot save schemas loaded as merged from multiple library schemas. + + Returns: + bool: True if this can be saved + """ + return not self.library or "," not in self.library + @property def with_standard(self): """ The version of the base schema this is extended from, if it exists.. @@ -738,10 +747,14 @@ def _get_attributes_for_section(self, key_class): def _add_tag_to_dict(self, long_tag_name, new_entry, key_class): # Add the InLibrary attribute to any library schemas as they are loaded # These are later removed when they are saved out, if saving unmerged - if self.library and (not self.with_standard or (not self.merged and self.with_standard)): - # only add it if not already present - This is a rare case - if not new_entry.has_attribute(HedKey.InLibrary): - new_entry._set_attribute_value(HedKey.InLibrary, self.library) + # if self.library and (not self.with_standard or (not self.merged and self.with_standard)): + # # only add it if not already present - This is a rare case + # Todo ian: I think this should be moved up one level for parity with the other loading changes + # .library will be updated to potentially be a list + # Cannot save schema if .library is a list + # + # if not new_entry.has_attribute(HedKey.InLibrary): + # new_entry._set_attribute_value(HedKey.InLibrary, self.library) section = self._sections[key_class] return section._add_to_dict(long_tag_name, new_entry) diff --git a/hed/schema/hed_schema_io.py b/hed/schema/hed_schema_io.py index 8f6bf62f..e7e2370d 100644 --- a/hed/schema/hed_schema_io.py +++ b/hed/schema/hed_schema_io.py @@ -10,18 +10,21 @@ from hed.schema.schema_io import schema_util from hed.schema.hed_schema_group import HedSchemaGroup from hed.schema.schema_validation_util import validate_version_string +from collections import defaultdict -MAX_MEMORY_CACHE = 20 +MAX_MEMORY_CACHE = 40 -def from_string(schema_string, schema_format=".xml", schema_namespace=None): +def from_string(schema_string, schema_format=".xml", schema_namespace=None, schema=None): """ Create a schema from the given string. Parameters: schema_string (str): An XML or mediawiki file as a single long string. schema_format (str): The schema format of the source schema string. schema_namespace (str, None): The name_prefix all tags in this schema will accept. + schema(HedSchema or None): A hed schema to merge this new file into + It must be a with-standard schema with the same value. Returns: (HedSchema): The loaded schema. @@ -39,9 +42,9 @@ def from_string(schema_string, schema_format=".xml", schema_namespace=None): filename=schema_string) if schema_format.endswith(".xml"): - hed_schema = SchemaLoaderXML.load(schema_as_string=schema_string) + hed_schema = SchemaLoaderXML.load(schema_as_string=schema_string, schema=schema) elif schema_format.endswith(".mediawiki"): - hed_schema = SchemaLoaderWiki.load(schema_as_string=schema_string) + hed_schema = SchemaLoaderWiki.load(schema_as_string=schema_string, schema=schema) else: raise HedFileError(HedExceptions.INVALID_EXTENSION, "Unknown schema extension", filename=schema_format) @@ -51,12 +54,14 @@ def from_string(schema_string, schema_format=".xml", schema_namespace=None): return hed_schema -def load_schema(hed_path=None, schema_namespace=None): +def load_schema(hed_path=None, schema_namespace=None, schema=None): """ Load a schema from the given file or URL path. Parameters: hed_path (str or None): A filepath or url to open a schema from. schema_namespace (str or None): The name_prefix all tags in this schema will accept. + schema(HedSchema or None): A hed schema to merge this new file into + It must be a with-standard schema with the same value. Returns: HedSchema: The loaded schema. @@ -77,9 +82,9 @@ def load_schema(hed_path=None, schema_namespace=None): file_as_string = schema_util.url_to_string(hed_path) hed_schema = from_string(file_as_string, schema_format=os.path.splitext(hed_path.lower())[1]) elif hed_path.lower().endswith(".xml"): - hed_schema = SchemaLoaderXML.load(hed_path) + hed_schema = SchemaLoaderXML.load(hed_path, schema=schema) elif hed_path.lower().endswith(".mediawiki"): - hed_schema = SchemaLoaderWiki.load(hed_path) + hed_schema = SchemaLoaderWiki.load(hed_path, schema=schema) else: raise HedFileError(HedExceptions.INVALID_EXTENSION, "Unknown schema extension", filename=hed_path) @@ -111,7 +116,11 @@ def _load_schema_version(xml_version=None, xml_folder=None): """ Return specified version or latest if not specified. Parameters: - xml_version (str): HED version format string. Expected format: '[schema_namespace:][library_name_]X.Y.Z'. + xml_version (str): HED version format string. Expected format: '[schema_namespace:][library_name_][X.Y.Z]' + Further versions can be added comma separated after the version number/library name. + e.g. "lib:library,otherlibrary" will load "library" and "otherlibrary" into "lib:" + The schema namespace must be the same and not repeated if loading multiple merged schemas. + xml_folder (str): Path to a folder containing schema. Returns: @@ -124,10 +133,44 @@ def _load_schema_version(xml_version=None, xml_folder=None): - The prefix is invalid """ schema_namespace = "" - library_name = None if xml_version: if ":" in xml_version: schema_namespace, _, xml_version = xml_version.partition(":") + + if xml_version: + xml_versions = xml_version.split(",") + # Add a blank entry if we have no xml version + else: + xml_versions = [""] + + first_schema = _load_schema_version_sub(schema_namespace, xml_versions[0], xml_folder=xml_folder) + for version in xml_versions[1:]: + _load_schema_version_sub(schema_namespace, version, xml_folder=xml_folder, schema=first_schema) + return first_schema + + +def _load_schema_version_sub(schema_namespace="", xml_version=None, xml_folder=None, schema=None): + """ Return specified version or latest if not specified. + + Parameters: + xml_version (str): HED version format string. Expected format: '[schema_namespace:][library_name_][X.Y.Z]' + + xml_folder (str): Path to a folder containing schema. + schema(HedSchema or None): A hed schema to merge this new file into + It must be a with-standard schema with the same value. + + Returns: + HedSchema: The requested HedSchema object. + + :raises HedFileError: + - The xml_version is not valid. + - The specified version cannot be found or loaded + - Other fatal errors loading the schema (These are unlikely if you are not editing them locally) + - The prefix is invalid + """ + library_name = None + + if xml_version: if "_" in xml_version: library_name, _, xml_version = xml_version.rpartition("_") elif validate_version_string(xml_version): @@ -138,7 +181,7 @@ def _load_schema_version(xml_version=None, xml_folder=None): if not final_hed_xml_file: hed_cache.cache_local_versions(xml_folder) final_hed_xml_file = hed_cache.get_hed_version_path(xml_version, library_name, xml_folder) - hed_schema = load_schema(final_hed_xml_file) + hed_schema = load_schema(final_hed_xml_file, schema=schema) except HedFileError as e: if e.code == HedExceptions.FILE_NOT_FOUND: hed_cache.cache_xml_versions(cache_folder=xml_folder) @@ -147,7 +190,7 @@ def _load_schema_version(xml_version=None, xml_folder=None): raise HedFileError(HedExceptions.FILE_NOT_FOUND, f"HED version '{xml_version}' not found in cache: {hed_cache.get_cache_directory()}", filename=xml_folder) - hed_schema = load_schema(final_hed_xml_file) + hed_schema = load_schema(final_hed_xml_file, schema=schema) else: raise e @@ -158,14 +201,14 @@ def _load_schema_version(xml_version=None, xml_folder=None): def load_schema_version(xml_version=None, xml_folder=None): - """ Return a HedSchema or HedSchemaGroup extracted from xml_version field. + """ Return a HedSchema or HedSchemaGroup extracted from xml_version Parameters: xml_version (str or list or None): List or str specifying which official HED schemas to use. An empty string returns the latest version A json str format is also supported, based on the output of HedSchema.get_formatted_version - Basic format: '[schema_namespace:][library_name_]X.Y.Z'. + Basic format: '[schema_namespace:][library_name_][X.Y.Z]'. xml_folder (str): Path to a folder containing schema. Returns: @@ -185,10 +228,41 @@ def load_schema_version(xml_version=None, xml_folder=None): except json.decoder.JSONDecodeError as e: raise HedFileError(HedExceptions.CANNOT_PARSE_JSON, str(e), xml_version) from e if xml_version and isinstance(xml_version, list): - schemas = [_load_schema_version(xml_version=version, xml_folder=xml_folder) for version in xml_version] + xml_versions = parse_version_list(xml_version) + schemas = [_load_schema_version(xml_version=version, xml_folder=xml_folder) for version in xml_versions.values()] if len(schemas) == 1: return schemas[0] return HedSchemaGroup(schemas) else: return _load_schema_version(xml_version=xml_version, xml_folder=xml_folder) + + +def parse_version_list(xml_version_list): + """Takes a list of xml versions and returns a dictionary split by prefix + + e.g. ["score", "testlib"] will return {"": "score, testlib"} + e.g. ["score", "testlib", "ol:otherlib"] will return {"": "score, testlib", "ol:": "otherlib"} + + Parameters: + xml_version_list (list): List of str specifying which hed schemas to use + + Returns: + HedSchema or HedSchemaGroup: The schema or schema group extracted. + """ + out_versions = defaultdict(list) + for version in xml_version_list: + schema_namespace = "" + if version and ":" in version: + schema_namespace, _, version = version.partition(":") + + if version is None: + version = "" + if version in out_versions[schema_namespace]: + raise HedFileError(HedExceptions.SCHEMA_DUPLICATE_LIBRARY, f"Attempting to load the same library '{version}' twice: {out_versions[schema_namespace]}", + filename=None) + out_versions[schema_namespace].append(version) + + out_versions = {key: ",".join(value) if not key else f"{key}:" + ",".join(value) for key, value in out_versions.items()} + + return out_versions \ No newline at end of file diff --git a/hed/schema/schema_attribute_validators.py b/hed/schema/schema_attribute_validators.py index f53157bb..c08a11a2 100644 --- a/hed/schema/schema_attribute_validators.py +++ b/hed/schema/schema_attribute_validators.py @@ -208,7 +208,7 @@ def in_library_check(hed_schema, tag_entry, attribute_name): issues = [] library = tag_entry.attributes.get(attribute_name, "") - if hed_schema.library != library: + if library not in hed_schema.library.split(","): issues += ErrorHandler.format_error(SchemaAttributeErrors.SCHEMA_IN_LIBRARY_INVALID, tag_entry.name, library) diff --git a/hed/schema/schema_io/base2schema.py b/hed/schema/schema_io/base2schema.py index e3c4a351..3bbac873 100644 --- a/hed/schema/schema_io/base2schema.py +++ b/hed/schema/schema_io/base2schema.py @@ -1,8 +1,10 @@ import copy from hed.errors.exceptions import HedFileError, HedExceptions from hed.schema import HedSchema +from hed.schema.hed_schema_constants import HedKey from abc import abstractmethod, ABC from hed.schema import schema_validation_util +from hed.schema import hed_schema_constants class SchemaLoader(ABC): @@ -12,20 +14,21 @@ class SchemaLoader(ABC): SchemaLoaderXML(filename) will load just the header_attributes """ - def __init__(self, filename, schema_as_string=None): + def __init__(self, filename, schema_as_string=None, schema=None): """Loads the given schema from one of the two parameters. Parameters: filename(str or None): A valid filepath or None schema_as_string(str or None): A full schema as text or None + schema(HedSchema or None): A hed schema to merge this new file into + It must be a with-standard schema with the same value. """ if schema_as_string and filename: raise HedFileError(HedExceptions.BAD_PARAMETERS, "Invalid parameters to schema creation.", filename) - self.filename = filename self.schema_as_string = schema_as_string - + self.appending_to_schema = False try: self.input_data = self._open_file() except OSError as e: @@ -34,11 +37,28 @@ def __init__(self, filename, schema_as_string=None): raise HedFileError(HedExceptions.FILE_NOT_FOUND, str(e), filename) except ValueError as e: raise HedFileError(HedExceptions.FILE_NOT_FOUND, str(e), filename) - - self._schema = HedSchema() - self._schema.filename = filename + + # self._schema.filename = filename hed_attributes = self._get_header_attributes(self.input_data) schema_validation_util.validate_attributes(hed_attributes, filename=self.filename) + + withStandard = hed_attributes.get(hed_schema_constants.WITH_STANDARD_ATTRIBUTE, "") + self.library = hed_attributes.get(hed_schema_constants.LIBRARY_ATTRIBUTE, "") + if not schema: + self._schema = HedSchema() + else: + self._schema = schema + self.appending_to_schema = True + if not self._schema.with_standard: + raise HedFileError(HedExceptions.SCHEMA_DUPLICATE_PREFIX, + "Trying to load multiple normal schemas as a merged one with the same namespace. " + "Ensure schemas have the withStandard header attribute set", + self.filename) + elif withStandard != self._schema.with_standard: + raise HedFileError(HedExceptions.BAD_WITH_STANDARD_VERSION, + "When merging two schemas without a schema namespace, you they must have the same withStandard value.", self.filename) + hed_attributes[hed_schema_constants.LIBRARY_ATTRIBUTE] = self._schema.library + f",{self.library}" + self._schema.filename = filename self._schema.header_attributes = hed_attributes self._loading_merged = False @@ -48,16 +68,19 @@ def schema(self): return self._schema @classmethod - def load(cls, filename=None, schema_as_string=None): + def load(cls, filename=None, schema_as_string=None, schema=None): """ Loads and returns the schema, including partnered schema if applicable. Parameters: filename(str or None): A valid filepath or None schema_as_string(str or None): A full schema as text or None + schema(HedSchema or None): A hed schema to merge this new file into + It must be a with-standard schema with the same value. + Returns: schema(HedSchema): The new schema """ - loader = cls(filename, schema_as_string) + loader = cls(filename, schema_as_string, schema) return loader._load() def _load(self): @@ -68,7 +91,7 @@ def _load(self): """ self._loading_merged = True # Do a full load of the standard schema if this is a partnered schema - if self._schema.with_standard and not self._schema.merged: + if not self.appending_to_schema and self._schema.with_standard and not self._schema.merged: from hed.schema.hed_schema_io import load_schema_version saved_attr = self._schema.header_attributes try: @@ -102,3 +125,14 @@ def _get_header_attributes(self, input_data): def _parse_data(self): """Puts the input data into the new schema""" pass + + def _add_to_dict_base(self, entry, key_class): + if not entry.has_attribute(HedKey.InLibrary) and self.appending_to_schema and self._schema.merged: + return None + + if self.library and (not self._schema.with_standard or (not self._schema.merged and self._schema.with_standard)): + # only add it if not already present - This is a rare case + if not entry.has_attribute(HedKey.InLibrary): + entry._set_attribute_value(HedKey.InLibrary, self.library) + + return self._schema._add_tag_to_dict(entry.name, entry, key_class) \ No newline at end of file diff --git a/hed/schema/schema_io/schema2base.py b/hed/schema/schema_io/schema2base.py index d9d082a1..c54e9b97 100644 --- a/hed/schema/schema_io/schema2base.py +++ b/hed/schema/schema_io/schema2base.py @@ -1,5 +1,6 @@ """Baseclass for mediawiki/xml writers""" from hed.schema.hed_schema_constants import HedSectionKey, HedKey +from hed.errors.exceptions import HedFileError, HedExceptions class Schema2Base: @@ -29,6 +30,10 @@ def process_schema(cls, hed_schema, save_merged=False): Varies based on inherited class """ + if not hed_schema.can_save(): + raise HedFileError(HedExceptions.SCHEMA_LIBRARY_INVALID, + "Cannot save a schema merged from multiple library schemas", + hed_schema.filename) saver = cls() saver._save_lib = False saver._save_base = False diff --git a/hed/schema/schema_io/wiki2schema.py b/hed/schema/schema_io/wiki2schema.py index de18f9d6..b4547831 100644 --- a/hed/schema/schema_io/wiki2schema.py +++ b/hed/schema/schema_io/wiki2schema.py @@ -21,8 +21,6 @@ no_wiki_start_tag = '' no_wiki_end_tag = '' - - required_sections = [ HedWikiSection.Prologue, HedWikiSection.Schema, @@ -44,8 +42,9 @@ class SchemaLoaderWiki(SchemaLoader): SchemaLoaderWiki(filename) will load just the header_attributes """ - def __init__(self, filename, schema_as_string=None): - super().__init__(filename, schema_as_string) + + def __init__(self, filename, schema_as_string=None, schema=None): + super().__init__(filename, schema_as_string, schema) self.fatal_errors = [] def _open_file(self): @@ -114,7 +113,7 @@ def _read_header_section(self, lines): for line_number, line in lines: if line.strip(): msg = f"Extra content [{line}] between HED line and other sections" - raise HedFileError(HedExceptions.SCHEMA_HEADER_INVALID, msg, filename=self.filename) + raise HedFileError(HedExceptions.SCHEMA_HEADER_INVALID, msg, filename=self.filename) def _read_text_block(self, lines): text = "" @@ -325,13 +324,6 @@ def _get_header_attributes_internal_old(self, version_line): return final_attributes - def _add_to_dict(self, line_number, line, entry, key_class): - if entry.has_attribute(HedKey.InLibrary) and not self._loading_merged: - self._add_fatal_error(line_number, line, - f"Library tag in unmerged schema has InLibrary attribute", - HedExceptions.IN_LIBRARY_IN_UNMERGED) - return self._schema._add_tag_to_dict(entry.name, entry, key_class) - @staticmethod def _get_tag_level(tag_line): """ Get the tag level from a line in a wiki file. @@ -598,3 +590,11 @@ def _split_lines_into_sections(self, wiki_lines): strings_for_section[current_section].append((line_number + 1, line)) return strings_for_section + + def _add_to_dict(self, line_number, line, entry, key_class): + if entry.has_attribute(HedKey.InLibrary) and not self._loading_merged and not self.appending_to_schema: + self._add_fatal_error(line_number, line, + f"Library tag in unmerged schema has InLibrary attribute", + HedExceptions.IN_LIBRARY_IN_UNMERGED) + + return self._add_to_dict_base(entry, key_class) diff --git a/hed/schema/schema_io/xml2schema.py b/hed/schema/schema_io/xml2schema.py index c300439e..91ba402e 100644 --- a/hed/schema/schema_io/xml2schema.py +++ b/hed/schema/schema_io/xml2schema.py @@ -21,8 +21,8 @@ class SchemaLoaderXML(SchemaLoader): SchemaLoaderXML(filename) will load just the header_attributes """ - def __init__(self, filename, schema_as_string=None): - super().__init__(filename, schema_as_string) + def __init__(self, filename, schema_as_string=None, schema=None): + super().__init__(filename, schema_as_string, schema) self._root_element = None self._parent_map = {} @@ -166,6 +166,8 @@ class default units. for unit_class_element in unit_class_elements: unit_class_entry = self._parse_node(unit_class_element, HedSectionKey.UnitClasses) unit_class_entry = self._add_to_dict(unit_class_entry, HedSectionKey.UnitClasses) + if unit_class_entry is None: + continue element_units = self._get_elements_by_name(xml_constants.UNIT_CLASS_UNIT_ELEMENT, unit_class_element) element_unit_names = [self._get_element_tag_value(element) for element in element_units] @@ -256,8 +258,9 @@ def _get_elements_by_name(self, element_name='node', parent_element=None): return elements def _add_to_dict(self, entry, key_class): - if entry.has_attribute(HedKey.InLibrary) and not self._loading_merged: + if entry.has_attribute(HedKey.InLibrary) and not self._loading_merged and not self.appending_to_schema: raise HedFileError(HedExceptions.IN_LIBRARY_IN_UNMERGED, f"Library tag in unmerged schema has InLibrary attribute", self._schema.filename) - return self._schema._add_tag_to_dict(entry.name, entry, key_class) + + return self._add_to_dict_base(entry, key_class) diff --git a/tests/schema/test_hed_cache.py b/tests/schema/test_hed_cache.py index 3a33155b..377eb41f 100644 --- a/tests/schema/test_hed_cache.py +++ b/tests/schema/test_hed_cache.py @@ -146,7 +146,7 @@ def test_schema_load_schema_version_invalid(self): with self.assertRaises(HedFileError) as context3: load_schema_version(["", None]) - self.assertEqual(context3.exception.args[0], 'schemaDuplicatePrefix') + self.assertEqual(context3.exception.args[0], 'SCHEMA_LIBRARY_INVALID') with self.assertRaises(HedFileError) as context4: load_schema_version(["8.0.0", "score_1.0.0"]) diff --git a/tests/schema/test_hed_schema_io.py b/tests/schema/test_hed_schema_io.py index a96d61f6..87e84f64 100644 --- a/tests/schema/test_hed_schema_io.py +++ b/tests/schema/test_hed_schema_io.py @@ -3,9 +3,15 @@ from hed.errors import HedFileError from hed.errors.error_types import SchemaErrors from hed.schema import load_schema, HedSchemaGroup, load_schema_version, HedSchema +from hed.schema.hed_schema_io import parse_version_list, _load_schema_version + + import os from hed.errors import HedExceptions from hed.schema import HedKey +from hed.schema import hed_cache +from hed import schema +import shutil # todo: speed up these tests @@ -73,6 +79,56 @@ def test_load_schema_version(self): self.assertTrue(schemas3.version_number, "load_schema_version has the right version with namespace") self.assertEqual(schemas3._namespace, "base:", "load_schema_version has the right version with namespace") + def test_load_schema_version_merged(self): + ver4 = ["testlib_2.0.0", "score_1.1.0"] + schemas3 = load_schema_version(ver4) + issues = schemas3.check_compliance() + self.assertIsInstance(schemas3, HedSchema, "load_schema_version returns HedSchema version+namespace") + self.assertTrue(schemas3.version_number, "load_schema_version has the right version with namespace") + self.assertEqual(schemas3._namespace, "", "load_schema_version has the right version with namespace") + # Deprecated tag warnings + self.assertEqual(len(issues), 11) + + # Verify this cannot be saved + with self.assertRaises(HedFileError): + schemas3.save_as_mediawiki() + + def test_load_and_verify_tags(self): + # Load 'testlib' by itself + testlib = load_schema_version('testlib_2.0.0') + + # Load 'score' by itself + score = load_schema_version('score_1.1.0') + + # Load both 'testlib' and 'score' together + schemas3 = load_schema_version(["testlib_2.0.0", "score_1.1.0"]) + + # Extract the tag names from each library + testlib_tags = set(testlib.tags.all_names.keys()) + score_tags = set(score.tags.all_names.keys()) + merged_tags = set(schemas3.tags.all_names.keys()) + + # Verify that all tags in 'testlib' and 'score' are in the merged library + for tag in testlib_tags: + self.assertIn(tag, merged_tags, f"Tag {tag} from testlib is missing in the merged schema.") + + for tag in score_tags: + self.assertIn(tag, merged_tags, f"Tag {tag} from score is missing in the merged schema.") + + # Negative test cases + # Ensure merged_tags is not a subset of testlib_tags or score_tags + self.assertFalse(merged_tags.issubset(testlib_tags), "The merged tags should not be a subset of testlib tags.") + self.assertFalse(merged_tags.issubset(score_tags), "The merged tags should not be a subset of score tags.") + + # Ensure there are tags that came uniquely from each library + unique_testlib_tags = testlib_tags - score_tags + unique_score_tags = score_tags - testlib_tags + + self.assertTrue(any(tag in merged_tags for tag in unique_testlib_tags), + "There should be unique tags from testlib in the merged schema.") + self.assertTrue(any(tag in merged_tags for tag in unique_score_tags), + "There should be unique tags from score in the merged schema.") + def test_load_schema_version_libraries(self): ver1 = "score_1.0.0" schemas1 = load_schema_version(ver1) @@ -130,23 +186,96 @@ def test_load_schema_version_libraries(self): with self.assertRaises(HedFileError) as context: load_schema_version("sc1:") - # def test_load_schema_version_empty(self): - # schemas = load_schema_version("") - # self.assertIsInstance(schemas, HedSchema, "load_schema_version for empty string returns latest version") - # self.assertTrue(schemas.version_number, "load_schema_version for empty string has a version") - # self.assertFalse(schemas.library, "load_schema_version for empty string is not a library") - # schemas = load_schema_version(None) - # self.assertIsInstance(schemas, HedSchema, "load_schema_version for None returns latest version") - # self.assertTrue(schemas.version_number, "load_schema_version for empty string has a version") - # self.assertFalse(schemas.library, "load_schema_version for empty string is not a library") - # schemas = load_schema_version([""]) - # self.assertIsInstance(schemas, HedSchema, "load_schema_version list with blank entry returns latest version") - # self.assertTrue(schemas.version_number, "load_schema_version for empty string has a version") - # self.assertFalse(schemas.library, "load_schema_version for empty string is not a library") - # schemas = load_schema_version([]) - # self.assertIsInstance(schemas, HedSchema, "load_schema_version list with blank entry returns latest version") - # self.assertTrue(schemas.version_number, "load_schema_version for empty string has a version") - # self.assertFalse(schemas.library, "load_schema_version for empty string is not a library") + + +class TestHedSchemaUnmerged(unittest.TestCase): + # Verify the hed cache can handle loading unmerged with_standard schemas in case they are ever used + @classmethod + def setUpClass(cls): + hed_cache_dir = os.path.join(os.path.dirname(os.path.realpath(__file__)), '../schema_cache_test_local_unmerged/') + if os.path.exists(hed_cache_dir) and os.path.isdir(hed_cache_dir): + shutil.rmtree(hed_cache_dir) + _load_schema_version.cache_clear() + cls.hed_cache_dir = hed_cache_dir + cls.saved_cache_folder = hed_cache.HED_CACHE_DIRECTORY + schema.set_cache_directory(cls.hed_cache_dir) + + for filename in os.listdir(hed_cache.INSTALLED_CACHE_LOCATION): + loaded_schema = schema.load_schema(os.path.join(hed_cache.INSTALLED_CACHE_LOCATION, filename)) + loaded_schema.save_as_xml(os.path.join(cls.hed_cache_dir, filename), save_merged=False) + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.hed_cache_dir) + schema.set_cache_directory(cls.saved_cache_folder) + _load_schema_version.cache_clear() + + def test_load_schema_version(self): + ver1 = "8.0.0" + schemas1 = load_schema_version(ver1) + self.assertIsInstance(schemas1, HedSchema, "load_schema_version returns a HedSchema if a string version") + self.assertEqual(schemas1.version_number, "8.0.0", "load_schema_version has the right version") + self.assertEqual(schemas1.library, "", "load_schema_version standard schema has no library") + ver2 = "base:8.0.0" + schemas2 = load_schema_version(ver2) + self.assertIsInstance(schemas2, HedSchema, "load_schema_version returns HedSchema version+namespace") + self.assertEqual(schemas2.version_number, "8.0.0", "load_schema_version has the right version with namespace") + self.assertEqual(schemas2._namespace, "base:", "load_schema_version has the right version with namespace") + ver3 = ["base:8.0.0"] + schemas3 = load_schema_version(ver3) + self.assertIsInstance(schemas3, HedSchema, "load_schema_version returns HedSchema version+namespace") + self.assertEqual(schemas3.version_number, "8.0.0", "load_schema_version has the right version with namespace") + self.assertEqual(schemas3._namespace, "base:", "load_schema_version has the right version with namespace") + ver3 = ["base:"] + schemas3 = load_schema_version(ver3) + self.assertIsInstance(schemas3, HedSchema, "load_schema_version returns HedSchema version+namespace") + self.assertTrue(schemas3.version_number, "load_schema_version has the right version with namespace") + self.assertEqual(schemas3._namespace, "base:", "load_schema_version has the right version with namespace") + + def test_load_schema_version_merged(self): + ver4 = ["testlib_2.0.0", "score_1.1.0"] + schemas3 = load_schema_version(ver4) + issues = schemas3.check_compliance() + self.assertIsInstance(schemas3, HedSchema, "load_schema_version returns HedSchema version+namespace") + self.assertTrue(schemas3.version_number, "load_schema_version has the right version with namespace") + self.assertEqual(schemas3._namespace, "", "load_schema_version has the right version with namespace") + self.assertEqual(len(issues), 11) + + def test_load_and_verify_tags(self): + # Load 'testlib' by itself + testlib = load_schema_version('testlib_2.0.0') + + # Load 'score' by itself + score = load_schema_version('score_1.1.0') + + # Load both 'testlib' and 'score' together + schemas3 = load_schema_version(["testlib_2.0.0", "score_1.1.0"]) + + # Extract the tag names from each library + testlib_tags = set(testlib.tags.all_names.keys()) + score_tags = set(score.tags.all_names.keys()) + merged_tags = set(schemas3.tags.all_names.keys()) + + # Verify that all tags in 'testlib' and 'score' are in the merged library + for tag in testlib_tags: + self.assertIn(tag, merged_tags, f"Tag {tag} from testlib is missing in the merged schema.") + + for tag in score_tags: + self.assertIn(tag, merged_tags, f"Tag {tag} from score is missing in the merged schema.") + + # Negative test cases + # Ensure merged_tags is not a subset of testlib_tags or score_tags + self.assertFalse(merged_tags.issubset(testlib_tags), "The merged tags should not be a subset of testlib tags.") + self.assertFalse(merged_tags.issubset(score_tags), "The merged tags should not be a subset of score tags.") + + # Ensure there are tags that came uniquely from each library + unique_testlib_tags = testlib_tags - score_tags + unique_score_tags = score_tags - testlib_tags + + self.assertTrue(any(tag in merged_tags for tag in unique_testlib_tags), + "There should be unique tags from testlib in the merged schema.") + self.assertTrue(any(tag in merged_tags for tag in unique_score_tags), + "There should be unique tags from score in the merged schema.") class TestHedSchemaMerging(unittest.TestCase): @@ -392,3 +521,32 @@ def test_saving_in_library_xml(self): score_count = schema_string.count("inLibrary") # One extra because this also finds the attribute definition, whereas in wiki it's a different format. self.assertEqual(score_count, 854, "There should be 854 in library entries in the saved score schema") + + +class TestParseVersionList(unittest.TestCase): + def test_empty_and_single_library(self): + """Test that an empty list returns an empty dictionary and a single library is handled correctly.""" + self.assertEqual(parse_version_list([]), {}) + self.assertEqual(parse_version_list(["score"]), {"": "score"}) + + def test_multiple_libraries_without_and_with_prefix(self): + """Test that multiple libraries without a prefix and with the same prefix are handled correctly.""" + self.assertEqual(parse_version_list(["score", "testlib"]), {"": "score,testlib"}) + self.assertEqual(parse_version_list(["test:score", "test:testlib"]), {"test": "test:score,testlib"}) + + def test_single_and_multiple_libraries_with_different_prefixes(self): + """Test that a single library with a prefix and multiple libraries with different prefixes are handled correctly.""" + self.assertEqual(parse_version_list(["ol:otherlib"]), {"ol": "ol:otherlib"}) + self.assertEqual(parse_version_list(["score", "ol:otherlib", "ul:anotherlib"]), {"": "score", "ol": "ol:otherlib", "ul": "ul:anotherlib"}) + + def test_duplicate_library_raises_error(self): + """Test that duplicate libraries raise the correct error.""" + with self.assertRaises(HedFileError): + parse_version_list(["score", "score"]) + with self.assertRaises(HedFileError): + parse_version_list(["ol:otherlib", "ol:otherlib"]) + + def test_triple_prefixes(self): + """Test that libraries with triple prefixes are handled correctly.""" + self.assertEqual(parse_version_list(["test:score", "ol:otherlib", "test:testlib", "abc:anotherlib"]), + {"test": "test:score,testlib", "ol": "ol:otherlib", "abc": "abc:anotherlib"})