diff --git a/digital_land/cli.py b/digital_land/cli.py
index 9a30f2df..9e47e9e5 100644
--- a/digital_land/cli.py
+++ b/digital_land/cli.py
@@ -238,26 +238,6 @@ def expectations_run_dataset_checkpoint(data_path, output_dir, specification_dir
     run_dataset_checkpoint(data_path, output_dir, dataset, typology)


-@cli.command(
-    "expectations-converted-resource-checkpoint",
-    short_help="runs data quality expectations against a converted resource",
-)
-@click.option(
-    "--data-path", help="path to the converted resource to use", required=True
-)
-@click.option("--output-dir", help="path/name to sqlite3 dataset", required=True)
-@click.option("--specification-dir", help="checkpoint to run", required=True)
-@click.option("--dataset", help="checkpoint to run", required=True)
-def expectations_run_converted_resource_checkpoint(
-    data_path, output_dir, specification_dir, dataset
-):
-    from digital_land.expectations.commands import run_converted_resource_checkpoint
-
-    spec = Specification(specification_dir)
-    typology = spec.get_dataset_typology(dataset)
-    run_converted_resource_checkpoint(data_path, output_dir, dataset, typology)
-
-
 # edit to add collection_name in
 @cli.command("add-endpoints-and-lookups")
 @click.argument("csv-path", nargs=1, type=click.Path())
diff --git a/digital_land/commands.py b/digital_land/commands.py
index ad9d05b1..e0aaee14 100644
--- a/digital_land/commands.py
+++ b/digital_land/commands.py
@@ -19,6 +19,7 @@
 from digital_land.phase.combine import FactCombinePhase
 from digital_land.phase.concat import ConcatFieldPhase
 from digital_land.phase.convert import ConvertPhase, execute
+from digital_land.phase.post_conversion import PostConversionPhase
 from digital_land.phase.default import DefaultPhase
 from digital_land.phase.dump import DumpPhase
 from digital_land.phase.factor import FactorPhase
@@ -194,6 +195,9 @@ def pipeline_run(
         ),
         OrganisationPhase(organisation=organisation, issues=issue_log),
         FieldPrunePhase(fields=specification.current_fieldnames(schema)),
+        PostConversionPhase(  # Now badly named: runs after field pruning, not straight after conversion
+            issues=issue_log,
+        ),
         EntityReferencePhase(
             dataset=dataset,
             prefix=specification.dataset_prefix(dataset),
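For orientation, a minimal sketch of the stream block the new phase consumes once `FieldPrunePhase` has run; the shape is assumed from the phase code later in this diff, and real pipeline blocks carry more keys:

```python
# Assumed block shape (illustrative only): PostConversionPhase reads the
# pruned row dict plus the source line number it uses when logging issues.
block = {
    "row": {"reference": "REF-001", "name": "Example"},
    "line-number": 2,
}
```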
diff --git a/digital_land/expectations/checkpoints/base.py b/digital_land/expectations/checkpoints/base.py
index f87857f3..658e97e3 100644
--- a/digital_land/expectations/checkpoints/base.py
+++ b/digital_land/expectations/checkpoints/base.py
@@ -16,7 +16,7 @@ def __init__(self, checkpoint, data_path):
         self.checkpoint = checkpoint
         self.data_path = data_path
         self.data_name = Path(data_path).stem
-        self.responses = []
+        self.results = []
         self.issues = []
         # each issue is going to have different fields, so define here what all of them are
         # this will take some iterations to get right
@@ -112,28 +112,25 @@ def run(self):
         self.failed_expectation_with_error_severity = 0
         for expectation in self.expectations:
-            response = self.run_expectation(expectation)
-            self.responses.append(response)
-            self.issues.extend(response.issues)
-            self.failed_expectation_with_error_severity += response.act_on_failure()
+            result = self.run_expectation(expectation)
+            self.results.append(result)
+            self.issues.extend(result.issues)
+            self.failed_expectation_with_error_severity += result.act_on_failure()

         if self.failed_expectation_with_error_severity > 0:
             raise DataQualityException(
                 "One or more expectations with severity RaiseError failed, see results for more details"
             )

-    def save_results(self, responses, file_path, format="csv"):
-
+    def save_results(self, results, file_path, format="csv"):
         os.makedirs(os.path.dirname(file_path), exist_ok=True)
         with open(file_path, "w") as f:
             if format == "csv":
                 dictwriter = DictWriter(f, fieldnames=self.result_fieldnames)
                 dictwriter.writeheader()
-                dictwriter.writerows(
-                    [response.dict_for_export() for response in responses]
-                )
+                dictwriter.writerows([result.dict_for_export() for result in results])
             elif format == "json":
-                json.dump([response.to_dict() for response in responses], f)
+                json.dump([result.to_dict() for result in results], f)
             else:
                 raise ValueError(f"format must be csv or json and cannot be {format}")
diff --git a/digital_land/expectations/checkpoints/converted_resource.py b/digital_land/expectations/checkpoints/converted_resource.py
deleted file mode 100644
index e735a41d..00000000
--- a/digital_land/expectations/checkpoints/converted_resource.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# checkpoint needs to assemble class state
-# it needs to validate inputs specific for that checkpoint
-# it then needs to run expectations
-# then it needs to be able to save those expectation resultts
-# a checkpoint represents the moment in the process where we tell it the
-# type of data it is validating and where the data is
-# the primary different between checkpoints is how it loads expectations (i.e. where that are loaded from)
-from .base import BaseCheckpoint
-
-
-class CovertedResourceCheckpoint(BaseCheckpoint):
-    def load():
-        pass
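The rename from `responses` to `results` above leans on a small implicit interface. A minimal sketch, with `FakeResult` as an assumed stand-in (not a real class in this repo), of what `BaseCheckpoint.run` and `save_results` expect each result object to provide:

```python
from dataclasses import dataclass, field


@dataclass
class FakeResult:
    passed: bool
    message: str
    issues: list = field(default_factory=list)

    def act_on_failure(self):
        # run() sums this into failed_expectation_with_error_severity
        return 0 if self.passed else 1

    def dict_for_export(self):
        # one flat row per result, matching result_fieldnames (CSV branch)
        return {"passed": self.passed, "message": self.message}

    def to_dict(self):
        # full nested form used by the JSON branch of save_results
        return {"passed": self.passed, "message": self.message, "issues": self.issues}
```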
diff --git a/digital_land/expectations/checkpoints/dataset.py b/digital_land/expectations/checkpoints/dataset.py
index e7504cc3..6c42a3cf 100644
--- a/digital_land/expectations/checkpoints/dataset.py
+++ b/digital_land/expectations/checkpoints/dataset.py
@@ -78,7 +78,7 @@ def save(self, output_dir, format="csv"):
         )

         self.save_results(
-            self.responses,
+            self.results,
             responses_file_path,
             format=format,
         )
diff --git a/digital_land/expectations/commands.py b/digital_land/expectations/commands.py
index d16c6533..d75cf729 100644
--- a/digital_land/expectations/commands.py
+++ b/digital_land/expectations/commands.py
@@ -1,5 +1,4 @@
 from .checkpoints.dataset import DatasetCheckpoint
-from .checkpoints.converted_resource import CovertedResourceCheckpoint


 def run_dataset_checkpoint(
@@ -18,21 +17,3 @@ def run_dataset_checkpoint(
     checkpoint.save(output_dir, format="csv")
     if act_on_critical_error:
         checkpoint.act_on_critical_error()
-
-
-def run_converted_resource_checkpoint(
-    converted_resource_path,
-    output_dir,
-    dataset,
-    typology,
-    act_on_critical_error=False,
-):
-    """
-    Function to run the expectation checkpoint for a converted resource
-    """
-    checkpoint = CovertedResourceCheckpoint(converted_resource_path, dataset, typology)
-    checkpoint.load()
-    checkpoint.run()
-    checkpoint.save(output_dir, format="csv")
-    if act_on_critical_error:
-        checkpoint.act_on_critical_error()
diff --git a/digital_land/expectations/expectation_functions/resource_validations.py b/digital_land/expectations/expectation_functions/resource_validations.py
new file mode 100644
index 00000000..6f6d0520
--- /dev/null
+++ b/digital_land/expectations/expectation_functions/resource_validations.py
@@ -0,0 +1,53 @@
+import csv
+
+
+def check_for_duplicate_references(csv_path, **kwargs):
+    duplicates = {}
+    issues = []
+    with csv_path.open(newline="") as csvfile:
+        reader = csv.DictReader(csvfile)
+        for row_number, row in enumerate(reader, start=1):
+            ref = row.get("reference")
+            if ref in duplicates:
+                duplicates[ref].append(row_number)
+            else:
+                duplicates[ref] = [row_number]
+
+    for ref, rows in duplicates.items():
+        if len(rows) > 1:
+            issues.append(
+                {
+                    "scope": "row-group",
+                    "message": f"Duplicate reference '{ref}' found on rows: {', '.join(map(str, rows))}",
+                    "dataset": "dataset",
+                    "table_name": "resource",
+                    "rows": rows,
+                    "row_id": str(rows[0]),
+                    "organisation": "organisation",
+                }
+            )
+
+    return len(issues) == 0, "Checked for duplicate references.", issues
+
+
+def validate_references(csv_path, **kwargs):
+    issues = []
+    with csv_path.open(newline="") as csvfile:
+        reader = csv.DictReader(csvfile)
+        for row_number, row in enumerate(reader, start=1):
+            ref = row.get("reference")
+            if not ref:  # This will be True for both None and empty strings
+                issues.append(
+                    {
+                        "scope": "value",
+                        "message": f"Reference is missing on row {row_number}.",
+                        "dataset": "dataset",
+                        "table_name": "resource",
+                        "field_name": "reference",
+                        "row_id": str(row_number),
+                        "value": "Missing",
+                        "organisation": "organisation",
+                    }
+                )
+
+    return len(issues) == 0, "Checked for unpopulated references.", issues
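Each validation returns a `(passed, message, issues)` tuple, so the functions can be called directly against a CSV on disk. A minimal sketch, where `my_resource.csv` is a hypothetical file with a `reference` column (the functions expect a `pathlib.Path`, since they call `csv_path.open`):

```python
from pathlib import Path

from digital_land.expectations.expectation_functions.resource_validations import (
    check_for_duplicate_references,
    validate_references,
)

csv_path = Path("my_resource.csv")  # hypothetical input file

passed, message, issues = check_for_duplicate_references(csv_path)
print(message, "passed:", passed)

passed, message, issues = validate_references(csv_path)
for issue in issues:
    # each issue is a plain dict, e.g. scope="value" for a missing reference
    print(issue["scope"], issue["message"])
```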
diff --git a/digital_land/expectations/issue.py b/digital_land/expectations/issue.py
index 68cd0ae8..f68b3e47 100644
--- a/digital_land/expectations/issue.py
+++ b/digital_land/expectations/issue.py
@@ -120,8 +120,8 @@ class ValueIssue(Issue):
     scope: str
     dataset: str
     table_name: str = field(metadata=config(field_name="table-name"))
-    field_name: str
-    row_id: str
+    field_name: str = field(metadata=config(field_name="field-name"))
+    row_id: str = field(metadata=config(field_name="row-id"))
     value: str
     organisation: str

diff --git a/digital_land/phase/convert.py b/digital_land/phase/convert.py
index 091fa006..b57c22c1 100644
--- a/digital_land/phase/convert.py
+++ b/digital_land/phase/convert.py
@@ -193,6 +193,9 @@ def _read_text_file(self, input_path, encoding):
         return reader

+    def format_issue_message(self, issue):
+        return f"Checkpoint Issue: {issue['message']} at line {issue.get('line_number', 'N/A')} (Severity: {issue['severity']})"
+
     def _find_zip_file(self, input_file, suffix=".gml"):
         zip_ = zipfile.ZipFile(input_file)
         files = zip_.namelist()
diff --git a/digital_land/phase/post_conversion.py b/digital_land/phase/post_conversion.py
new file mode 100644
index 00000000..5198d112
--- /dev/null
+++ b/digital_land/phase/post_conversion.py
@@ -0,0 +1,55 @@
+from digital_land.phase.phase import Phase
+
+
+class PostConversionPhase(Phase):
+    def __init__(
+        self,
+        issues,
+    ):
+        self.issues = issues
+        self.duplicates = {}
+
+    def process(self, stream):
+        for block in stream:
+            row = block.get("row", None)
+            if not row:
+                # pass row-less blocks straight through; a bare return here
+                # would end the generator and drop the rest of the stream
+                yield block
+                continue
+
+            reference = row.get("reference", None)
+            line_number = block.get("line-number", None)
+
+            if line_number:
+                # always check for a missing reference, then only track
+                # populated references for the duplicate check
+                self.validate_references(reference, line_number)
+                if reference:
+                    self.check_for_duplicate_references(reference, line_number)
+            yield block
+
+        for ref, lines in self.duplicates.items():
+            if len(lines) > 1:
+                self.issues.log_issue(
+                    "reference",
+                    "duplicate-reference",
+                    ref,
+                    f"Duplicate reference '{ref}' found on lines: {', '.join(map(str, lines))}",
+                )
+
+    def validate_references(self, reference, line_number):
+        if not reference:  # This will be True for both None and empty strings
+            self.issues.log_issue(
+                "reference",
+                "missing-reference",
+                "",
+                "",
+                line_number,
+            )
+
+    def check_for_duplicate_references(self, reference, line_number):
+        if reference in self.duplicates:
+            self.duplicates[reference].append(line_number)
+        else:
+            self.duplicates[reference] = [line_number]
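The phase only needs an object with a `log_issue` method, so it can be exercised outside the pipeline. A minimal sketch with a stub issue log; the positional `log_issue` signature is assumed from the calls above:

```python
from digital_land.phase.post_conversion import PostConversionPhase


class StubIssueLog:
    """Records log_issue calls instead of writing a real issue log."""

    def __init__(self):
        self.logged = []

    def log_issue(self, *args):
        self.logged.append(args)


blocks = [
    {"row": {"reference": "REF-001"}, "line-number": 2},
    {"row": {"reference": ""}, "line-number": 3},  # missing reference
    {"row": {"reference": "REF-001"}, "line-number": 4},  # duplicate of line 2
]

issues = StubIssueLog()
phase = PostConversionPhase(issues=issues)

# process() is a generator, so it must be drained for the duplicate
# summary at the end of the stream to run
for _ in phase.process(blocks):
    pass

for entry in issues.logged:
    print(entry)  # the missing-reference entry, then the duplicate-reference entry
```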
diff --git a/tests/integration/expectations/test_checkpoint.py b/tests/integration/expectations/test_checkpoint.py
index e885b8b1..2603ff7a 100644
--- a/tests/integration/expectations/test_checkpoint.py
+++ b/tests/integration/expectations/test_checkpoint.py
@@ -2,8 +2,12 @@
 import os
 import spatialite
 import pandas as pd
-from csv import DictReader
+from csv import DictReader, DictWriter
 from digital_land.expectations.checkpoints.dataset import DatasetCheckpoint
+from digital_land.expectations.expectation_functions.resource_validations import (
+    check_for_duplicate_references,
+    validate_references,
+)


 @pytest.fixture
@@ -43,7 +47,23 @@ def sqlite3_with_entity_tables_path(tmp_path):
     return dataset_path


-def test_run_checkpoint_success(tmp_path, sqlite3_with_entity_tables_path):
+@pytest.fixture
+def csv_path(tmp_path):
+    data = [
+        {"reference": "REF-001", "name": "Test 1"},
+        {"reference": "REF-002", "name": "Test 2"},
+        {"reference": "REF-001", "name": "Test 3"},  # Duplicate
+        {"reference": "", "name": "Test 4"},  # Missing reference
+    ]
+    csv_file = tmp_path / "test_data.csv"
+    with csv_file.open(mode="w", newline="") as f:
+        writer = DictWriter(f, fieldnames=["reference", "name"])
+        writer.writeheader()
+        writer.writerows(data)
+    return csv_file
+
+
+def test_dataset_checkpoint_success(tmp_path, sqlite3_with_entity_tables_path):
     # load data
     test_entity_data = pd.DataFrame.from_dict({"entity": [1], "name": ["test1"]})
     test_old_entity_data = pd.DataFrame.from_dict({"old_entity": [100], "entity": [10]})
@@ -74,7 +94,7 @@ def test_run_checkpoint_success(tmp_path, sqlite3_with_entity_tables_path):
     assert len(issues) == 0


-def test_run_checkpoint_failure(tmp_path, sqlite3_with_entity_tables_path):
+def test_dataset_checkpoint_failure(tmp_path, sqlite3_with_entity_tables_path):
     # load data
     test_entity_data = pd.DataFrame.from_dict(
         {
@@ -132,3 +152,27 @@ def test_run_checkpoint_failure(tmp_path, sqlite3_with_entity_tables_path):
     assert issues[0]["rows"] == ""
     assert issues[0]["row"] != ""  # Just check it's there
     assert issues[0]["value"] == ""
+
+
+def test_check_for_duplicate_references(csv_path):
+    _, _, issues = check_for_duplicate_references(csv_path)
+
+    assert issues, "The function should successfully identify issues."
+    assert len(issues) == 1, "There should be one issue identified."
+    assert (
+        issues[0]["scope"] == "row-group"
+    ), "The issue should be identified as a duplicate reference."
+    assert (
+        "REF-001" in issues[0]["message"]
+    ), "REF-001 should be identified as a duplicate."
+
+
+def test_validate_references(csv_path):
+    _, _, issues = validate_references(csv_path)
+
+    assert issues, "The function should flag the missing reference."
+    assert len(issues) == 1, "There should be one issue identified."
+    assert (
+        issues[0]["scope"] == "value"
+    ), "The issue should be identified as a missing reference."
+    assert "row 4" in issues[0]["message"], "The missing reference on row 4 should be flagged."