Skip to content

Commit

Permalink
feat(platform): adds side-effect report for rollbacks (datahub-projec…
Browse files Browse the repository at this point in the history
…t#4482)

Co-authored-by: Shirshanka Das <shirshanka@apache.org>
  • Loading branch information
2 people authored and maggiehays committed Aug 1, 2022
1 parent 2e91672 commit b658914
Show file tree
Hide file tree
Showing 12 changed files with 303 additions and 53 deletions.
13 changes: 8 additions & 5 deletions docs/how/delete-metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,14 @@ datahub ingest rollback --run-id <run-id>

to rollback all aspects added with this run and all entities created by this run.

:::note
### Unsafe Entities and Rollback

Since datahub v0.8.29, the `rollback` endpoint will now perform a *soft delete* of the entities ingested by a given run `<run-id>`.
This was done to preserve potential changes that were made directly via DataHub's UI and not part of the ingestion run itself. Such that this information can be retrieved later on if a re-ingestion for the same deleted entity is done.
> **_NOTE:_** Preservation of unsafe entities has been added in datahub `0.8.32`. Read on to understand what it means and how it works.
If you wish to keep old behaviour (hard delete), please use the `--hard-delete` flag (short-hand: `-d`).
In some cases, entities that were initially ingested by a run might have had further modifications to their metadata (e.g. adding terms, tags, or documentation) through the UI or other means. During a roll back of the ingestion that initially created these entities (technically, if the key aspect for these entities are being rolled back), the ingestion process will analyse the metadata graph for aspects that will be left "dangling" and will:
1. Leave these aspects untouched in the database, and soft-delete the entity. A re-ingestion of these entities will result in this additional metadata becoming visible again in the UI, so you don't lose any of your work.
2. The datahub cli will save information about these unsafe entities as a CSV for operators to later review and decide on next steps (keep or remove).

:::
The rollback command will report how many entities have such aspects and save as a CSV the urns of these entities under a rollback reports directory, which defaults to `rollback_reports` under the current directory where the cli is run, and can be configured further using the `--reports-dir` command line arg.

The operator can use `datahub get --urn <>` to inspect the aspects that were left behind and either keep them (do nothing) or delete the entity (and its aspects) completely using `datahub delete --urn <urn> --hard`. If the operator wishes to remove all the metadata associated with these unsafe entities, they can re-issue the rollback command with the `--nuke` flag.
27 changes: 21 additions & 6 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,24 +234,29 @@ def parse_run_restli_response(response: requests.Response) -> dict:
def post_rollback_endpoint(
payload_obj: dict,
path: str,
) -> typing.Tuple[typing.List[typing.List[str]], int, int]:
) -> typing.Tuple[typing.List[typing.List[str]], int, int, int, int, typing.List[dict]]:
session, gms_host = get_session_and_host()
url = gms_host + path

payload = json.dumps(payload_obj)

response = session.post(url, payload)

summary = parse_run_restli_response(response)
rows = summary.get("aspectRowSummaries", [])
entities_affected = summary.get("entitiesAffected", 0)
aspects_reverted = summary.get("aspectsReverted", 0)
aspects_affected = summary.get("aspectsAffected", 0)
unsafe_entity_count = summary.get("unsafeEntitiesCount", 0)
unsafe_entities = summary.get("unsafeEntities", [])
rolled_back_aspects = list(
filter(lambda row: row["runId"] == payload_obj["runId"], rows)
)

if len(rows) == 0:
click.secho(f"No entities found. Payload used: {payload}", fg="yellow")

