Skip to content

Commit

Permalink
Merge 343fe9d into aa37211
Browse files Browse the repository at this point in the history
  • Loading branch information
erosselli committed Sep 12, 2024
2 parents aa37211 + 343fe9d commit 3a60219
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ The types of changes are:
- Validate no path in `server_host` var for CLI config; if there is one then take only up until the first forward slash
- Update the Datamap report's Data categories column to support better expand/collapse behavior [#5265](https://github.com/ethyca/fides/pull/5265)
- Rename/refactor Privacy Notice Properties to support performance improvements [#5259](https://github.com/ethyca/fides/pull/5259)
- Improved logging and error visibility for TraversalErrors [#5263](https://github.com/ethyca/fides/pull/5263)

### Developer Experience
- Added performance mark timings to debug logs for fides.js [#5245](https://github.com/ethyca/fides/pull/5245)
Expand Down
10 changes: 9 additions & 1 deletion src/fides/api/common_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,15 @@ def __init__(self, message: str, errors: List[str] = []):


class TraversalError(FidesopsException):
"""Fidesops error with the names of all nodes that could not be reached."""
"""Error with the names of all nodes that could not be reached."""


class UnreachableNodesError(TraversalError):
    """Raised when one or more graph nodes cannot be reached during traversal.

    The keys of the unreachable nodes are passed as the exception's ``errors`` list
    (see the raising site in ``traversal.py``). Inherits from ``TraversalError``.
    """


class UnreachableEdgesError(TraversalError):
    """Raised when one or more graph edges cannot be reached during traversal.

    String descriptions of the unreachable edges are passed as the exception's
    ``errors`` list (see the raising site in ``traversal.py``). Inherits from
    ``TraversalError``.
    """


class ValidationError(FidesopsException):
Expand Down
77 changes: 70 additions & 7 deletions src/fides/api/graph/traversal.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
import pydash.collections
from fideslang.validation import FidesKey
from loguru import logger
from sqlalchemy.orm import Session

from fides.api.common_exceptions import TraversalError
from fides.api.common_exceptions import (
TraversalError,
UnreachableEdgesError,
UnreachableNodesError,
)
from fides.api.graph.config import (
ROOT_COLLECTION_ADDRESS,
TERMINATOR_ADDRESS,
Expand All @@ -19,7 +24,12 @@
)
from fides.api.graph.execution import ExecutionNode
from fides.api.graph.graph import DatasetGraph, Edge, Node
from fides.api.models.privacy_request import RequestTask, TraversalDetails
from fides.api.models.privacy_request import (
PrivacyRequest,
RequestTask,
TraversalDetails,
)
from fides.api.schemas.policy import ActionType
from fides.api.util.collection_util import Row, append, partition
from fides.api.util.logger_context_utils import Contextualizable, LoggerContextKeys
from fides.api.util.matching_queue import MatchingQueue
Expand Down Expand Up @@ -377,7 +387,7 @@ def edge_ends_with_collection(_edge: Edge) -> bool:
)
raise TraversalError(
f"""Node could not be reached given the specified ordering:
[{','.join([str(tn.address) for tn in running_node_queue.data])}]"""
[{','.join([str(tn.address) for tn in running_node_queue.data])}]""",
)

# Remove nodes that have custom request fields, since we don't care if these are reachable or not.
Expand All @@ -396,8 +406,9 @@ def edge_ends_with_collection(_edge: Edge) -> bool:
"Some nodes were not reachable: {}",
",".join([str(x) for x in remaining_node_keys]),
)
raise TraversalError(
f"Some nodes were not reachable: {','.join([str(x) for x in remaining_node_keys])}"
raise UnreachableNodesError(
f"Some nodes were not reachable: {','.join([str(x) for x in remaining_node_keys])}",
[key.value for key in remaining_node_keys],
)

# error if there are edges that have not been visited
Expand All @@ -406,8 +417,9 @@ def edge_ends_with_collection(_edge: Edge) -> bool:
"Some edges were not reachable: {}",
",".join([str(x) for x in remaining_edges]),
)
raise TraversalError(
f"Some edges were not reachable: {','.join([str(x) for x in remaining_edges])}"
raise UnreachableEdgesError(
f"Some edges were not reachable: {','.join([str(x) for x in remaining_edges])}",
[f"{edge}" for edge in remaining_edges],
)

end_nodes = [
Expand All @@ -416,3 +428,54 @@ def edge_ends_with_collection(_edge: Edge) -> bool:
if environment:
logger.debug("Found {} end nodes: {}", len(end_nodes), end_nodes)
return end_nodes


def log_traversal_error_and_update_privacy_request(
    privacy_request: PrivacyRequest, session: Session, err: TraversalError
) -> None:
    """
    Log the provided traversal error with the privacy request id, create the
    corresponding ExecutionLog instances, and mark the privacy request as errored.

    If the error is a generic TraversalError, a single generic error execution log
    is created. If the error is an UnreachableNodesError or UnreachableEdgesError,
    an execution log is created for each entry in the exception's ``errors`` list.
    """
    logger.error(
        "TraversalError encountered for privacy request {}. Error: {}",
        privacy_request.id,
        err,
    )

    # Specific traversal errors carry per-node/per-edge details in `errors`;
    # generic TraversalErrors do not, so those get one generic execution log.
    if not isinstance(err, (UnreachableNodesError, UnreachableEdgesError)):
        privacy_request.add_error_execution_log(
            session,
            connection_key=None,
            dataset_name=None,
            collection_name=None,
            message=str(err),
            action_type=ActionType.access,
        )

    # For specific errors, create one execution log per unreachable node / edge.
    for error in err.errors:
        if isinstance(err, UnreachableNodesError):
            # Node keys have the form "dataset:collection". maxsplit=1 guards the
            # two-element unpack against any unexpected extra colon in the key.
            dataset, collection = error.split(":", 1)
            message = f"Node {error} is not reachable"
        else:
            # Edges don't map cleanly to a single dataset/collection.
            dataset, collection = None, None
            message = f"Edge {error} is not reachable"
        privacy_request.add_error_execution_log(
            session,
            connection_key=None,
            dataset_name=dataset,
            collection_name=collection,
            message=message,
            action_type=ActionType.access,
        )
    privacy_request.error_processing(session)
23 changes: 23 additions & 0 deletions src/fides/api/models/privacy_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,29 @@ def get_filtered_final_upload(self) -> Dict[str, Dict[str, List[Row]]]:
"""Fetched the same filtered access results we uploaded to the user"""
return self.filtered_final_upload or {}

def add_error_execution_log(
    self,
    db: Session,
    connection_key: Optional[str],
    dataset_name: Optional[str],
    collection_name: Optional[str],
    message: str,
    action_type: ActionType,
) -> ExecutionLog:
    """Create and return an error-status ExecutionLog tied to this privacy request.

    The connection key, dataset name, and collection name may each be None when
    the error is not attributable to a specific node (e.g. an unreachable edge).
    """
    return ExecutionLog.create(
        db=db,
        data={
            "privacy_request_id": self.id,
            "connection_key": connection_key,
            "dataset_name": dataset_name,
            "collection_name": collection_name,
            "status": ExecutionLogStatus.error,
            "message": message,
            "action_type": action_type,
        },
    )


class PrivacyRequestError(Base):
"""The DB ORM model to track PrivacyRequests error message status."""
Expand Down
77 changes: 44 additions & 33 deletions src/fides/api/task/create_request_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@
FieldAddress,
)
from fides.api.graph.graph import DatasetGraph
from fides.api.graph.traversal import ARTIFICIAL_NODES, Traversal, TraversalNode
from fides.api.graph.traversal import (
ARTIFICIAL_NODES,
Traversal,
TraversalNode,
log_traversal_error_and_update_privacy_request,
)
from fides.api.models.connectionconfig import ConnectionConfig
from fides.api.models.policy import Policy
from fides.api.models.privacy_request import (
Expand Down Expand Up @@ -443,41 +448,47 @@ def run_access_request(
session, privacy_request, ActionType.access
)
else:
logger.info("Building access graph for {}", privacy_request.id)
traversal: Traversal = Traversal(graph, identity)

# Traversal.traverse populates traversal_nodes in place, adding parents and children to each traversal_node.
traversal_nodes: Dict[CollectionAddress, TraversalNode] = {}
end_nodes: List[CollectionAddress] = traversal.traverse(
traversal_nodes, collect_tasks_fn
)
# Save Access Request Tasks to the database
ready_tasks = persist_new_access_request_tasks(
session, privacy_request, traversal, traversal_nodes, end_nodes, graph
)

if (
policy.get_rules_for_action(action_type=ActionType.erasure)
and not privacy_request.erasure_tasks.count()
):
# If applicable, go ahead and save Erasure Request Tasks to the Database.
# These erasure tasks aren't ready to run until the access graph is completed
# in full, but this makes sure the nodes in the graphs match.
erasure_end_nodes: List[CollectionAddress] = list(graph.nodes.keys())
persist_initial_erasure_request_tasks(
session, privacy_request, traversal_nodes, erasure_end_nodes, graph
try:
logger.info("Building access graph for {}", privacy_request.id)
traversal: Traversal = Traversal(graph, identity)

# Traversal.traverse populates traversal_nodes in place, adding parents and children to each traversal_node.
traversal_nodes: Dict[CollectionAddress, TraversalNode] = {}
end_nodes: List[CollectionAddress] = traversal.traverse(
traversal_nodes, collect_tasks_fn
)
# Save Access Request Tasks to the database
ready_tasks = persist_new_access_request_tasks(
session, privacy_request, traversal, traversal_nodes, end_nodes, graph
)

# cache a map of collections -> data uses for the output package of access requests
privacy_request.cache_data_use_map(
format_data_use_map_for_caching(
{
coll_address: tn.node.dataset.connection_key
for (coll_address, tn) in traversal_nodes.items()
},
connection_configs,
if (
policy.get_rules_for_action(action_type=ActionType.erasure)
and not privacy_request.erasure_tasks.count()
):
# If applicable, go ahead and save Erasure Request Tasks to the Database.
# These erasure tasks aren't ready to run until the access graph is completed
# in full, but this makes sure the nodes in the graphs match.
erasure_end_nodes: List[CollectionAddress] = list(graph.nodes.keys())
persist_initial_erasure_request_tasks(
session, privacy_request, traversal_nodes, erasure_end_nodes, graph
)

# cache a map of collections -> data uses for the output package of access requests
privacy_request.cache_data_use_map(
format_data_use_map_for_caching(
{
coll_address: tn.node.dataset.connection_key
for (coll_address, tn) in traversal_nodes.items()
},
connection_configs,
)
)
)
except TraversalError as err:
log_traversal_error_and_update_privacy_request(
privacy_request, session, err
)
raise err

for task in ready_tasks:
log_task_queued(task, "main runner")
Expand Down
29 changes: 27 additions & 2 deletions src/fides/api/task/deprecated_graph_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dask import delayed # type: ignore[attr-defined]
from dask.core import getcycle
from dask.threaded import get
from loguru import logger
from sqlalchemy.orm import Session

from fides.api.common_exceptions import TraversalError
Expand All @@ -14,11 +15,16 @@
CollectionAddress,
)
from fides.api.graph.graph import DatasetGraph
from fides.api.graph.traversal import Traversal, TraversalNode
from fides.api.graph.traversal import (
Traversal,
TraversalNode,
log_traversal_error_and_update_privacy_request,
)
from fides.api.models.connectionconfig import ConnectionConfig
from fides.api.models.policy import Policy
from fides.api.models.privacy_request import PrivacyRequest
from fides.api.models.sql_models import System # type: ignore[attr-defined]
from fides.api.schemas.policy import ActionType
from fides.api.task.graph_task import EMPTY_REQUEST_TASK, GraphTask
from fides.api.task.task_resources import TaskResources
from fides.api.util.collection_util import Row
Expand Down Expand Up @@ -108,7 +114,12 @@ def run_access_request_deprecated(
session: Session,
) -> Dict[str, List[Row]]:
"""Deprecated: Run the access request sequentially in-memory using Dask"""
traversal: Traversal = Traversal(graph, identity)
try:
traversal: Traversal = Traversal(graph, identity)
except TraversalError as err:
log_traversal_error_and_update_privacy_request(privacy_request, session, err)
raise err

with TaskResources(
privacy_request, policy, connection_configs, EMPTY_REQUEST_TASK, session
) as resources:
Expand Down Expand Up @@ -234,6 +245,20 @@ def termination_fn(*dependent_values: int) -> Dict[str, int]:
# using an existing function from dask.core to detect cycles in the generated graph
collection_cycle = getcycle(dsk, None)
if collection_cycle:
logger.error(
"TraversalError encountered for privacy request {}. Error: The values for the `erase_after` fields caused a cycle in the following collections {}",
privacy_request.id,
collection_cycle,
)
privacy_request.add_error_execution_log(
db=session,
connection_key=None,
collection_name=None,
dataset_name=None,
message=f"The values for the `erase_after` fields caused a cycle in the following collections {collection_cycle}",
action_type=ActionType.erasure,
)
privacy_request.error_processing(session)
raise TraversalError(
f"The values for the `erase_after` fields caused a cycle in the following collections {collection_cycle}"
)
Expand Down
2 changes: 0 additions & 2 deletions src/fides/cli/commands/ungrouped.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ def init(ctx: click.Context, fides_dir: str, opt_in: bool) -> None:
@click.command()
@click.pass_context
@with_analytics
@with_server_health_check
def status(ctx: click.Context) -> None:
"""
Check Fides server availability.
Expand Down Expand Up @@ -183,7 +182,6 @@ def webserver(ctx: click.Context, port: int = 8080) -> None:
@click.command()
@click.pass_context
@with_analytics
@with_server_health_check
def worker(ctx: click.Context) -> None:
"""
Start a Celery worker for the Fides webserver.
Expand Down
Loading

0 comments on commit 3a60219

Please sign in to comment.