Post conversion expectations #188

Open. Wants to merge 60 commits into main from post-conversion-expectations.

Changes from all commits (60):
977a8a8 Updated (JbannisterScottLogic, Apr 5, 2024)
dec636c Added unit tests and integrated into convert (JbannisterScottLogic, Apr 9, 2024)
113dbef Updated verification (JbannisterScottLogic, Apr 10, 2024)
77b6cbc Adjust issue factory (JbannisterScottLogic, Apr 10, 2024)
0b1f12f Issue adjustments (JbannisterScottLogic, Apr 10, 2024)
9f68310 Changed value (JbannisterScottLogic, Apr 10, 2024)
b71b479 Value changes (JbannisterScottLogic, Apr 10, 2024)
d8ef949 Adjust convert.py (JbannisterScottLogic, Apr 10, 2024)
9fde2ae Test fixes (JbannisterScottLogic, Apr 10, 2024)
45b0e11 Changes to issues (JbannisterScottLogic, Apr 15, 2024)
24594c2 Change to reference (JbannisterScottLogic, Apr 15, 2024)
adddaa4 Separate functions and correct tests (JbannisterScottLogic, Apr 15, 2024)
dab0d77 Changes back to helpers (JbannisterScottLogic, Apr 15, 2024)
f162dcf Fix (JbannisterScottLogic, Apr 15, 2024)
c1f9081 Core changes (JbannisterScottLogic, Apr 16, 2024)
c1c218c Import change (JbannisterScottLogic, Apr 16, 2024)
a046e7d Parameter changes (JbannisterScottLogic, Apr 16, 2024)
7769264 Changes to convert (JbannisterScottLogic, Apr 16, 2024)
cc64e30 Fix (JbannisterScottLogic, Apr 16, 2024)
e131164 Typology change (JbannisterScottLogic, Apr 16, 2024)
cc983ce Add Process (JbannisterScottLogic, Apr 16, 2024)
d26369f Add process parameter (JbannisterScottLogic, Apr 16, 2024)
fceb81a Query runner adjustments (JbannisterScottLogic, Apr 16, 2024)
324b2c1 Fix converted resource (JbannisterScottLogic, Apr 16, 2024)
1c5d640 Change pathing (JbannisterScottLogic, Apr 17, 2024)
92a4ae4 Updated (JbannisterScottLogic, Apr 5, 2024)
f1e0d7a Added unit tests and integrated into convert (JbannisterScottLogic, Apr 9, 2024)
d4c98c0 Updated verification (JbannisterScottLogic, Apr 10, 2024)
2666a59 Adjust issue factory (JbannisterScottLogic, Apr 10, 2024)
518196a Issue adjustments (JbannisterScottLogic, Apr 10, 2024)
febdbac Changed value (JbannisterScottLogic, Apr 10, 2024)
4efc16d Value changes (JbannisterScottLogic, Apr 10, 2024)
91e5c18 Adjust convert.py (JbannisterScottLogic, Apr 10, 2024)
238607f Test fixes (JbannisterScottLogic, Apr 10, 2024)
d3ecda3 Changes to issues (JbannisterScottLogic, Apr 15, 2024)
e22412f Change to reference (JbannisterScottLogic, Apr 15, 2024)
4bc8119 Separate functions and correct tests (JbannisterScottLogic, Apr 15, 2024)
4b0a437 Changes back to helpers (JbannisterScottLogic, Apr 15, 2024)
568f456 Fix (JbannisterScottLogic, Apr 15, 2024)
2eb2134 Core changes (JbannisterScottLogic, Apr 16, 2024)
4338c8b Import change (JbannisterScottLogic, Apr 16, 2024)
0851420 Parameter changes (JbannisterScottLogic, Apr 16, 2024)
0db28da Changes to convert (JbannisterScottLogic, Apr 16, 2024)
e552ff5 Fix (JbannisterScottLogic, Apr 16, 2024)
965d1bc Typology change (JbannisterScottLogic, Apr 16, 2024)
13df751 Add Process (JbannisterScottLogic, Apr 16, 2024)
eb3b67e Add process parameter (JbannisterScottLogic, Apr 16, 2024)
d7fc4f7 Query runner adjustments (JbannisterScottLogic, Apr 16, 2024)
b5ebc71 Fix converted resource (JbannisterScottLogic, Apr 16, 2024)
7b60741 Change pathing (JbannisterScottLogic, Apr 17, 2024)
9ce9e0a Merge branch 'post-conversion-expectations' of github.com:digital-lan… (cjohns-scottlogic, Apr 18, 2024)
954735a Set field name of items in ValueIssue. Small fixes to PostConversionP… (cjohns-scottlogic, Apr 18, 2024)
18b9b34 Converted file to unix format (so they diff easier with main) (cjohns-scottlogic, Apr 18, 2024)
2cfb750 Renamed dataset checkpoint test names to make them a bit clearer. (cjohns-scottlogic, Apr 18, 2024)
f6c2ce0 WIP (cjohns-scottlogic, Apr 22, 2024)
7e1371f Merge branch 'main' into post-conversion-expectations (cjohns-scottlogic, Apr 22, 2024)
979e6e4 Post-merge fixes. (cjohns-scottlogic, Apr 22, 2024)
8284c8a Updated PostConversionPhase to output to issues instead. (cjohns-scottlogic, Apr 23, 2024)
2a19aae Removed converted resource expectation. (cjohns-scottlogic, Apr 23, 2024)
77bbff5 WIP: Run the checks on the pipeline data. (cjohns-scottlogic, Apr 23, 2024)
20 changes: 0 additions & 20 deletions digital_land/cli.py
@@ -238,26 +238,6 @@ def expectations_run_dataset_checkpoint(data_path, output_dir, specification_dir
     run_dataset_checkpoint(data_path, output_dir, dataset, typology)
 
 
-@cli.command(
-    "expectations-converted-resource-checkpoint",
-    short_help="runs data quality expectations against a converted resource",
-)
-@click.option(
-    "--data-path", help="path to the converted resource to use", required=True
-)
-@click.option("--output-dir", help="path/name to sqlite3 dataset", required=True)
-@click.option("--specification-dir", help="checkpoint to run", required=True)
-@click.option("--dataset", help="checkpoint to run", required=True)
-def expectations_run_converted_resource_checkpoint(
-    data_path, output_dir, specification_dir, dataset
-):
-    from digital_land.expectations.commands import run_converted_resource_checkpoint
-
-    spec = Specification(specification_dir)
-    typology = spec.get_dataset_typology(dataset)
-    run_converted_resource_checkpoint(data_path, output_dir, dataset, typology)
-
-
 # edit to add collection_name in
 @cli.command("add-endpoints-and-lookups")
 @click.argument("csv-path", nargs=1, type=click.Path())
4 changes: 4 additions & 0 deletions digital_land/commands.py
@@ -19,6 +19,7 @@
 from digital_land.phase.combine import FactCombinePhase
 from digital_land.phase.concat import ConcatFieldPhase
 from digital_land.phase.convert import ConvertPhase, execute
+from digital_land.phase.post_conversion import PostConversionPhase
 from digital_land.phase.default import DefaultPhase
 from digital_land.phase.dump import DumpPhase
 from digital_land.phase.factor import FactorPhase
@@ -194,6 +195,9 @@ def pipeline_run(
             ),
             OrganisationPhase(organisation=organisation, issues=issue_log),
             FieldPrunePhase(fields=specification.current_fieldnames(schema)),
+            PostConversionPhase(  # Now badly named...
+                issues=issue_log,
+            ),
             EntityReferencePhase(
                 dataset=dataset,
                 prefix=specification.dataset_prefix(dataset),
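A note for reviewers on the ordering above: phases run in sequence over the row stream, each consuming the previous phase's generator, so PostConversionPhase sees rows after FieldPrunePhase and before EntityReferencePhase. A minimal sketch of that chaining pattern, assuming each phase exposes process(stream) as a generator (as PostConversionPhase does below); the real run_pipeline does more wiring:

def run_phases(phases, stream):
    # Later phases consume the generators of earlier ones lazily.
    for phase in phases:
        stream = phase.process(stream)
    yield from stream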
19 changes: 8 additions & 11 deletions digital_land/expectations/checkpoints/base.py
@@ -16,7 +16,7 @@ def __init__(self, checkpoint, data_path):
         self.checkpoint = checkpoint
         self.data_path = data_path
         self.data_name = Path(data_path).stem
-        self.responses = []
+        self.results = []
         self.issues = []
         # each issue is going to have different fields, so define here what all of them are
         # this will take some iterations to get right
@@ -112,28 +112,25 @@ def run(self):
         self.failed_expectation_with_error_severity = 0
 
         for expectation in self.expectations:
-            response = self.run_expectation(expectation)
-            self.responses.append(response)
-            self.issues.extend(response.issues)
-            self.failed_expectation_with_error_severity += response.act_on_failure()
+            result = self.run_expectation(expectation)
+            self.results.append(result)
+            self.issues.extend(result.issues)
+            self.failed_expectation_with_error_severity += result.act_on_failure()
 
         if self.failed_expectation_with_error_severity > 0:
             raise DataQualityException(
                 "One or more expectations with severity RaiseError failed, see results for more details"
             )
 
-    def save_results(self, responses, file_path, format="csv"):
-
+    def save_results(self, results, file_path, format="csv"):
         os.makedirs(os.path.dirname(file_path), exist_ok=True)
         with open(file_path, "w") as f:
             if format == "csv":
                 dictwriter = DictWriter(f, fieldnames=self.result_fieldnames)
                 dictwriter.writeheader()
-                dictwriter.writerows(
-                    [response.dict_for_export() for response in responses]
-                )
+                dictwriter.writerows([result.dict_for_export() for result in results])
             elif format == "json":
-                json.dump([response.to_dict() for response in responses], f)
+                json.dump([result.to_dict() for result in results], f)
             else:
                 raise ValueError(f"format must be csv or json and cannot be {format}")
 
13 changes: 0 additions & 13 deletions digital_land/expectations/checkpoints/converted_resource.py

This file was deleted.

2 changes: 1 addition & 1 deletion digital_land/expectations/checkpoints/dataset.py
@@ -78,7 +78,7 @@ def save(self, output_dir, format="csv"):
         )
 
         self.save_results(
-            self.responses,
+            self.results,
             responses_file_path,
             format=format,
         )
19 changes: 0 additions & 19 deletions digital_land/expectations/commands.py
@@ -1,5 +1,4 @@
 from .checkpoints.dataset import DatasetCheckpoint
-from .checkpoints.converted_resource import CovertedResourceCheckpoint
 
 
 def run_dataset_checkpoint(
@@ -18,21 +17,3 @@ def run_dataset_checkpoint(
     checkpoint.save(output_dir, format="csv")
     if act_on_critical_error:
         checkpoint.act_on_critical_error()
-
-
-def run_converted_resource_checkpoint(
-    converted_resource_path,
-    output_dir,
-    dataset,
-    typology,
-    act_on_critical_error=False,
-):
-    """
-    Function to run the expectation checkpoint for a converted resource
-    """
-    checkpoint = CovertedResourceCheckpoint(converted_resource_path, dataset, typology)
-    checkpoint.load()
-    checkpoint.run()
-    checkpoint.save(output_dir, format="csv")
-    if act_on_critical_error:
-        checkpoint.act_on_critical_error()
53 changes: 53 additions & 0 deletions digital_land/expectations/expectation_functions/resource_validations.py
@@ -0,0 +1,53 @@
import csv


def check_for_duplicate_references(csv_path, **kwargs):
    duplicates = {}
    issues = []
    with csv_path.open(newline="") as csvfile:
        reader = csv.DictReader(csvfile)
        for row_number, row in enumerate(reader, start=1):
            ref = row.get("reference")
            if ref in duplicates:
                duplicates[ref].append(row_number)
            else:
                duplicates[ref] = [row_number]

    for ref, rows in duplicates.items():
        if len(rows) > 1:
            issues.append(
                {
                    "scope": "row-group",
                    "message": f"Duplicate reference '{ref}' found on rows: {', '.join(map(str, rows))}",
                    "dataset": "dataset",
                    "table_name": "resource",
                    "rows": rows,
                    "row_id": str(rows[0]),
                    "organisation": "organisation",
                }
            )

    return True, "Checked for duplicate references.", issues


def validate_references(csv_path, **kwargs):
    issues = []
    with csv_path.open(newline="") as csvfile:
        reader = csv.DictReader(csvfile)
        for row_number, row in enumerate(reader, start=1):
            ref = row.get("reference")
            if not ref:  # This will be True for both None and empty strings
                issues.append(
                    {
                        "scope": "value",
                        "message": f"Reference is missing on row {row_number}.",
                        "dataset": "dataset",
                        "table_name": "resource",
                        "field_name": "reference",
                        "row_id": str(row_number),
                        "value": "Missing",
                        "organisation": "organisation",
                    }
                )

    return len(issues) == 0, "Checked for unpopulated references.", issues
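A quick usage sketch of the two checks above (the file name and contents are hypothetical; note the functions expect a pathlib.Path, since they call csv_path.open()):

from pathlib import Path

from digital_land.expectations.expectation_functions.resource_validations import (
    check_for_duplicate_references,
    validate_references,
)

csv_path = Path("test_data.csv")  # hypothetical converted resource
csv_path.write_text("reference,name\nREF-001,A\nREF-001,B\n,C\n")

# Each check returns a (passed, message, issues) tuple.
passed, message, issues = check_for_duplicate_references(csv_path)
print(passed, message)  # passed is always True here; duplicates land in `issues`
print(issues[0]["message"])  # Duplicate reference 'REF-001' found on rows: 1, 2

passed, message, issues = validate_references(csv_path)
print(passed, message)  # False: row 3 has an empty reference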
4 changes: 2 additions & 2 deletions digital_land/expectations/issue.py
@@ -120,8 +120,8 @@ class ValueIssue(Issue):
     scope: str
     dataset: str
     table_name: str = field(metadata=config(field_name="table-name"))
-    field_name: str
-    row_id: str
+    field_name: str = field(metadata=config(field_name="field-name"))
+    row_id: str = field(metadata=config(field_name="row-id"))
     value: str
     organisation: str

3 changes: 3 additions & 0 deletions digital_land/phase/convert.py
@@ -193,6 +193,9 @@ def _read_text_file(self, input_path, encoding):
 
         return reader
 
+    def format_issue_message(self, issue):
+        return f"Checkpoint Issue: {issue['message']} at line {issue.get('line_number', 'N/A')} (Severity: {issue['severity']})"
+
     def _find_zip_file(self, input_file, suffix=".gml"):
         zip_ = zipfile.ZipFile(input_file)
         files = zip_.namelist()
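A standalone replica of the new helper, to show the message shape (in the diff it is a method on the convert phase; the issue dict keys are inferred from the f-string above):

def format_issue_message(issue: dict) -> str:
    # "message" and "severity" are required keys; "line_number" is optional.
    return (
        f"Checkpoint Issue: {issue['message']} at line "
        f"{issue.get('line_number', 'N/A')} (Severity: {issue['severity']})"
    )


print(format_issue_message({"message": "Missing reference", "severity": "error"}))
# Checkpoint Issue: Missing reference at line N/A (Severity: error)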
49 changes: 49 additions & 0 deletions digital_land/phase/post_conversion.py
@@ -0,0 +1,49 @@
from digital_land.phase.phase import Phase


class PostConversionPhase(Phase):
    def __init__(
        self,
        issues,
    ):
        self.issues = issues
        self.duplicates = {}

    def process(self, stream):
        for block in stream:
            row = block.get("row", None)
            if not row:
                # Pass row-less blocks straight through; returning here would
                # end the generator and drop the rest of the stream.
                yield block
                continue

            reference = row.get("reference", None)
            line_number = block.get("line-number", None)

            if line_number:
                # Run the missing-reference check even when the reference is
                # empty; guarding it on `reference` would make it unreachable.
                self.validate_references(reference, line_number)
                if reference:
                    self.check_for_duplicate_references(reference, line_number)
            yield block

        for ref, lines in self.duplicates.items():
            if len(lines) > 1:
                self.issues.log_issue(
                    "reference",
                    "duplicate-reference",
                    ref,
                    f"Duplicate reference '{ref}' found on lines: {', '.join(map(str, lines))}",
                )

    def validate_references(self, reference, line_number):
        if not reference:  # This will be True for both None and empty strings
            self.issues.log_issue(
                "reference",
                "missing-reference",
                "",
                "",
                line_number,
            )

    def check_for_duplicate_references(self, reference, line_number):
        if reference in self.duplicates:
            self.duplicates[reference].append(line_number)
        else:
            self.duplicates[reference] = [line_number]
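A minimal driver for the new phase, useful when reviewing the stream contract. FakeIssueLog and the block shapes are assumptions inferred from the calls above, not the real IssueLog API:

from digital_land.phase.post_conversion import PostConversionPhase


class FakeIssueLog:
    # Stand-in for the pipeline's issue log; log_issue's signature is
    # inferred from the two call sites in the phase above.
    def __init__(self):
        self.logged = []

    def log_issue(self, fieldname, issue_type, value, message="", line_number=None):
        self.logged.append((fieldname, issue_type, value, message, line_number))


blocks = [
    {"row": {"reference": "REF-001"}, "line-number": 2},
    {"row": {"reference": ""}, "line-number": 3},  # missing reference
    {"row": {"reference": "REF-001"}, "line-number": 4},  # duplicate
]

phase = PostConversionPhase(issues=FakeIssueLog())
processed = list(phase.process(iter(blocks)))  # drain so the duplicate summary runs

assert len(processed) == len(blocks)  # blocks pass through unchanged
for entry in phase.issues.logged:
    print(entry)  # one missing-reference issue, then one duplicate-reference issue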
50 changes: 47 additions & 3 deletions tests/integration/expectations/test_checkpoint.py
@@ -2,8 +2,12 @@
 import os
 import spatialite
 import pandas as pd
-from csv import DictReader
+from csv import DictReader, DictWriter
 from digital_land.expectations.checkpoints.dataset import DatasetCheckpoint
+from digital_land.expectations.expectation_functions.resource_validations import (
+    check_for_duplicate_references,
+    validate_references,
+)
 
 
 @pytest.fixture
@@ -43,7 +47,23 @@ def sqlite3_with_entity_tables_path(tmp_path):
     return dataset_path
 
 
-def test_run_checkpoint_success(tmp_path, sqlite3_with_entity_tables_path):
+@pytest.fixture
+def csv_path(tmp_path):
+    data = [
+        {"reference": "REF-001", "name": "Test 1"},
+        {"reference": "REF-002", "name": "Test 2"},
+        {"reference": "REF-001", "name": "Test 3"},  # Duplicate of row 1
+        {"reference": "", "name": "Test 4"},  # Missing reference
+    ]
+    csv_file = tmp_path / "test_data.csv"
+    with csv_file.open(mode="w", newline="") as f:
+        writer = DictWriter(f, fieldnames=["reference", "name"])
+        writer.writeheader()
+        writer.writerows(data)
+    return csv_file
+
+
+def test_dataset_checkpoint_success(tmp_path, sqlite3_with_entity_tables_path):
     # load data
     test_entity_data = pd.DataFrame.from_dict({"entity": [1], "name": ["test1"]})
     test_old_entity_data = pd.DataFrame.from_dict({"old_entity": [100], "entity": [10]})
@@ -74,7 +94,7 @@ def test_run_checkpoint_success(tmp_path, sqlite3_with_entity_tables_path):
     assert len(issues) == 0
 
 
-def test_run_checkpoint_failure(tmp_path, sqlite3_with_entity_tables_path):
+def test_dataset_checkpoint_failure(tmp_path, sqlite3_with_entity_tables_path):
     # load data
     test_entity_data = pd.DataFrame.from_dict(
         {
@@ -132,3 +152,27 @@ def test_run_checkpoint_failure(tmp_path, sqlite3_with_entity_tables_path):
     assert issues[0]["rows"] == ""
     assert issues[0]["row"] != ""  # Just check it's there
     assert issues[0]["value"] == ""
+
+
+def test_check_for_duplicate_references(csv_path):
+    _, _, issues = check_for_duplicate_references(csv_path)
+
+    assert issues, "The function should identify at least one issue."
+    assert len(issues) == 1, "There should be exactly one issue identified."
+    assert (
+        issues[0]["scope"] == "row-group"
+    ), "The issue should be identified as a duplicate reference."
+    assert (
+        "REF-001" in issues[0]["message"]
+    ), "REF-001 should be identified as a duplicate."
+
+
+def test_validate_references(csv_path):
+    _, _, issues = validate_references(csv_path)
+
+    assert issues, "The function should fail due to a missing reference."
+    assert len(issues) == 1, "There should be exactly one issue identified."
+    assert (
+        issues[0]["scope"] == "value"
+    ), "The issue should be identified as a missing reference."
+    assert "row 4" in issues[0]["message"], "Row 4 should be flagged as missing a reference."
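To run just the new reference tests locally, something like this should work with a standard pytest setup:

pytest tests/integration/expectations/test_checkpoint.py -k references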