diff --git a/docs/howto/conversion-data.md b/docs/howto/conversion-data.md
index b3ed308..69adc2b 100644
--- a/docs/howto/conversion-data.md
+++ b/docs/howto/conversion-data.md
@@ -28,3 +28,45 @@
 The equivalent function to the CLI described above can be used as
 ```
 fhirflat.convert_data_to_flat("data_file_path", "sheet_id", "%Y-%m-%d", "Brazil/East")
 ```
+
+## Conversion without validation
+
+If you wish to convert your data into FHIRflat, but not perform validation to check that
+the converted data conforms to the FHIR spec, you can add the `--no-validate` flag:
+
+```bash
+fhirflat transform data-file google-sheet-id date-format timezone-name --no-validate
+```
+
+The equivalent library function is
+```python
+fhirflat.convert_data_to_flat(<data_file>, <sheet_id>, <date_format>, <timezone>, validate=False)
+```
+
+We strongly recommend you only skip validation when time constraints demand it; some
+conversion errors can cause the parquet file to fail to save (e.g. if columns contain
+mixed types due to errors that validation would have caught).
+
+Data which is already in FHIRflat format can be validated against the schema using
+
+```bash
+fhirflat validate <folder_name>
+```
+
+where `folder_name` is the path to the folder containing your flat files. The files **must**
+be named according to the corresponding FHIR resource, e.g. the file containing flat
+Encounter data must be named `encounter.parquet`.
+
+The folder can be provided in a compressed format, e.g. zipped; you can specify this
+using
+```bash
+fhirflat validate <folder_name> -c "zip"
+```
+
+The output folder of validated data will be compressed using the same format.
+
+The equivalent library function is
+
+```python
+fhirflat.validate(<folder_name>, compress_format="zip")
+```
diff --git a/fhirflat/__main__.py b/fhirflat/__main__.py
index 7832ca3..7dcec77 100644
--- a/fhirflat/__main__.py
+++ b/fhirflat/__main__.py
@@ -1,6 +1,7 @@
 import sys
 
 from .ingest import main as ingest_to_flat
+from .ingest import validate_cli as validate
 
 
 def main():
@@ -10,16 +11,19 @@ def main():
             Available subcommands:
               transform - Convert raw data into FHIRflat files
+              validate - Validate FHIRflat files against FHIR schemas
             """
         )
         sys.exit(1)
     subcommand = sys.argv[1]
-    if subcommand not in ["transform"]:
+    if subcommand not in ["transform", "validate"]:
         print("fhirflat: unrecognised subcommand", subcommand)
         sys.exit(1)
     sys.argv = sys.argv[1:]
     if subcommand == "transform":
         ingest_to_flat()
+    elif subcommand == "validate":
+        validate()
     else:
         pass
diff --git a/fhirflat/fhir2flat.py b/fhirflat/fhir2flat.py
index 8b83d89..b8eb5f1 100644
--- a/fhirflat/fhir2flat.py
+++ b/fhirflat/fhir2flat.py
@@ -115,19 +115,19 @@ def single_or_list(x):
     return df.groupby(df.index).agg(single_or_list)
 
 
-def expandCoding(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
+def condenseCoding(df: pd.DataFrame, column_name: str) -> pd.DataFrame:
     """
     Turns a column containing a list of dictionaries with coding information into
     2 columns containing a list of strings with the coding information, and the text.
 
     [ {"system": "http://loinc.org", "code": "1234", "display": "Test"} ]
     becomes
-    [ "http://loinc.org/1234" ], ["Test"]
+    [ "http://loinc.org|1234" ], ["Test"]
 
     If a "text" field has already been provided, this overrides the display.
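+
+    A minimal illustration of the override (hypothetical values, not from the
+    test suite): with an existing "code.text" column containing "Provided text",
+    [ {"system": "s", "code": "1", "display": "Display"} ]
+    becomes
+    [ "s|1" ], ["Provided text"]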
""" - def expand( + def condense( row: pd.Series, column_name: str, text_present: bool = False ) -> pd.Series: codes = row[column_name] @@ -148,7 +148,7 @@ def expand( if column_name.removesuffix(".coding") + ".text" in df.columns: text_present = True - df = df.apply(lambda x: expand(x, column_name, text_present), axis=1) + df = df.apply(lambda x: condense(x, column_name, text_present), axis=1) if not text_present: df.insert( @@ -291,7 +291,7 @@ def fhir2flat(resource: FHIRFlatBase, lists: list | None = None) -> pd.DataFrame # expand all instances of the "coding" list for coding in df.columns[df.columns.str.endswith("coding")]: - df = expandCoding(df, coding) + df = condenseCoding(df, coding) # condense all references for reference in df.columns[df.columns.str.endswith("reference")]: diff --git a/fhirflat/flat2fhir.py b/fhirflat/flat2fhir.py index 1eb85e0..20ec377 100644 --- a/fhirflat/flat2fhir.py +++ b/fhirflat/flat2fhir.py @@ -6,10 +6,10 @@ from fhir.resources.fhirprimitiveextension import FHIRPrimitiveExtension from fhir.resources.period import Period from fhir.resources.quantity import Quantity -from pydantic.v1 import BaseModel from pydantic.v1.error_wrappers import ValidationError from .util import ( + find_data_class, get_fhirtype, get_local_extension_type, group_keys, @@ -21,7 +21,7 @@ def create_codeable_concept( ) -> dict[str, list[str]]: """Re-creates a codeableConcept structure from the FHIRflat representation.""" - # for reading in from ingestion pipeline + # for creating backbone elements if name + ".code" in old_dict and name + ".system" in old_dict: raw_codes: str | float | list[str | None] = old_dict.get(name + ".code") if raw_codes is not None and not isinstance(raw_codes, list): @@ -29,7 +29,7 @@ def create_codeable_concept( raw_codes if isinstance(raw_codes, str) else str(int(raw_codes)) ) codes = [old_dict[name + ".system"] + "|" + formatted_code] - elif raw_codes is None: + elif not raw_codes: codes = raw_codes else: formatted_codes = [ @@ -174,48 +174,12 @@ def set_datatypes(k, v_dict, klass) -> dict: return {s.split(".", 1)[1]: v_dict[s] for s in v_dict} -def find_data_class(data_class: list[BaseModel] | BaseModel, k: str) -> BaseModel: - """ - Finds the type class for item k within the data class. - - Parameters - ---------- - data_class: list[BaseModel] or BaseModel - The data class to search within. If a list, the function will search for the - a class with a matching title to k. - k: str - The property to search for within the data class - """ - - if isinstance(data_class, list): - title_matches = [k.lower() == c.schema()["title"].lower() for c in data_class] - result = [x for x, y in zip(data_class, title_matches, strict=True) if y] - if len(result) == 1: - return get_fhirtype(k) - else: - raise ValueError(f"Couldn't find a matching class for {k} in {data_class}") - - else: - k_schema = data_class.schema()["properties"].get(k) - - base_class = ( - k_schema.get("items").get("type") - if k_schema.get("items") is not None - else k_schema.get("type") - ) - - if base_class is None: - assert k_schema.get("type") == "array" - - base_class = [opt.get("type") for opt in k_schema["items"]["anyOf"]] - return get_fhirtype(base_class) - - def expand_concepts(data: dict[str, str], data_class: type[_DomainResource]) -> dict: """ Combines columns containing flattened FHIR concepts back into JSON-like structures. 
""" + groups = group_keys(data.keys()) group_classes = {} diff --git a/fhirflat/ingest.py b/fhirflat/ingest.py index 8bd8801..7ff35ff 100644 --- a/fhirflat/ingest.py +++ b/fhirflat/ingest.py @@ -19,6 +19,7 @@ import dateutil.parser import numpy as np import pandas as pd +from pyarrow.lib import ArrowTypeError import fhirflat from fhirflat.util import get_local_resource, group_keys @@ -437,6 +438,7 @@ def convert_data_to_flat( mapping_files_types: tuple[dict, dict] | None = None, sheet_id: str | None = None, subject_id="subjid", + validate: bool = True, compress_format: None | str = None, ): """ @@ -465,6 +467,8 @@ def convert_data_to_flat( be named by resource, and contain the mapping for that resource. subject_id: str The name of the column containing the subject ID in the data file. + validate: bool + Whether to validate the FHIRflat files after creation. compress_format: optional str If the output folder should be zipped, and if so with what format. """ @@ -472,6 +476,13 @@ def convert_data_to_flat( if not mapping_files_types and not sheet_id: raise TypeError("Either mapping_files_types or sheet_id must be provided") + if not validate: + warnings.warn( + "Validation of the FHIRflat files has been disabled. ", + UserWarning, + stacklevel=2, + ) + if not os.path.exists(folder_name): os.makedirs(folder_name) @@ -522,10 +533,29 @@ def convert_data_to_flat( else: raise ValueError(f"Unknown mapping type {t}") - errors = resource.ingest_to_flat( - df, - os.path.join(folder_name, resource.__name__.lower()), - ) + flat_nonvalidated = resource.ingest_to_flat(df) + + if validate: + valid_flat, errors = resource.validate_fhirflat(flat_nonvalidated) + + valid_flat.to_parquet( + f"{os.path.join(folder_name, resource.__name__.lower())}.parquet" + ) + else: + errors = None + try: + flat_nonvalidated.to_parquet( + f"{os.path.join(folder_name, resource.__name__.lower())}.parquet" + ) + except ArrowTypeError as e: + warnings.warn( + f"Error writing {resource.__name__.lower()}.parquet: {e}\n" + "This is likely due to a validation error, re-run without " + "--no-validate.", + UserWarning, + stacklevel=2, + ) + continue end_time = timeit.default_timer() total_time = end_time - start_time @@ -550,6 +580,60 @@ def convert_data_to_flat( shutil.rmtree(folder_name) +def validate(folder_name: str, compress_format: str | None = None): + """ + Takes a folder containing (optionally compressed) FHIRflat files and validates them + against the FHIR. File names **must** correspond to the FHIR resource types they + represent. E.g. a file containing Patient resources must be named "patient.parquet". + + Parameters + ---------- + folder_name + The path to the folder containing the FHIRflat files, or compressed file. + compress_format + The format to compress the validated files into. + """ + + if Path(folder_name).is_file(): + directory = Path(folder_name).with_suffix("") + shutil.unpack_archive(folder_name, extract_dir=directory) + else: + directory = folder_name + + for file in Path(directory).glob("*.parquet"): + df = pd.read_parquet(file) + resource = file.stem + resource_type = get_local_resource(resource, case_insensitive=True) + + valid_flat, errors = resource_type.validate_fhirflat(df, return_frames=True) + + if errors is not None: + + valid_flat.to_parquet(os.path.join(directory, f"{resource}_valid.parquet")) + errors.to_csv( + os.path.join(directory, f"{resource}_errors.csv"), index=False + ) + error_length = len(errors) + print( + f"{error_length} rows in {file.name} have validation errors. 
" + f"Errors saved to {resource}_errors.csv. " + f"Valid rows saved to {resource}_valid.parquet" + ) + else: + print(f"{file.name} is valid") + print("Validation complete") + + if compress_format: + new_directory = str(directory) + "_validated" + shutil.make_archive( + new_directory, + format=compress_format, + root_dir=directory, + ) + shutil.rmtree(directory) + print(f"Validated files saved as {new_directory}.{compress_format}") + + def main(): parser = argparse.ArgumentParser( description="Convert data to FHIRflat parquet files", @@ -579,6 +663,13 @@ def main(): default="subjid", ) + parser.add_argument( + "--no-validate", + help="Do the data conversion without validation", + dest="validate", + action="store_false", + ) + parser.add_argument( "-c", "--compress", @@ -595,9 +686,32 @@ def main(): folder_name=args.output, sheet_id=args.sheet_id, subject_id=args.subject_id, + validate=args.validate, compress_format=args.compress, ) +def validate_cli(): + parser = argparse.ArgumentParser( + description="Validate FHIRflat parquet files against the FHIR schema", + prog="fhirflat validate", + ) + parser.add_argument("folder", help="File path to folder containing FHIRflat files") + + parser.add_argument( + "-c", + "--compress_format", + help="Format to compress the output into", + choices=["zip", "tar", "gztar", "bztar", "xztar"], + ) + + args = parser.parse_args() + + validate( + args.folder, + compress_format=args.compress_format, + ) + + if __name__ == "__main__": main() diff --git a/fhirflat/resources/base.py b/fhirflat/resources/base.py index b1d3f83..7477317 100644 --- a/fhirflat/resources/base.py +++ b/fhirflat/resources/base.py @@ -1,16 +1,14 @@ -# from pydantic import BaseModel from __future__ import annotations -import datetime import warnings from typing import ClassVar, TypeAlias -import numpy as np import orjson import pandas as pd from fhir.resources.domainresource import DomainResource as _DomainResource from pydantic.v1 import ValidationError +from fhirflat import util from fhirflat.fhir2flat import fhir2flat from fhirflat.flat2fhir import expand_concepts @@ -72,6 +70,8 @@ def create_fhir_resource( if not isinstance(data, dict): data: dict = orjson.loads(data) + data = {k: v for k, v in data.items() if v is not None} + data = cls.cleanup(data) data = expand_concepts(data, cls) @@ -86,6 +86,67 @@ def create_fhir_resource( except ValidationError as e: return e + @classmethod + def validate_fhirflat( + cls, df: pd.DataFrame, return_frames: bool = False + ) -> tuple[FHIRFlatBase | pd.DataFrame, None | pd.DataFrame]: + """ + Takes a FHIRflat dataframe and validates the data against the FHIR + schema. Returns a dataframe of valid resources and a dataframe of the + FHIRflat data that produced validation errors, with a `validation_error` + column describing the error. + + Parameters + ---------- + df + Pandas dataframe containing the FHIRflat data + return_frames + If True, returns the valid FHIR resources & errors as dataframes, + even if only one row is present in the source. + + Returns + ------- + valid_resources + A dataframe containing the valid FHIR resources + errors + A dataframe containing the flat_dict and validation errors. + + Raises + ------ + ValidationError + If a single FHIR resource is present and is invalid. 
+ """ + + flat_df = df.copy() + + flat_df["fhir"] = flat_df.apply( + lambda row: row.to_json(date_format="iso", date_unit="s"), axis=1 + ).apply(lambda x: cls.create_fhir_resource(x)) + + if len(flat_df) == 1 and return_frames is False: + resource = flat_df["fhir"].iloc[0] + if isinstance(resource, ValidationError): + raise resource + else: + return resource, None + else: + resources = list(flat_df["fhir"]) + errors = None + if any(isinstance(r, ValidationError) for r in resources): + validation_error_mask = flat_df["fhir"].apply( + lambda x: isinstance(x, ValidationError) + ) + + errors = flat_df[validation_error_mask].copy() + errors.rename(columns={"fhir": "validation_error"}, inplace=True) + + valid_fhir = flat_df[~validation_error_mask] + valid_fhirflat = valid_fhir.drop(columns=["fhir"]) + else: + valid_fhirflat = flat_df.drop(columns=["fhir"]) + + return valid_fhirflat, errors + @classmethod def from_flat(cls, file: str) -> FHIRFlatBase | list[FHIRFlatBase]: """ @@ -93,7 +154,7 @@ def from_flat(cls, file: str) -> FHIRFlatBase | list[FHIRFlatBase]: Parameters ---------- - file: str + file Path to the parquet FHIRflat file containing clinical data Returns @@ -145,13 +206,9 @@ def ingest_backbone_elements(cls, mapped_data: pd.Series) -> pd.Series: Parameters ---------- - mapped_data: pd.Series + mapped_data Pandas series of FHIRflat-like dictionaries ready to be converted to FHIR format. - - Returns - ------- - pd.Series """ def fhir_format(row: pd.Series) -> pd.Series: @@ -186,73 +243,57 @@ def fhir_format(row: pd.Series) -> pd.Series: return condensed_mapped_data @classmethod - def ingest_to_flat(cls, data: pd.DataFrame, filename: str) -> pd.DataFrame | None: + def ingest_to_flat(cls, data: pd.DataFrame) -> pd.DataFrame | None: """ - Takes a pandas dataframe and populates the resource with the data. - Creates a FHIRflat parquet file for the resources. + Takes a pandas dataframe containing the populated mapping file and a dictionary + representing the FHIRflat resource and creates the FHIRflat parquet file. + Performs data formatting on the date and coding columns to account for + simplifications parquet makes when saving. Parameters ---------- - data: pd.DataFrame - Pandas dataframe containing the data - filename: str - Name of the parquet file to be generated. + data + Pandas dataframe containing the raw data Returns ------- - pd.DataFrame or None - A dataframe containing the flat_dict and validation errors. + A dataframe containing the FHIRflat data. 
""" data.loc[:, "flat_dict"] = cls.ingest_backbone_elements(data["flat_dict"]) - # Creates a columns of FHIR resource instances - data["fhir"] = data["flat_dict"].apply(lambda x: cls.create_fhir_resource(x)) - - validation_error_mask = data["fhir"].apply( - lambda x: isinstance(x, ValidationError) - ) - - valid_fhir = data[~validation_error_mask].copy() - - # flattens resources back out - flat_df = valid_fhir["fhir"].apply(lambda x: x.to_flat()) + flat_df = pd.json_normalize(data["flat_dict"]) if not flat_df.empty: - # create FHIR expected date format - for date_cols in [ - x - for x in flat_df.columns - if ("date" in x.lower() or "period" in x.lower() or "time" in x.lower()) - ]: - # replace nan with None - flat_df[date_cols] = flat_df[date_cols].replace(np.nan, None) - - # convert datetime objects to ISO strings - # (stops unwanted parquet conversions) - # but skips over extensions that have floats/strings rather than dates - flat_df[date_cols] = flat_df[date_cols].apply( - lambda x: ( - x.isoformat() - if isinstance(x, datetime.datetime) - or isinstance(x, datetime.date) - else x - ) + # apply the coding column formatting in here + system_columns = flat_df.columns[flat_df.columns.str.endswith(".system")] + for coding_col in system_columns: + col = coding_col.removesuffix(".system") + flat_df = flat_df.apply( + lambda x, c=col: util.condense_codes(x, c), axis=1 ) + flat_df.drop(columns=system_columns, inplace=True) + + potential_dense_cols = [ + x for x in cls.backbone_elements.keys() if x in flat_df.columns + ] - for coding_column in [ + long_list_cols = [ x - for x in flat_df.columns - if x.lower().endswith(".code") or x.lower().endswith(".text") - ]: - flat_df[coding_column] = flat_df[coding_column].apply( - lambda x: [x] if isinstance(x, str) else x + for x in potential_dense_cols + if any(flat_df[x].apply(lambda y: isinstance(y, list) and len(y) > 1)) + ] + + if long_list_cols: + flat_df.rename( + columns={x: x + "_dense" for x in long_list_cols}, inplace=True ) - flat_df.to_parquet(f"{filename}.parquet") - data_errors = data[validation_error_mask].copy() - data_errors.rename(columns={"fhir": "validation_error"}, inplace=True) - return data_errors if not data_errors.empty else None + # format dates and columns + flat_df = util.format_flat(flat_df, cls) + + return flat_df + return None @classmethod def fhir_bulk_import(cls, file: str) -> FHIRFlatBase | list[FHIRFlatBase]: @@ -262,12 +303,8 @@ def fhir_bulk_import(cls, file: str) -> FHIRFlatBase | list[FHIRFlatBase]: Parameters ---------- - file: str + file Path to the .ndjson file containing FHIR data - - Returns - ------- - FHIRFlatBase or list[FHIRFlatBase] """ resources = [] @@ -288,9 +325,9 @@ def fhir_file_to_flat(cls, source_file: str, output_name: str | None = None): Parameters ---------- - source_file: str + source_file Path to the FHIR resource file. - output_name: str (optional) + output_name Name of the parquet file to be generated, optional, defaults to {resource}.parquet """ @@ -323,7 +360,7 @@ def to_flat(self, filename: str | None = None) -> None | pd.Series: Parameters ---------- - filename: str + filename Name of the parquet file to be generated. 
""" diff --git a/fhirflat/util.py b/fhirflat/util.py index 32a5423..b475236 100644 --- a/fhirflat/util.py +++ b/fhirflat/util.py @@ -1,14 +1,23 @@ # Utility functions for FHIRflat +from __future__ import annotations + +import datetime import importlib import re from collections.abc import KeysView from itertools import groupby +from typing import TYPE_CHECKING import fhir.resources +import numpy as np +import pandas as pd import fhirflat from fhirflat.resources import extensions +if TYPE_CHECKING: + from .resources.base import FHIRFlatBase + def group_keys(data_keys: list[str] | KeysView) -> dict[str, list[str]]: """ @@ -68,5 +77,131 @@ def get_local_extension_type(t: str): raise AttributeError(f"Could not find {t} in fhirflat extensions") from ae -def get_local_resource(t: str): - return getattr(fhirflat, t) +def get_local_resource(t: str, case_insensitive: bool = False): + if case_insensitive is False: + return getattr(fhirflat, t) + else: + for a in dir(fhirflat): + if a.lower() == t.lower(): + return getattr(fhirflat, a) + + +def find_data_class( + data_class: FHIRFlatBase | list[FHIRFlatBase], k: str +) -> FHIRFlatBase: + """ + Finds the type class for item k within the data class. + + Parameters + ---------- + data_class: list[BaseModel] or BaseModel + The data class to search within. If a list, the function will search for the + a class with a matching title to k. + k: str + The property to search for within the data class + """ + + if isinstance(data_class, list): + title_matches = [k.lower() == c.schema()["title"].lower() for c in data_class] + result = [x for x, y in zip(data_class, title_matches, strict=True) if y] + if len(result) == 1: + return get_fhirtype(k) + else: + raise ValueError(f"Couldn't find a matching class for {k} in {data_class}") + + else: + k_schema = data_class.schema()["properties"].get(k) + + base_class = ( + k_schema.get("items").get("type") + if k_schema.get("items") is not None + else k_schema.get("type") + ) + + if base_class is None: + assert k_schema.get("type") == "array" + + base_class = [opt.get("type") for opt in k_schema["items"]["anyOf"]] + return get_fhirtype(base_class) + + +def code_or_codeable_concept( + col_name: str, resource: FHIRFlatBase | list[FHIRFlatBase] +) -> bool: + search_terms = col_name.split(".") + fhir_type = find_data_class(resource, search_terms[0]) + + if len(search_terms) == 2: # e.g. "code.code", "age.code" + schema = fhir_type.schema()["properties"] + codeable_concepts = [ + key + for key in schema.keys() + if "codeableconcept" in key.lower() or "coding" in key.lower() + ] + if codeable_concepts: + return True + else: + return False + else: + return code_or_codeable_concept(".".join(search_terms[1:]), fhir_type) + + +def format_flat(flat_df: pd.DataFrame, resource: FHIRFlatBase) -> pd.DataFrame: + """ + Performs formatting on dates/lists in FHIRflat resources. 
+ """ + + for date_cols in [ + x + for x in flat_df.columns + if ("date" in x.lower() or "period" in x.lower() or "time" in x.lower()) + ]: + # replace nan with None + flat_df[date_cols] = flat_df[date_cols].replace(np.nan, None) + + # convert datetime objects to ISO strings + # (stops unwanted parquet conversions) + # but skips over extensions that have floats/strings rather than dates + flat_df[date_cols] = flat_df[date_cols].apply( + lambda x: ( + x.isoformat() + if isinstance(x, datetime.datetime) or isinstance(x, datetime.date) + else x + ) + ) + + for coding_column in [ + x + for x in flat_df.columns + if ( + (x.lower().endswith(".code") or x.lower().endswith(".text")) + and code_or_codeable_concept(x, resource) + ) + ]: + flat_df[coding_column] = flat_df[coding_column].apply( + lambda x: [x] if isinstance(x, str) else x + ) + + return flat_df + + +def condense_codes(row: pd.Series, code_col: str) -> pd.Series: + raw_codes = row[(code_col + ".code")] + if isinstance(raw_codes, (str, int, float)) and raw_codes == raw_codes: + formatted_code = ( + raw_codes if isinstance(raw_codes, str) else str(int(raw_codes)) + ) + codes = row[code_col + ".system"] + "|" + formatted_code + elif isinstance(raw_codes, list): + formatted_codes = [ + c if (isinstance(c, str) or c is None) else str(int(c)) for c in raw_codes + ] + codes = [ + s + "|" + c + for s, c in zip(row[code_col + ".system"], formatted_codes, strict=True) + ] + else: + codes = None + + row[code_col + ".code"] = codes + return row diff --git a/tests/data/invalid_flat_bundle/condition.parquet b/tests/data/invalid_flat_bundle/condition.parquet new file mode 100644 index 0000000..f92c847 Binary files /dev/null and b/tests/data/invalid_flat_bundle/condition.parquet differ diff --git a/tests/data/invalid_flat_bundle/encounter.parquet b/tests/data/invalid_flat_bundle/encounter.parquet new file mode 100644 index 0000000..c743369 Binary files /dev/null and b/tests/data/invalid_flat_bundle/encounter.parquet differ diff --git a/tests/data/valid_flat_bundle/condition.parquet b/tests/data/valid_flat_bundle/condition.parquet new file mode 100644 index 0000000..f213484 Binary files /dev/null and b/tests/data/valid_flat_bundle/condition.parquet differ diff --git a/tests/data/valid_flat_bundle/encounter.parquet b/tests/data/valid_flat_bundle/encounter.parquet new file mode 100644 index 0000000..a1cdf7a Binary files /dev/null and b/tests/data/valid_flat_bundle/encounter.parquet differ diff --git a/tests/data/valid_flat_bundle/patient.parquet b/tests/data/valid_flat_bundle/patient.parquet new file mode 100644 index 0000000..c27c24b Binary files /dev/null and b/tests/data/valid_flat_bundle/patient.parquet differ diff --git a/tests/data/valid_flat_compressed.zip b/tests/data/valid_flat_compressed.zip new file mode 100644 index 0000000..c172605 Binary files /dev/null and b/tests/data/valid_flat_compressed.zip differ diff --git a/tests/dummy_data/combined_dummy_data_parquet_error.csv b/tests/dummy_data/combined_dummy_data_parquet_error.csv new file mode 100644 index 0000000..13ceff9 --- /dev/null +++ b/tests/dummy_data/combined_dummy_data_parquet_error.csv @@ -0,0 +1,3 @@ 
+subjid,visitid,dates_enrolment,dates_adm,dates_admdate,dates_admtime,non_encounter_field,outco_denguediag,outco_denguediag_main,outco_denguediag_class,outco_not_dengue,outco_secondiag_oth,outco_date,outco_outcome,daily_date,vital_highesttem_c,vital_hr,vital_rr,vital_systolicbp,vital_diastolicbp,vital_spo2,vital_fio2spo2_02110,vital_fio2spo2_pcnt,vital_capillaryr,vital_avpu,vital_gcs,vital_urineflow
+1,10,2020-05-01,0,,,,,,,cough,,,7,2020-01-01,36.2,120.0,30.0,70.0,120.0,5.0,,75.0,1.0,1.0,1.0,150.0
+2,11,2021-04-01,1,,,fish,1.0,,2.0,,,2021-04-10,1,2021-02-02,37.0,100.0,40.0,80.0,130.0,6.0,10.0,85.0,0.0,2.0,1.0,200.0
diff --git a/tests/test_base.py b/tests/test_base.py
new file mode 100644
index 0000000..88c523c
--- /dev/null
+++ b/tests/test_base.py
@@ -0,0 +1,178 @@
+from fhirflat.resources.encounter import Encounter
+import fhirflat
+import pandas as pd
+import pytest
+from pydantic.v1 import ValidationError
+from pathlib import Path
+
+
+def test_flat_fields():
+    p = fhirflat.Patient()
+    ff = p.flat_fields()
+
+    assert ff == [
+        "id",
+        "extension",
+        "gender",
+        "birthDate",
+        "deceasedBoolean",
+        "deceasedDateTime",
+        "maritalStatus",
+        "multipleBirthBoolean",
+        "multipleBirthInteger",
+        "generalPractitioner",
+        "managingOrganization",
+    ]
+
+
+def test_to_flat_no_filename():
+    PATIENT_DICT_INPUT = {
+        "id": "f001",
+        "active": True,
+        "name": [{"text": "Micky Mouse"}],
+        "gender": "male",
+        "deceasedBoolean": False,
+        "address": [{"country": "Switzerland"}],
+        "birthDate": "1996-05-30",
+    }
+
+    p = fhirflat.Patient(**PATIENT_DICT_INPUT)
+    flat_p = p.to_flat()
+
+    assert isinstance(flat_p, pd.Series)
+
+
+def test_validate_fhirflat_single_resource_errors():
+    df = pd.DataFrame(
+        {
+            "subjid": [2],
+            "flat_dict": [
+                {
+                    "subject": "Patient/2",
+                    "id": 11,
+                    "actualPeriod.start": "NOT A DATE",
+                    "actualPeriod.end": "2021-04-10",
+                    "extension.timingPhase.system": "https://snomed.info/sct",
+                    "extension.timingPhase.code": 278307001.0,
+                    "extension.timingPhase.text": "On admission (qualifier value)",
+                    "class.system": "https://snomed.info/sct",
+                    "class.code": 32485007.0,
+                    "class.text": "Hospital admission (procedure)",
+                    "diagnosis.condition.concept.system": [
+                        "https://snomed.info/sct",
+                        "https://snomed.info/sct",
+                    ],
+                    "diagnosis.condition.concept.code": [38362002.0, 722863008.0],
+                    "diagnosis.condition.concept.text": [
+                        "Dengue (disorder)",
+                        "Dengue with warning signs (disorder)",
+                    ],
+                    "diagnosis.use.system": [
+                        "https://snomed.info/sct",
+                        "https://snomed.info/sct",
+                    ],
+                    "diagnosis.use.code": [89100005.0, 89100005.0],
+                    "diagnosis.use.text": [
+                        "Final diagnosis (discharge) (contextual qualifier) (qualifier value)",  # noqa: E501
+                        "Final diagnosis (discharge) (contextual qualifier) (qualifier value)",  # noqa: E501
+                    ],
+                    "admission.dischargeDisposition.system": "https://snomed.info/sct",
+                    "admission.dischargeDisposition.code": 371827001.0,
+                    "admission.dischargeDisposition.text": "Patient discharged alive (finding)",  # noqa: E501
+                }
+            ],
+        },
+        index=[0],
+    )
+
+    flat_df = Encounter.ingest_to_flat(df)
+    with pytest.raises(ValidationError, match="invalid datetime format"):
+        _, _ = Encounter.validate_fhirflat(flat_df)
+
+
+def test_validate_fhirflat_multi_resource_errors():
+    df = pd.DataFrame(
+        {
+            "subjid": [1, 2],
+            "flat_dict": [
+                {
+                    "subject": "Patient/1",
+                    "id": 11,
+                    "actualPeriod.start": "2021-04-01",
+                    "actualPeriod.end": "2021-04-10",
+                    "extension.timingPhase.system": "https://snomed.info/sct",
+                    "extension.timingPhase.code": 278307001.0,
"extension.timingPhase.text": "On admission (qualifier value)", + "class.system": "https://snomed.info/sct", + "class.code": 32485007.0, + "class.text": "Hospital admission (procedure)", + "diagnosis.condition.concept.system": [ + "https://snomed.info/sct", + "https://snomed.info/sct", + ], + "diagnosis.condition.concept.code": [38362002.0, 722863008.0], + "diagnosis.condition.concept.text": [ + "Dengue (disorder)", + "Dengue with warning signs (disorder)", + ], + "diagnosis.use.system": [ + "https://snomed.info/sct", + "https://snomed.info/sct", + ], + "diagnosis.use.code": [89100005.0, 89100005.0], + "diagnosis.use.text": [ + "Final diagnosis (discharge) (contextual qualifier) (qualifier value)", # noqa: E501 + "Final diagnosis (discharge) (contextual qualifier) (qualifier value)", # noqa: E501 + ], + "admission.dischargeDisposition.system": "https://snomed.info/sct", + "admission.dischargeDisposition.code": 371827001.0, + "admission.dischargeDisposition.text": "Patient discharged alive (finding)", # noqa: E501 + }, + { + "subject": "Patient/2", + "id": 12, + "actualPeriod.start": ["2021-04-01", None], + "actualPeriod.end": [None, "2021-04-10"], + "extension.timingPhase.system": "https://snomed.info/sct", + "extension.timingPhase.code": 278307001.0, + "extension.timingPhase.text": "On admission (qualifier value)", + "class.system": "https://snomed.info/sct", + "class.code": 32485007.0, + "class.text": "Hospital admission (procedure)", + "diagnosis.condition.concept.system": [ + "https://snomed.info/sct", + "https://snomed.info/sct", + ], + "diagnosis.condition.concept.code": [38362002.0, 722863008.0], + "diagnosis.condition.concept.text": [ + "Dengue (disorder)", + "Dengue with warning signs (disorder)", + ], + "diagnosis.use.system": [ + "https://snomed.info/sct", + "https://snomed.info/sct", + ], + "diagnosis.use.code": [89100005.0, 89100005.0], + "diagnosis.use.text": [ + "Final diagnosis (discharge) (contextual qualifier) (qualifier value)", # noqa: E501 + "Final diagnosis (discharge) (contextual qualifier) (qualifier value)", # noqa: E501 + ], + "admission.dischargeDisposition.system": "https://snomed.info/sct", + "admission.dischargeDisposition.code": 371827001.0, + "admission.dischargeDisposition.text": "Patient discharged alive (finding)", # noqa: E501 + }, + ], + }, + ) + flat_df = Encounter.ingest_to_flat(df) + + assert "diagnosis_dense" in flat_df.columns + + valid, errors = Encounter.validate_fhirflat(flat_df) + + assert len(valid) == 1 + assert len(errors) == 1 + assert ( + repr(errors["validation_error"][1].errors()) + == "[{'loc': ('actualPeriod', 'end'), 'msg': 'invalid type; expected datetime, string, bytes, int or float', 'type': 'type_error'}, {'loc': ('actualPeriod', 'start'), 'msg': 'invalid type; expected datetime, string, bytes, int or float', 'type': 'type_error'}]" # noqa: E501 + ) diff --git a/tests/test_condition_resource.py b/tests/test_condition_resource.py index cb68b88..e8b94c1 100644 --- a/tests/test_condition_resource.py +++ b/tests/test_condition_resource.py @@ -106,28 +106,26 @@ } CONDITION_FLAT = { - "resourceType": ["Condition"], + "resourceType": "Condition", "extension.presenceAbsence.code": ["http://snomed.info/sct|410605003"], "extension.presenceAbsence.text": ["Present"], - "extension.prespecifiedQuery": [True], + "extension.prespecifiedQuery": True, "category.code": [ - [ - "http://snomed.info/sct|55607006", - "http://terminology.hl7.org/CodeSystem/condition-category|problem-list-item", # noqa: E501 - ] + "http://snomed.info/sct|55607006", + 
"http://terminology.hl7.org/CodeSystem/condition-category|problem-list-item", # noqa: E501 ], - "category.text": [["Problem", None]], + "category.text": ["Problem", None], "bodySite.code": ["http://snomed.info/sct|38266002"], - "bodySite.text": ["whole body"], - "onsetDateTime": [datetime.date(2013, 4, 2)], - "abatementString": ["around April 9, 2013"], - "recordedDate": [datetime.date(2013, 4, 4)], + "bodySite.text": "whole body", + "onsetDateTime": datetime.date(2013, 4, 2), + "abatementString": "around April 9, 2013", + "recordedDate": datetime.date(2013, 4, 4), "severity.code": ["http://snomed.info/sct|255604002"], "severity.text": ["Mild"], "code.code": ["http://snomed.info/sct|386661006"], - "code.text": ["Fever"], - "subject": ["Patient/f201"], - "encounter": ["Encounter/f201"], + "code.text": "Fever", + "subject": "Patient/f201", + "encounter": "Encounter/f201", } CONDITION_DICT_OUT = { @@ -209,11 +207,12 @@ def test_condition_to_flat(): fever.to_flat("test_condition.parquet") - assert_frame_equal( - pd.read_parquet("test_condition.parquet"), - pd.DataFrame(CONDITION_FLAT), - check_like=True, - ) + fever_flat = pd.read_parquet("test_condition.parquet") + expected = pd.DataFrame([CONDITION_FLAT], index=[0]) + expected = expected.reindex(sorted(expected.columns), axis=1) + # v, e = Condition.validate_fhirflat(expected) + + assert_frame_equal(fever_flat, expected) os.remove("test_condition.parquet") diff --git a/tests/test_fhir2flat_units.py b/tests/test_fhir2flat_units.py index 2ca5e38..61ff6b4 100644 --- a/tests/test_fhir2flat_units.py +++ b/tests/test_fhir2flat_units.py @@ -118,12 +118,12 @@ def test_explode_and_flatten_no_multiples(data_lists, expected): ), ], ) -def test_expandCoding(data, expected): +def test_condenseCoding(data, expected): # Create a mock DataFrame df = pd.DataFrame(data) # Call the function - result = f2f.expandCoding(df, "code.coding") + result = f2f.condenseCoding(df, "code.coding") # Check the result expected = pd.DataFrame(expected) diff --git a/tests/test_ingest.py b/tests/test_ingest.py index 90843bf..ae272a4 100644 --- a/tests/test_ingest.py +++ b/tests/test_ingest.py @@ -9,6 +9,8 @@ write_metadata, checksum, main, + validate, + validate_cli, ) from fhirflat.resources.encounter import Encounter from fhirflat.resources.observation import Observation @@ -18,7 +20,6 @@ import sys import shutil from pathlib import Path -from decimal import Decimal import numpy as np import pytest @@ -449,10 +450,9 @@ def test_create_dict_one_to_one_dense_freetext(file, expected): ENCOUNTER_SINGLE_ROW_FLAT = { - "resourceType": "Encounter", - "id": "11", - "class.code": "https://snomed.info/sct|32485007", - "class.text": "Hospital admission (procedure)", + "id": 11, + "class.code": ["https://snomed.info/sct|32485007"], + "class.text": ["Hospital admission (procedure)"], "diagnosis_dense": [ { "condition": [ @@ -510,8 +510,8 @@ def test_create_dict_one_to_one_dense_freetext(file, expected): "subject": "Patient/2", "actualPeriod.start": "2021-04-01T18:00:00-03:00", "actualPeriod.end": "2021-04-10", - "admission.dischargeDisposition.code": "https://snomed.info/sct|371827001", - "admission.dischargeDisposition.text": "Patient discharged alive (finding)", + "admission.dischargeDisposition.code": ["https://snomed.info/sct|371827001"], + "admission.dischargeDisposition.text": ["Patient discharged alive (finding)"], "extension.timingPhase.code": ["https://snomed.info/sct|278307001"], "extension.timingPhase.text": ["On admission (qualifier value)"], } @@ -528,14 +528,19 @@ def 
     )
 
     assert df is not None
-    Encounter.ingest_to_flat(df, "encounter_ingestion_single")
+
+    flat_encounter = Encounter.ingest_to_flat(df)
+    expected_encounter = pd.DataFrame([ENCOUNTER_SINGLE_ROW_FLAT], index=[0])
+
+    _, error = Encounter.validate_fhirflat(flat_encounter)
+
+    assert error is None
 
     assert_frame_equal(
-        pd.read_parquet("encounter_ingestion_single.parquet"),
-        pd.DataFrame([ENCOUNTER_SINGLE_ROW_FLAT], index=[0]),
-        check_dtype=False,
+        flat_encounter,
+        expected_encounter,
+        check_like=True,
     )
-    os.remove("encounter_ingestion_single.parquet")
 
 
 def test_load_data_one_to_one_dense_single_row():
@@ -549,13 +554,15 @@
     )
 
     assert df is not None
-    Encounter.ingest_to_flat(df, "encounter_ingestion_dense")
 
-    df_parquet = pd.read_parquet("encounter_ingestion_dense.parquet")
+    flat_diagnosis = Encounter.ingest_to_flat(df)
+
+    _, e = Encounter.validate_fhirflat(flat_diagnosis)
+    assert e is None
 
     expected_diagnosis = [
         {
-            "condition": [{"concept": {"coding": None, "text": "sepsis"}}],
+            "condition": [{"concept": {"text": "sepsis"}}],
             "use": [
                 {
                     "coding": [
@@ -579,7 +586,6 @@
                                 "system": "https://snomed.info/sct",
                             }
                         ],
-                        "text": None,
                     }
                 }
             ],
@@ -597,23 +603,21 @@
         },
     ]
 
-    assert all(df_parquet["diagnosis_dense"][0] == expected_diagnosis)
-    os.remove("encounter_ingestion_dense.parquet")
+    assert flat_diagnosis["diagnosis_dense"].iloc[0] == expected_diagnosis
 
 
 ENCOUNTER_SINGLE_ROW_MULTI = {
-    "resourceType": ["Encounter", "Encounter", "Encounter", "Encounter"],
     "class.code": [
-        "https://snomed.info/sct|371883000",
-        "https://snomed.info/sct|32485007",
-        "https://snomed.info/sct|32485007",
-        "https://snomed.info/sct|32485007",
+        ["https://snomed.info/sct|371883000"],
+        ["https://snomed.info/sct|32485007"],
+        ["https://snomed.info/sct|32485007"],
+        ["https://snomed.info/sct|32485007"],
    ],
     "class.text": [
-        "Outpatient procedure (procedure)",
-        "Hospital admission (procedure)",
-        "Hospital admission (procedure)",
-        "Hospital admission (procedure)",
+        ["Outpatient procedure (procedure)"],
+        ["Hospital admission (procedure)"],
+        ["Hospital admission (procedure)"],
+        ["Hospital admission (procedure)"],
     ],
     "diagnosis_dense": [
         None,
@@ -746,7 +750,7 @@
         ["Final diagnosis (discharge) (contextual qualifier) (qualifier value)"],
     ],
     "subject": ["Patient/1", "Patient/2", "Patient/3", "Patient/4"],
-    "id": ["10", "11", "12", "13"],
+    "id": [10, 11, 12, 13],
     "actualPeriod.start": [
         "2020-05-01",
         "2021-04-01T18:00:00-03:00",
@@ -760,16 +764,16 @@
         "2022-06-20",
     ],
     "admission.dischargeDisposition.code": [
-        "https://snomed.info/sct|371827001",
-        "https://snomed.info/sct|371827001",
-        "https://snomed.info/sct|419099009",
-        "https://snomed.info/sct|32485007",
+        ["https://snomed.info/sct|371827001"],
+        ["https://snomed.info/sct|371827001"],
+        ["https://snomed.info/sct|419099009"],
+        ["https://snomed.info/sct|32485007"],
    ],
     "admission.dischargeDisposition.text": [
-        "Patient discharged alive (finding)",
-        "Patient discharged alive (finding)",
-        "Dead (finding)",
-        "Hospital admission (procedure)",
+        ["Patient discharged alive (finding)"],
+        ["Patient discharged alive (finding)"],
+        ["Dead (finding)"],
+        ["Hospital admission (procedure)"],
     ],
     "extension.timingPhase.code": [
         ["https://snomed.info/sct|281379000"],
@@ -797,38 +801,34 @@ def test_load_data_one_to_one_multi_row():
     )
 
     assert df is not None
-    Encounter.ingest_to_flat(df, "encounter_ingestion_multi")
+
+    flat_encounter = Encounter.ingest_to_flat(df)
+
+    _, e = Encounter.validate_fhirflat(flat_encounter)
+    assert e is None
 
     assert_frame_equal(
-        pd.read_parquet("encounter_ingestion_multi.parquet"),
+        flat_encounter,
         pd.DataFrame(ENCOUNTER_SINGLE_ROW_MULTI),
         check_dtype=False,
         check_like=True,
     )
-    os.remove("encounter_ingestion_multi.parquet")
 
 
 OBS_FLAT = {
-    "resourceType": [
-        "Observation",
-        "Observation",
-        "Observation",
-        "Observation",
-        "Observation",
-    ],
     "category.code": [
-        "http://terminology.hl7.org/CodeSystem/observation-category|vital-signs",
-        "http://terminology.hl7.org/CodeSystem/observation-category|vital-signs",
-        "http://terminology.hl7.org/CodeSystem/observation-category|vital-signs",
-        "http://terminology.hl7.org/CodeSystem/observation-category|vital-signs",
-        "http://terminology.hl7.org/CodeSystem/observation-category|vital-signs",
+        ["http://terminology.hl7.org/CodeSystem/observation-category|vital-signs"],
+        ["http://terminology.hl7.org/CodeSystem/observation-category|vital-signs"],
+        ["http://terminology.hl7.org/CodeSystem/observation-category|vital-signs"],
+        ["http://terminology.hl7.org/CodeSystem/observation-category|vital-signs"],
+        ["http://terminology.hl7.org/CodeSystem/observation-category|vital-signs"],
    ],
     "category.text": [
-        "Vital Signs",
-        "Vital Signs",
-        "Vital Signs",
-        "Vital Signs",
-        "Vital Signs",
+        ["Vital Signs"],
+        ["Vital Signs"],
+        ["Vital Signs"],
+        ["Vital Signs"],
+        ["Vital Signs"],
     ],
     "effectiveDateTime": [
         "2020-01-01",
@@ -838,18 +838,18 @@
         "2021-02-02",
     ],
     "code.code": [
-        "https://loinc.org|8310-5",
-        "https://loinc.org|8310-5",
-        "https://loinc.org|8310-5",
-        "https://loinc.org|8867-4",
-        "https://loinc.org|8867-4",
+        ["https://loinc.org|8310-5"],
+        ["https://loinc.org|8310-5"],
+        ["https://loinc.org|8310-5"],
+        ["https://loinc.org|8867-4"],
+        ["https://loinc.org|8867-4"],
    ],
     "code.text": [
-        "Body temperature",
-        "Body temperature",
-        "Body temperature",
-        "Heart rate",
-        "Heart rate",
+        ["Body temperature"],
+        ["Body temperature"],
+        ["Body temperature"],
+        ["Heart rate"],
+        ["Heart rate"],
     ],
     "subject": ["Patient/1", "Patient/2", "Patient/3", "Patient/1", "Patient/2"],
     "encounter": [
@@ -859,7 +859,7 @@
         "Encounter/10",
         "Encounter/11",
     ],
-    "valueQuantity.value": [Decimal("36.2"), 37.0, 35.5, 120.0, 100.0],
+    "valueQuantity.value": [36.2, 37.0, 35.5, 120.0, 100.0],
     "valueQuantity.unit": [
         "DegreesCelsius",
         "DegreesCelsius",
@@ -892,13 +892,15 @@ def test_load_data_one_to_many_multi_row():
     assert df is not None
     clean_df = df.dropna().copy()
 
-    Observation.ingest_to_flat(clean_df, "observation_ingestion")
-    full_df = pd.read_parquet("observation_ingestion.parquet")
+    flat_obs = Observation.ingest_to_flat(clean_df)
 
-    assert len(full_df) == 33
+    _, e = Observation.validate_fhirflat(flat_obs)
+    assert e is None
 
-    df_head = full_df.head(5)
+    assert len(flat_obs) == 33
+
+    df_head = flat_obs.head(5)
 
     assert_frame_equal(
         df_head,
@@ -906,7 +908,6 @@
         check_dtype=False,
         check_like=True,
     )
-    os.remove("observation_ingestion.parquet")
 
 
 def test_convert_data_to_flat_missing_mapping_error():
@@ -937,6 +938,44 @@ def test_convert_data_to_flat_wrong_mapping_type_error():
     )
 
 
+def test_convert_data_to_flat_no_validation_warning():
+    mappings = {
"tests/dummy_data/encounter_dummy_mapping.csv", + } + resource_types = {"Encounter": "one-to-one"} + + with pytest.warns( + UserWarning, match="Validation of the FHIRflat files has been disabled" + ): + convert_data_to_flat( + "tests/dummy_data/combined_dummy_data.csv", + folder_name="tests/ingestion_output", + date_format="%Y-%m-%d", + timezone="Brazil/East", + mapping_files_types=(mappings, resource_types), + validate=False, + ) + shutil.rmtree("tests/ingestion_output") + + +def test_convert_data_to_flat_no_validation_invalid_file_warning(): + mappings = { + Encounter: "tests/dummy_data/encounter_dummy_mapping.csv", + } + resource_types = {"Encounter": "one-to-one"} + + with pytest.warns(UserWarning, match="This is likely due to a validation error"): + convert_data_to_flat( + "tests/dummy_data/combined_dummy_data_parquet_error.csv", + folder_name="tests/ingestion_output_errors", + date_format="%Y-%m-%d", + timezone="Brazil/East", + mapping_files_types=(mappings, resource_types), + validate=False, + ) + shutil.rmtree("tests/ingestion_output_errors") + + def test_generate_metadata(): meta = generate_metadata("tests/bundle") assert meta[0]["checksum"] == METADATA_CHECKSUM @@ -1020,6 +1059,9 @@ def test_convert_data_to_flat_local_mapping_zipped(): os.remove("tests/ingestion_output.zip") +# This doesn't run intermittantly - because of the "#NAME" error i get with the google +# sheets +# Turns out this is an issue with custom functions in Google Sheets, not a Python thing. def test_main(capsys, monkeypatch): # Simulate command line arguments monkeypatch.setattr( @@ -1041,55 +1083,17 @@ def test_main(capsys, monkeypatch): shutil.rmtree("fhirflat_output") -def test_ingest_to_flat_validation_errors(): - df = pd.DataFrame( - { - "subjid": [2], - "flat_dict": [ - { - "subject": "Patient/2", - "id": 11, - "actualPeriod.start": "NOT A DATE", - "actualPeriod.end": "2021-04-10", - "extension.timingPhase.system": "https://snomed.info/sct", - "extension.timingPhase.code": 278307001.0, - "extension.timingPhase.text": "On admission (qualifier value)", - "class.system": "https://snomed.info/sct", - "class.code": 32485007.0, - "class.text": "Hospital admission (procedure)", - "diagnosis.condition.concept.system": [ - "https://snomed.info/sct", - "https://snomed.info/sct", - ], - "diagnosis.condition.concept.code": [38362002.0, 722863008.0], - "diagnosis.condition.concept.text": [ - "Dengue (disorder)", - "Dengue with warning signs (disorder)", - ], - "diagnosis.use.system": [ - "https://snomed.info/sct", - "https://snomed.info/sct", - ], - "diagnosis.use.code": [89100005.0, 89100005.0], - "diagnosis.use.text": [ - "Final diagnosis (discharge) (contextual qualifier) (qualifier value)", # noqa: E501 - "Final diagnosis (discharge) (contextual qualifier) (qualifier value)", # noqa: E501 - ], - "admission.dischargeDisposition.system": "https://snomed.info/sct", - "admission.dischargeDisposition.code": 371827001.0, - "admission.dischargeDisposition.text": "Patient discharged alive (finding)", # noqa: E501 - } - ], - }, - index=[0], - ) - - error_df = Encounter.ingest_to_flat(df, "encounter_date_error") - assert len(error_df) == 1 - assert ( - repr(error_df["validation_error"][0].errors()) - == "[{'loc': ('actualPeriod', 'start'), 'msg': 'invalid datetime format', 'type': 'value_error.datetime'}]" # noqa: E501 +def test_validate_cli(capsys, monkeypatch): + # Simulate command line arguments + monkeypatch.setattr( + "sys.argv", + ["ingest.py", "tests/data/valid_flat_bundle"], ) + validate_cli() + captured = 
+    captured = capsys.readouterr()
+    assert "encounter.parquet is valid" in captured.out
+    assert "condition.parquet is valid" in captured.out
+    assert "validation errors" not in captured.out
 
 
 def test_convert_data_to_flat_local_mapping_errors():
@@ -1112,9 +1116,7 @@
     encounter_df = pd.read_parquet("tests/ingestion_output_errors/encounter.parquet")
     obs_df = pd.read_parquet("tests/ingestion_output_errors/observation.parquet")
 
-    expected_encounter_minus_errors = (
-        pd.DataFrame(ENCOUNTER_SINGLE_ROW_MULTI).iloc[:-1].dropna(axis=1, how="all")
-    )
+    expected_encounter_minus_errors = pd.DataFrame(ENCOUNTER_SINGLE_ROW_MULTI).iloc[:-1]
 
     assert_frame_equal(
         encounter_df,
@@ -1142,3 +1144,44 @@
     )
 
     shutil.rmtree(output_folder)
+
+
+def test_validate_valid(capsys):
+    folder = "tests/data/valid_flat_bundle"
+
+    validate(folder)
+
+    captured = capsys.readouterr()
+    assert "encounter.parquet is valid" in captured.out
+    assert "condition.parquet is valid" in captured.out
+    assert "patient.parquet is valid" in captured.out
+    assert "Validation complete" in captured.out
+
+
+def test_validate_compress(capsys):
+    folder = "tests/data/valid_flat_compressed.zip"
+
+    validate(folder, compress_format="zip")
+
+    captured = capsys.readouterr()
+    assert "patient.parquet is valid" in captured.out
+    assert "Validation complete" in captured.out
+
+    assert Path("tests/data/valid_flat_compressed_validated.zip").exists()
+    Path.unlink("tests/data/valid_flat_compressed_validated.zip")
+
+
+def test_validate_invalid(capsys):
+    folder = "tests/data/invalid_flat_bundle"
+
+    validate(folder)
+
+    captured = capsys.readouterr()
+    assert "encounter.parquet have validation errors" in captured.out
+    assert "condition.parquet have validation errors" in captured.out
+    assert "Validation complete" in captured.out
+
+    Path.unlink(os.path.join(folder, "encounter_errors.csv"))
+    Path.unlink(os.path.join(folder, "encounter_valid.parquet"))
+    Path.unlink(os.path.join(folder, "condition_errors.csv"))
+    Path.unlink(os.path.join(folder, "condition_valid.parquet"))
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 163420a..872d4f2 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -6,12 +6,16 @@
     get_fhirtype,
     get_local_extension_type,
     get_local_resource,
+    condense_codes,
 )
 
 from fhir.resources.quantity import Quantity
 from fhir.resources.codeableconcept import CodeableConcept
 from fhir.resources.medicationstatement import MedicationStatementAdherence
 from fhirflat.resources.extensions import dateTimeExtension, Duration
+from fhirflat import MedicationStatement
+import pandas as pd
+import numpy as np
 
 
 def test_group_keys():
@@ -78,3 +82,80 @@ def test_get_local_extension_type_raises():
 def test_get_local_resource():
     result = get_local_resource("Patient")
     assert result == fhirflat.Patient
+
+
+def test_get_local_resource_case_insensitive():
+    result = get_local_resource("medicationstatement", case_insensitive=True)
+    assert result == MedicationStatement
+
+
+@pytest.mark.parametrize(
+    "input, expected",
+    [
+        (
+            (
+                pd.Series(
+                    {
+                        "test.system": "http://loinc.org",
+                        "test.code": "1234",
+                        "test.display": "Test",
+                    }
+                ),
+                "test",
+            ),
+            pd.Series(
+                {
+                    "test.system": "http://loinc.org",
+                    "test.code": "http://loinc.org|1234",
+                    "test.display": "Test",
+                }
+            ),
+        ),
+        (
+            (
+                pd.Series(
+                    {
"test.system": "http://loinc.org", + "test.code": None, + "test.display": "Test", + } + ), + ), + ( + ( + pd.Series( + { + "test.system": ["http://loinc.org", "http://snomed.info/sct"], + "test.code": ["1234", 5678], + "test.display": "Test", + } + ), + "test", + ), + pd.Series( + { + "test.system": ["http://loinc.org", "http://snomed.info/sct"], + "test.code": [ + "http://loinc.org|1234", + "http://snomed.info/sct|5678", + ], + "test.display": "Test", + } + ), + ), + ], +) +def test_condense_codes(input, expected): + row, col = input + result = condense_codes(row, col) + + pd.testing.assert_series_equal(result, expected)