local_timezone = datetime.now().astimezone().tzinfo
structured_rows = [
structured_rolled_back_results = [
[
row.get("urn"),
row.get("aspectName"),
Expand All @@ -260,10 +265,17 @@ def post_rollback_endpoint(
)
+ f" ({local_timezone})",
]
for row in rows
for row in rolled_back_aspects
]

return structured_rows, entities_affected, aspects_affected
return (
structured_rolled_back_results,
entities_affected,
aspects_reverted,
aspects_affected,
unsafe_entity_count,
unsafe_entities,
)


def post_delete_endpoint(
Expand Down Expand Up @@ -707,4 +719,7 @@ def get_aspects_for_entity(
except Exception as e:
log.error(f"Error on {json.dumps(aspect_dict)}", e)

return {k: v for (k, v) in aspect_map.items() if k in aspects}
if aspects:
return {k: v for (k, v) in aspect_map.items() if k in aspects}
else:
return {k: v for (k, v) in aspect_map.items()}
3 changes: 3 additions & 0 deletions metadata-ingestion/src/datahub/cli/delete_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ def delete_for_registry(
structured_rows,
entities_affected,
aspects_affected,
unsafe_aspects,
unsafe_entity_count,
unsafe_entities,
) = cli_utils.post_rollback_endpoint(registry_delete, "/entities?action=deleteAll")
deletion_result.num_entities = entities_affected
deletion_result.num_records = aspects_affected
Expand Down
87 changes: 71 additions & 16 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import csv
import json
import logging
import os
import pathlib
import sys
from datetime import datetime
Expand Down Expand Up @@ -182,24 +184,29 @@ def show(run_id: str) -> None:
"""Describe a provided ingestion run to datahub"""

payload_obj = {"runId": run_id, "dryRun": True, "hardDelete": True}
structured_rows, entities_affected, aspects_affected = post_rollback_endpoint(
payload_obj, "/runs?action=rollback"
)

if aspects_affected >= ELASTIC_MAX_PAGE_SIZE:
(
structured_rows,
entities_affected,
aspects_modified,
aspects_affected,
unsafe_entity_count,
unsafe_entities,
) = post_rollback_endpoint(payload_obj, "/runs?action=rollback")

if aspects_modified >= ELASTIC_MAX_PAGE_SIZE:
click.echo(
f"this run created at least {entities_affected} new entities and updated at least {aspects_affected} aspects"
f"this run created at least {entities_affected} new entities and updated at least {aspects_modified} aspects"
)
else:
click.echo(
f"this run created {entities_affected} new entities and updated {aspects_affected} aspects"
f"this run created {entities_affected} new entities and updated {aspects_modified} aspects"
)
click.echo(
"rolling back will delete the entities created and revert the updated aspects"
)
click.echo()
click.echo(
f"showing first {len(structured_rows)} of {aspects_affected} aspects touched by this run"
f"showing first {len(structured_rows)} of {aspects_modified} aspects touched by this run"
)
click.echo(tabulate(structured_rows, RUN_TABLE_COLUMNS, tablefmt="grid"))

Expand All @@ -208,9 +215,18 @@ def show(run_id: str) -> None:
@click.option("--run-id", required=True, type=str)
@click.option("-f", "--force", required=False, is_flag=True)
@click.option("--dry-run", "-n", required=False, is_flag=True, default=False)
@click.option("--hard-delete", "-d", required=False, is_flag=True, default=False)
@click.option("--safe/--nuke", required=False, is_flag=True, default=True)
@click.option(
"--report-dir",
required=False,
type=str,
default="./rollback-reports",
help="Path to directory where rollback reports will be saved to",
)
@telemetry.with_telemetry
def rollback(run_id: str, force: bool, dry_run: bool, hard_delete: bool) -> None:
def rollback(
run_id: str, force: bool, dry_run: bool, safe: bool, report_dir: str
) -> None:
"""Rollback a provided ingestion run to datahub"""

cli_utils.test_connectivity_complain_exit("ingest")
Expand All @@ -221,18 +237,57 @@ def rollback(run_id: str, force: bool, dry_run: bool, hard_delete: bool) -> None
abort=True,
)

payload_obj = {"runId": run_id, "dryRun": dry_run, "hardDelete": hard_delete}
structured_rows, entities_affected, aspects_affected = post_rollback_endpoint(
payload_obj, "/runs?action=rollback"
)
payload_obj = {"runId": run_id, "dryRun": dry_run, "safe": safe}
(
structured_rows,
entities_affected,
aspects_reverted,
aspects_affected,
unsafe_entity_count,
unsafe_entities,
) = post_rollback_endpoint(payload_obj, "/runs?action=rollback")

click.echo(
"Rolling back deletes the entities created by a run and reverts the updated aspects"
)
click.echo(
f"This rollback {'will' if dry_run else ''} {'delete' if dry_run else 'deleted'} {entities_affected} entities and {'will roll' if dry_run else 'rolled'} back {aspects_affected} aspects"
f"This rollback {'will' if dry_run else ''} {'delete' if dry_run else 'deleted'} {entities_affected} entities and {'will roll' if dry_run else 'rolled'} back {aspects_reverted} aspects"
)

click.echo(
f"showing first {len(structured_rows)} of {aspects_affected} aspects {'that will be' if dry_run else ''} reverted by this run"
f"showing first {len(structured_rows)} of {aspects_reverted} aspects {'that will be ' if dry_run else ''}reverted by this run"
)
click.echo(tabulate(structured_rows, RUN_TABLE_COLUMNS, tablefmt="grid"))

if aspects_affected > 0:
if safe:
click.echo(
f"WARNING: This rollback {'will hide' if dry_run else 'has hidden'} {aspects_affected} aspects related to {unsafe_entity_count} entities being rolled back that are not part ingestion run id."
)
else:
click.echo(
f"WARNING: This rollback {'will delete' if dry_run else 'has deleted'} {aspects_affected} aspects related to {unsafe_entity_count} entities being rolled back that are not part ingestion run id."
)

if unsafe_entity_count > 0:
now = datetime.now()
current_time = now.strftime("%Y-%m-%d %H:%M:%S")

try:
folder_name = report_dir + "/" + current_time

ingestion_config_file_name = folder_name + "/config.json"
os.makedirs(os.path.dirname(ingestion_config_file_name), exist_ok=True)
with open(ingestion_config_file_name, "w") as file_handle:
json.dump({"run_id": run_id}, file_handle)

csv_file_name = folder_name + "/unsafe_entities.csv"
with open(csv_file_name, "w") as file_handle:
writer = csv.writer(file_handle)
writer.writerow(["urn"])
for row in unsafe_entities:
writer.writerow([row.get("urn")])

except IOError as e:
print(e)
sys.exit("Unable to write reports to " + report_dir)
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,13 @@ public List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDelet
return findByParams(Collections.singletonMap(FIELD_RUNID, runId), includeSoftDeleted);
}

private List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted) {
@Override
public List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted) {
return findByParams(Collections.singletonMap(FIELD_URN, urn), includeSoftDeleted);
}

@Override
public List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted) {
SearchResponse searchResponse = _esDAO.findByParams(systemMetaParams, includeSoftDeleted);
if (searchResponse != null) {
SearchHits hits = searchResponse.getHits();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.metadata.run.IngestionRunSummary;
import com.linkedin.mxe.SystemMetadata;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;


Expand All @@ -24,6 +25,10 @@ public interface SystemMetadataService {

List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted);

List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted);

List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted);

List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted);

List<IngestionRunSummary> listRuns(Integer pageOffset, Integer pageSize, boolean includeSoftDeleted);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@ record RollbackResponse {
entitiesAffected: long
aspectsAffected: long
entitiesDeleted: optional long
aspectsReverted: optional long
unsafeEntitiesCount: optional long
unsafeEntities: array[UnsafeEntityInfo]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace com.linkedin.metadata.run

/**
* This record serves as a holder of information for entities that were unsafe to fully delete as part of a rollback operation.
*/
record UnsafeEntityInfo {
/**
* Name of the entity this aspect information instance refers to.
*/
urn: string
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,16 @@
"type" : "boolean",
"optional" : true
}, {
"annotations" : {
"deprecated" : { }
},
"name" : "hardDelete",
"type" : "boolean",
"optional" : true
}, {
"name" : "safe",
"type" : "boolean",
"optional" : true
} ],
"returns" : "com.linkedin.metadata.run.RollbackResponse"
} ],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5178,8 +5178,31 @@
"name" : "entitiesDeleted",
"type" : "long",
"optional" : true
}, {
"name" : "aspectsReverted",
"type" : "long",
"optional" : true
}, {
"name" : "unsafeEntitiesCount",
"type" : "long",
"optional" : true
}, {
"name" : "unsafeEntities",
"type" : {
"type" : "array",
"items" : {
"type" : "record",
"name" : "UnsafeEntityInfo",
"doc" : " This record serves as a holder of information for entities that were unsafe to fully delete as part of a rollback operation.",
"fields" : [ {
"name" : "urn",
"type" : "string",
"doc" : "Name of the entity this aspect information instance refers to."
} ]
}
}
} ]
}, {
}, "com.linkedin.metadata.run.UnsafeEntityInfo", {
"type" : "record",
"name" : "AggregationMetadata",
"namespace" : "com.linkedin.metadata.search",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3201,8 +3201,31 @@
"name" : "entitiesDeleted",
"type" : "long",
"optional" : true
}, {
"name" : "aspectsReverted",
"type" : "long",
"optional" : true
}, {
"name" : "unsafeEntitiesCount",
"type" : "long",
"optional" : true
}, {
"name" : "unsafeEntities",
"type" : {
"type" : "array",
"items" : {
"type" : "record",
"name" : "UnsafeEntityInfo",
"doc" : " This record serves as a holder of information for entities that were unsafe to fully delete as part of a rollback operation.",
"fields" : [ {
"name" : "urn",
"type" : "string",
"doc" : "Name of the entity this aspect information instance refers to."
} ]
}
}
} ]
}, "com.linkedin.ml.metadata.BaseData", "com.linkedin.ml.metadata.CaveatDetails", "com.linkedin.ml.metadata.CaveatsAndRecommendations", "com.linkedin.ml.metadata.EthicalConsiderations", "com.linkedin.ml.metadata.EvaluationData", "com.linkedin.ml.metadata.HyperParameterValueType", "com.linkedin.ml.metadata.IntendedUse", "com.linkedin.ml.metadata.IntendedUserType", "com.linkedin.ml.metadata.MLFeatureProperties", "com.linkedin.ml.metadata.MLHyperParam", "com.linkedin.ml.metadata.MLMetric", "com.linkedin.ml.metadata.MLModelFactorPrompts", "com.linkedin.ml.metadata.MLModelFactors", "com.linkedin.ml.metadata.MLModelProperties", "com.linkedin.ml.metadata.Metrics", "com.linkedin.ml.metadata.QuantitativeAnalyses", "com.linkedin.ml.metadata.ResultsType", "com.linkedin.ml.metadata.SourceCode", "com.linkedin.ml.metadata.SourceCodeUrl", "com.linkedin.ml.metadata.SourceCodeUrlType", "com.linkedin.ml.metadata.TrainingData", "com.linkedin.schema.ArrayType", "com.linkedin.schema.BinaryJsonSchema", "com.linkedin.schema.BooleanType", "com.linkedin.schema.BytesType", "com.linkedin.schema.DatasetFieldForeignKey", "com.linkedin.schema.DateType", "com.linkedin.schema.EditableSchemaFieldInfo", "com.linkedin.schema.EditableSchemaMetadata", "com.linkedin.schema.EnumType", "com.linkedin.schema.EspressoSchema", "com.linkedin.schema.FixedType", "com.linkedin.schema.ForeignKeyConstraint", "com.linkedin.schema.ForeignKeySpec", "com.linkedin.schema.KafkaSchema", "com.linkedin.schema.KeyValueSchema", "com.linkedin.schema.MapType", "com.linkedin.schema.MySqlDDL", "com.linkedin.schema.NullType", "com.linkedin.schema.NumberType", "com.linkedin.schema.OracleDDL", "com.linkedin.schema.OrcSchema", "com.linkedin.schema.OtherSchema", "com.linkedin.schema.PrestoDDL", "com.linkedin.schema.RecordType", "com.linkedin.schema.SchemaField", "com.linkedin.schema.SchemaFieldDataType", "com.linkedin.schema.SchemaMetadata", "com.linkedin.schema.SchemaMetadataKey", "com.linkedin.schema.Schemaless", "com.linkedin.schema.StringType", "com.linkedin.schema.TimeType", "com.linkedin.schema.UnionType", "com.linkedin.schema.UrnForeignKey", "com.linkedin.tag.TagProperties" ],
}, "com.linkedin.metadata.run.UnsafeEntityInfo", "com.linkedin.ml.metadata.BaseData", "com.linkedin.ml.metadata.CaveatDetails", "com.linkedin.ml.metadata.CaveatsAndRecommendations", "com.linkedin.ml.metadata.EthicalConsiderations", "com.linkedin.ml.metadata.EvaluationData", "com.linkedin.ml.metadata.HyperParameterValueType", "com.linkedin.ml.metadata.IntendedUse", "com.linkedin.ml.metadata.IntendedUserType", "com.linkedin.ml.metadata.MLFeatureProperties", "com.linkedin.ml.metadata.MLHyperParam", "com.linkedin.ml.metadata.MLMetric", "com.linkedin.ml.metadata.MLModelFactorPrompts", "com.linkedin.ml.metadata.MLModelFactors", "com.linkedin.ml.metadata.MLModelProperties", "com.linkedin.ml.metadata.Metrics", "com.linkedin.ml.metadata.QuantitativeAnalyses", "com.linkedin.ml.metadata.ResultsType", "com.linkedin.ml.metadata.SourceCode", "com.linkedin.ml.metadata.SourceCodeUrl", "com.linkedin.ml.metadata.SourceCodeUrlType", "com.linkedin.ml.metadata.TrainingData", "com.linkedin.schema.ArrayType", "com.linkedin.schema.BinaryJsonSchema", "com.linkedin.schema.BooleanType", "com.linkedin.schema.BytesType", "com.linkedin.schema.DatasetFieldForeignKey", "com.linkedin.schema.DateType", "com.linkedin.schema.EditableSchemaFieldInfo", "com.linkedin.schema.EditableSchemaMetadata", "com.linkedin.schema.EnumType", "com.linkedin.schema.EspressoSchema", "com.linkedin.schema.FixedType", "com.linkedin.schema.ForeignKeyConstraint", "com.linkedin.schema.ForeignKeySpec", "com.linkedin.schema.KafkaSchema", "com.linkedin.schema.KeyValueSchema", "com.linkedin.schema.MapType", "com.linkedin.schema.MySqlDDL", "com.linkedin.schema.NullType", "com.linkedin.schema.NumberType", "com.linkedin.schema.OracleDDL", "com.linkedin.schema.OrcSchema", "com.linkedin.schema.OtherSchema", "com.linkedin.schema.PrestoDDL", "com.linkedin.schema.RecordType", "com.linkedin.schema.SchemaField", "com.linkedin.schema.SchemaFieldDataType", "com.linkedin.schema.SchemaMetadata", "com.linkedin.schema.SchemaMetadataKey", "com.linkedin.schema.Schemaless", "com.linkedin.schema.StringType", "com.linkedin.schema.TimeType", "com.linkedin.schema.UnionType", "com.linkedin.schema.UrnForeignKey", "com.linkedin.tag.TagProperties" ],
"schema" : {
"name" : "runs",
"namespace" : "com.linkedin.entity",
Expand Down Expand Up @@ -3243,9 +3266,16 @@
"type" : "boolean",
"optional" : true
}, {
"annotations" : {
"deprecated" : { }
},
"name" : "hardDelete",
"type" : "boolean",
"optional" : true
}, {
"name" : "safe",
"type" : "boolean",
"optional" : true
} ],
"returns" : "com.linkedin.metadata.run.RollbackResponse"
} ],
Expand Down
Loading

0 comments on commit b658914

Please sign in to comment.