diff --git a/docs/how/delete-metadata.md b/docs/how/delete-metadata.md index 5a414c92959f88..d0498e5552c8d6 100644 --- a/docs/how/delete-metadata.md +++ b/docs/how/delete-metadata.md @@ -101,11 +101,14 @@ datahub ingest rollback --run-id to rollback all aspects added with this run and all entities created by this run. -:::note +### Unsafe Entities and Rollback -Since datahub v0.8.29, the `rollback` endpoint will now perform a *soft delete* of the entities ingested by a given run ``. -This was done to preserve potential changes that were made directly via DataHub's UI and not part of the ingestion run itself. Such that this information can be retrieved later on if a re-ingestion for the same deleted entity is done. +> **_NOTE:_** Preservation of unsafe entities has been added in datahub `0.8.32`. Read on to understand what it means and how it works. -If you wish to keep old behaviour (hard delete), please use the `--hard-delete` flag (short-hand: `-d`). +In some cases, entities that were initially ingested by a run might have had further modifications to their metadata (e.g. adding terms, tags, or documentation) through the UI or other means. During a rollback of the ingestion that initially created these entities (technically, if the key aspect for these entities is being rolled back), the ingestion process will analyse the metadata graph for aspects that will be left "dangling" and will: +1. Leave these aspects untouched in the database, and soft-delete the entity. A re-ingestion of these entities will result in this additional metadata becoming visible again in the UI, so you don't lose any of your work. +2. The datahub cli will save information about these unsafe entities as a CSV for operators to later review and decide on next steps (keep or remove). 
-::: +The rollback command will report how many entities have such aspects and save the urns of these entities as a CSV under a rollback reports directory, which defaults to `rollback-reports` under the current directory where the cli is run, and can be configured further using the `--report-dir` command line arg. + +The operator can use `datahub get --urn <urn>` to inspect the aspects that were left behind and either keep them (do nothing) or delete the entity (and its aspects) completely using `datahub delete --urn <urn> --hard`. If the operator wishes to remove all the metadata associated with these unsafe entities, they can re-issue the rollback command with the `--nuke` flag. diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index 9ef44f85d843b2..d76b5f85663d94 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -234,24 +234,29 @@ def parse_run_restli_response(response: requests.Response) -> dict: def post_rollback_endpoint( payload_obj: dict, path: str, -) -> typing.Tuple[typing.List[typing.List[str]], int, int]: +) -> typing.Tuple[typing.List[typing.List[str]], int, int, int, int, typing.List[dict]]: session, gms_host = get_session_and_host() url = gms_host + path payload = json.dumps(payload_obj) - response = session.post(url, payload) summary = parse_run_restli_response(response) rows = summary.get("aspectRowSummaries", []) entities_affected = summary.get("entitiesAffected", 0) + aspects_reverted = summary.get("aspectsReverted", 0) aspects_affected = summary.get("aspectsAffected", 0) + unsafe_entity_count = summary.get("unsafeEntitiesCount", 0) + unsafe_entities = summary.get("unsafeEntities", []) + rolled_back_aspects = list( + filter(lambda row: row["runId"] == payload_obj["runId"], rows) + ) if len(rows) == 0: click.secho(f"No entities found. 
Payload used: {payload}", fg="yellow") local_timezone = datetime.now().astimezone().tzinfo - structured_rows = [ + structured_rolled_back_results = [ [ row.get("urn"), row.get("aspectName"), @@ -260,10 +265,17 @@ def post_rollback_endpoint( ) + f" ({local_timezone})", ] - for row in rows + for row in rolled_back_aspects ] - return structured_rows, entities_affected, aspects_affected + return ( + structured_rolled_back_results, + entities_affected, + aspects_reverted, + aspects_affected, + unsafe_entity_count, + unsafe_entities, + ) def post_delete_endpoint( @@ -707,4 +719,7 @@ def get_aspects_for_entity( except Exception as e: log.error(f"Error on {json.dumps(aspect_dict)}", e) - return {k: v for (k, v) in aspect_map.items() if k in aspects} + if aspects: + return {k: v for (k, v) in aspect_map.items() if k in aspects} + else: + return {k: v for (k, v) in aspect_map.items()} diff --git a/metadata-ingestion/src/datahub/cli/delete_cli.py b/metadata-ingestion/src/datahub/cli/delete_cli.py index 33dc18c523eca4..19924167e8ebf0 100644 --- a/metadata-ingestion/src/datahub/cli/delete_cli.py +++ b/metadata-ingestion/src/datahub/cli/delete_cli.py @@ -72,6 +72,9 @@ def delete_for_registry( structured_rows, entities_affected, aspects_affected, + unsafe_aspects, + unsafe_entity_count, + unsafe_entities, ) = cli_utils.post_rollback_endpoint(registry_delete, "/entities?action=deleteAll") deletion_result.num_entities = entities_affected deletion_result.num_records = aspects_affected diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 7091c5fa3931dd..830b65007d3206 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -1,5 +1,7 @@ +import csv import json import logging +import os import pathlib import sys from datetime import datetime @@ -182,24 +184,29 @@ def show(run_id: str) -> None: """Describe a provided ingestion run to datahub""" payload_obj = 
{"runId": run_id, "dryRun": True, "hardDelete": True} - structured_rows, entities_affected, aspects_affected = post_rollback_endpoint( - payload_obj, "/runs?action=rollback" - ) - - if aspects_affected >= ELASTIC_MAX_PAGE_SIZE: + ( + structured_rows, + entities_affected, + aspects_modified, + aspects_affected, + unsafe_entity_count, + unsafe_entities, + ) = post_rollback_endpoint(payload_obj, "/runs?action=rollback") + + if aspects_modified >= ELASTIC_MAX_PAGE_SIZE: click.echo( - f"this run created at least {entities_affected} new entities and updated at least {aspects_affected} aspects" + f"this run created at least {entities_affected} new entities and updated at least {aspects_modified} aspects" ) else: click.echo( - f"this run created {entities_affected} new entities and updated {aspects_affected} aspects" + f"this run created {entities_affected} new entities and updated {aspects_modified} aspects" ) click.echo( "rolling back will delete the entities created and revert the updated aspects" ) click.echo() click.echo( - f"showing first {len(structured_rows)} of {aspects_affected} aspects touched by this run" + f"showing first {len(structured_rows)} of {aspects_modified} aspects touched by this run" ) click.echo(tabulate(structured_rows, RUN_TABLE_COLUMNS, tablefmt="grid")) @@ -208,9 +215,18 @@ def show(run_id: str) -> None: @click.option("--run-id", required=True, type=str) @click.option("-f", "--force", required=False, is_flag=True) @click.option("--dry-run", "-n", required=False, is_flag=True, default=False) -@click.option("--hard-delete", "-d", required=False, is_flag=True, default=False) +@click.option("--safe/--nuke", required=False, is_flag=True, default=True) +@click.option( + "--report-dir", + required=False, + type=str, + default="./rollback-reports", + help="Path to directory where rollback reports will be saved to", +) @telemetry.with_telemetry -def rollback(run_id: str, force: bool, dry_run: bool, hard_delete: bool) -> None: +def rollback( + run_id: 
str, force: bool, dry_run: bool, safe: bool, report_dir: str +) -> None: """Rollback a provided ingestion run to datahub""" cli_utils.test_connectivity_complain_exit("ingest") @@ -221,18 +237,57 @@ def rollback(run_id: str, force: bool, dry_run: bool, hard_delete: bool) -> None abort=True, ) - payload_obj = {"runId": run_id, "dryRun": dry_run, "hardDelete": hard_delete} - structured_rows, entities_affected, aspects_affected = post_rollback_endpoint( - payload_obj, "/runs?action=rollback" - ) + payload_obj = {"runId": run_id, "dryRun": dry_run, "safe": safe} + ( + structured_rows, + entities_affected, + aspects_reverted, + aspects_affected, + unsafe_entity_count, + unsafe_entities, + ) = post_rollback_endpoint(payload_obj, "/runs?action=rollback") click.echo( "Rolling back deletes the entities created by a run and reverts the updated aspects" ) click.echo( - f"This rollback {'will' if dry_run else ''} {'delete' if dry_run else 'deleted'} {entities_affected} entities and {'will roll' if dry_run else 'rolled'} back {aspects_affected} aspects" + f"This rollback {'will' if dry_run else ''} {'delete' if dry_run else 'deleted'} {entities_affected} entities and {'will roll' if dry_run else 'rolled'} back {aspects_reverted} aspects" ) + click.echo( - f"showing first {len(structured_rows)} of {aspects_affected} aspects {'that will be' if dry_run else ''} reverted by this run" + f"showing first {len(structured_rows)} of {aspects_reverted} aspects {'that will be ' if dry_run else ''}reverted by this run" ) click.echo(tabulate(structured_rows, RUN_TABLE_COLUMNS, tablefmt="grid")) + + if aspects_affected > 0: + if safe: + click.echo( + f"WARNING: This rollback {'will hide' if dry_run else 'has hidden'} {aspects_affected} aspects related to {unsafe_entity_count} entities being rolled back that are not part ingestion run id." 
+ ) + else: + click.echo( + f"WARNING: This rollback {'will delete' if dry_run else 'has deleted'} {aspects_affected} aspects related to {unsafe_entity_count} entities being rolled back that are not part ingestion run id." + ) + + if unsafe_entity_count > 0: + now = datetime.now() + current_time = now.strftime("%Y-%m-%d %H:%M:%S") + + try: + folder_name = report_dir + "/" + current_time + + ingestion_config_file_name = folder_name + "/config.json" + os.makedirs(os.path.dirname(ingestion_config_file_name), exist_ok=True) + with open(ingestion_config_file_name, "w") as file_handle: + json.dump({"run_id": run_id}, file_handle) + + csv_file_name = folder_name + "/unsafe_entities.csv" + with open(csv_file_name, "w") as file_handle: + writer = csv.writer(file_handle) + writer.writerow(["urn"]) + for row in unsafe_entities: + writer.writerow([row.get("urn")]) + + except IOError as e: + print(e) + sys.exit("Unable to write reports to " + report_dir) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java index a1fe2efa7f0c5f..6493fb6a50d7bd 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/ElasticSearchSystemMetadataService.java @@ -127,7 +127,13 @@ public List findByRunId(String runId, boolean includeSoftDelet return findByParams(Collections.singletonMap(FIELD_RUNID, runId), includeSoftDeleted); } - private List findByParams(Map systemMetaParams, boolean includeSoftDeleted) { + @Override + public List findByUrn(String urn, boolean includeSoftDeleted) { + return findByParams(Collections.singletonMap(FIELD_URN, urn), includeSoftDeleted); + } + + @Override + public List findByParams(Map systemMetaParams, boolean includeSoftDeleted) { SearchResponse searchResponse = 
_esDAO.findByParams(systemMetaParams, includeSoftDeleted); if (searchResponse != null) { SearchHits hits = searchResponse.getHits(); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/SystemMetadataService.java b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/SystemMetadataService.java index 065de46876146b..8ca9b771e3be27 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/SystemMetadataService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/systemmetadata/SystemMetadataService.java @@ -4,6 +4,7 @@ import com.linkedin.metadata.run.IngestionRunSummary; import com.linkedin.mxe.SystemMetadata; import java.util.List; +import java.util.Map; import javax.annotation.Nullable; @@ -24,6 +25,10 @@ public interface SystemMetadataService { List findByRunId(String runId, boolean includeSoftDeleted); + List findByUrn(String urn, boolean includeSoftDeleted); + + List findByParams(Map systemMetaParams, boolean includeSoftDeleted); + List findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted); List listRuns(Integer pageOffset, Integer pageSize, boolean includeSoftDeleted); diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/run/RollbackResponse.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/run/RollbackResponse.pdl index 2d261b9e6ccfb5..edf1cbe4717160 100644 --- a/metadata-models/src/main/pegasus/com/linkedin/metadata/run/RollbackResponse.pdl +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/run/RollbackResponse.pdl @@ -5,4 +5,7 @@ record RollbackResponse { entitiesAffected: long aspectsAffected: long entitiesDeleted: optional long + aspectsReverted: optional long + unsafeEntitiesCount: optional long + unsafeEntities: array[UnsafeEntityInfo] } diff --git a/metadata-models/src/main/pegasus/com/linkedin/metadata/run/UnsafeEntityInfo.pdl b/metadata-models/src/main/pegasus/com/linkedin/metadata/run/UnsafeEntityInfo.pdl new file mode 
100644 index 00000000000000..75e7f4363af74c --- /dev/null +++ b/metadata-models/src/main/pegasus/com/linkedin/metadata/run/UnsafeEntityInfo.pdl @@ -0,0 +1,11 @@ +namespace com.linkedin.metadata.run + +/** + * This record serves as a holder of information for entities that were unsafe to fully delete as part of a rollback operation. + */ +record UnsafeEntityInfo { + /** + * Name of the entity this aspect information instance refers to. + */ + urn: string +} diff --git a/metadata-service/restli-api/src/main/idl/com.linkedin.entity.runs.restspec.json b/metadata-service/restli-api/src/main/idl/com.linkedin.entity.runs.restspec.json index 501b1cbab8f1af..e200ed35a8868d 100644 --- a/metadata-service/restli-api/src/main/idl/com.linkedin.entity.runs.restspec.json +++ b/metadata-service/restli-api/src/main/idl/com.linkedin.entity.runs.restspec.json @@ -38,9 +38,16 @@ "type" : "boolean", "optional" : true }, { + "annotations" : { + "deprecated" : { } + }, "name" : "hardDelete", "type" : "boolean", "optional" : true + }, { + "name" : "safe", + "type" : "boolean", + "optional" : true } ], "returns" : "com.linkedin.metadata.run.RollbackResponse" } ], diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index 0bae968c8fde92..5b66b369325ddc 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -5178,8 +5178,31 @@ "name" : "entitiesDeleted", "type" : "long", "optional" : true + }, { + "name" : "aspectsReverted", + "type" : "long", + "optional" : true + }, { + "name" : "unsafeEntitiesCount", + "type" : "long", + "optional" : true + }, { + "name" : "unsafeEntities", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "UnsafeEntityInfo", + "doc" : " This record serves as a 
holder of information for entities that were unsafe to fully delete as part of a rollback operation.", + "fields" : [ { + "name" : "urn", + "type" : "string", + "doc" : "Name of the entity this aspect information instance refers to." + } ] + } + } } ] - }, { + }, "com.linkedin.metadata.run.UnsafeEntityInfo", { "type" : "record", "name" : "AggregationMetadata", "namespace" : "com.linkedin.metadata.search", diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json index 0a0c428211b779..cac06791126693 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.runs.snapshot.json @@ -3201,8 +3201,31 @@ "name" : "entitiesDeleted", "type" : "long", "optional" : true + }, { + "name" : "aspectsReverted", + "type" : "long", + "optional" : true + }, { + "name" : "unsafeEntitiesCount", + "type" : "long", + "optional" : true + }, { + "name" : "unsafeEntities", + "type" : { + "type" : "array", + "items" : { + "type" : "record", + "name" : "UnsafeEntityInfo", + "doc" : " This record serves as a holder of information for entities that were unsafe to fully delete as part of a rollback operation.", + "fields" : [ { + "name" : "urn", + "type" : "string", + "doc" : "Name of the entity this aspect information instance refers to." 
+ } ] + } + } } ] - }, "com.linkedin.ml.metadata.BaseData", "com.linkedin.ml.metadata.CaveatDetails", "com.linkedin.ml.metadata.CaveatsAndRecommendations", "com.linkedin.ml.metadata.EthicalConsiderations", "com.linkedin.ml.metadata.EvaluationData", "com.linkedin.ml.metadata.HyperParameterValueType", "com.linkedin.ml.metadata.IntendedUse", "com.linkedin.ml.metadata.IntendedUserType", "com.linkedin.ml.metadata.MLFeatureProperties", "com.linkedin.ml.metadata.MLHyperParam", "com.linkedin.ml.metadata.MLMetric", "com.linkedin.ml.metadata.MLModelFactorPrompts", "com.linkedin.ml.metadata.MLModelFactors", "com.linkedin.ml.metadata.MLModelProperties", "com.linkedin.ml.metadata.Metrics", "com.linkedin.ml.metadata.QuantitativeAnalyses", "com.linkedin.ml.metadata.ResultsType", "com.linkedin.ml.metadata.SourceCode", "com.linkedin.ml.metadata.SourceCodeUrl", "com.linkedin.ml.metadata.SourceCodeUrlType", "com.linkedin.ml.metadata.TrainingData", "com.linkedin.schema.ArrayType", "com.linkedin.schema.BinaryJsonSchema", "com.linkedin.schema.BooleanType", "com.linkedin.schema.BytesType", "com.linkedin.schema.DatasetFieldForeignKey", "com.linkedin.schema.DateType", "com.linkedin.schema.EditableSchemaFieldInfo", "com.linkedin.schema.EditableSchemaMetadata", "com.linkedin.schema.EnumType", "com.linkedin.schema.EspressoSchema", "com.linkedin.schema.FixedType", "com.linkedin.schema.ForeignKeyConstraint", "com.linkedin.schema.ForeignKeySpec", "com.linkedin.schema.KafkaSchema", "com.linkedin.schema.KeyValueSchema", "com.linkedin.schema.MapType", "com.linkedin.schema.MySqlDDL", "com.linkedin.schema.NullType", "com.linkedin.schema.NumberType", "com.linkedin.schema.OracleDDL", "com.linkedin.schema.OrcSchema", "com.linkedin.schema.OtherSchema", "com.linkedin.schema.PrestoDDL", "com.linkedin.schema.RecordType", "com.linkedin.schema.SchemaField", "com.linkedin.schema.SchemaFieldDataType", "com.linkedin.schema.SchemaMetadata", "com.linkedin.schema.SchemaMetadataKey", 
"com.linkedin.schema.Schemaless", "com.linkedin.schema.StringType", "com.linkedin.schema.TimeType", "com.linkedin.schema.UnionType", "com.linkedin.schema.UrnForeignKey", "com.linkedin.tag.TagProperties" ], + }, "com.linkedin.metadata.run.UnsafeEntityInfo", "com.linkedin.ml.metadata.BaseData", "com.linkedin.ml.metadata.CaveatDetails", "com.linkedin.ml.metadata.CaveatsAndRecommendations", "com.linkedin.ml.metadata.EthicalConsiderations", "com.linkedin.ml.metadata.EvaluationData", "com.linkedin.ml.metadata.HyperParameterValueType", "com.linkedin.ml.metadata.IntendedUse", "com.linkedin.ml.metadata.IntendedUserType", "com.linkedin.ml.metadata.MLFeatureProperties", "com.linkedin.ml.metadata.MLHyperParam", "com.linkedin.ml.metadata.MLMetric", "com.linkedin.ml.metadata.MLModelFactorPrompts", "com.linkedin.ml.metadata.MLModelFactors", "com.linkedin.ml.metadata.MLModelProperties", "com.linkedin.ml.metadata.Metrics", "com.linkedin.ml.metadata.QuantitativeAnalyses", "com.linkedin.ml.metadata.ResultsType", "com.linkedin.ml.metadata.SourceCode", "com.linkedin.ml.metadata.SourceCodeUrl", "com.linkedin.ml.metadata.SourceCodeUrlType", "com.linkedin.ml.metadata.TrainingData", "com.linkedin.schema.ArrayType", "com.linkedin.schema.BinaryJsonSchema", "com.linkedin.schema.BooleanType", "com.linkedin.schema.BytesType", "com.linkedin.schema.DatasetFieldForeignKey", "com.linkedin.schema.DateType", "com.linkedin.schema.EditableSchemaFieldInfo", "com.linkedin.schema.EditableSchemaMetadata", "com.linkedin.schema.EnumType", "com.linkedin.schema.EspressoSchema", "com.linkedin.schema.FixedType", "com.linkedin.schema.ForeignKeyConstraint", "com.linkedin.schema.ForeignKeySpec", "com.linkedin.schema.KafkaSchema", "com.linkedin.schema.KeyValueSchema", "com.linkedin.schema.MapType", "com.linkedin.schema.MySqlDDL", "com.linkedin.schema.NullType", "com.linkedin.schema.NumberType", "com.linkedin.schema.OracleDDL", "com.linkedin.schema.OrcSchema", "com.linkedin.schema.OtherSchema", 
"com.linkedin.schema.PrestoDDL", "com.linkedin.schema.RecordType", "com.linkedin.schema.SchemaField", "com.linkedin.schema.SchemaFieldDataType", "com.linkedin.schema.SchemaMetadata", "com.linkedin.schema.SchemaMetadataKey", "com.linkedin.schema.Schemaless", "com.linkedin.schema.StringType", "com.linkedin.schema.TimeType", "com.linkedin.schema.UnionType", "com.linkedin.schema.UrnForeignKey", "com.linkedin.tag.TagProperties" ], "schema" : { "name" : "runs", "namespace" : "com.linkedin.entity", @@ -3243,9 +3266,16 @@ "type" : "boolean", "optional" : true }, { + "annotations" : { + "deprecated" : { } + }, "name" : "hardDelete", "type" : "boolean", "optional" : true + }, { + "name" : "safe", + "type" : "boolean", + "optional" : true } ], "returns" : "com.linkedin.metadata.run.RollbackResponse" } ], diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java index 90f1eb94dcd0c7..11f44889646d20 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/BatchIngestionRunResource.java @@ -5,6 +5,8 @@ import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.entity.RollbackRunResult; import com.linkedin.metadata.restli.RestliUtil; +import com.linkedin.metadata.run.UnsafeEntityInfo; +import com.linkedin.metadata.run.UnsafeEntityInfoArray; import com.linkedin.metadata.run.AspectRowSummary; import com.linkedin.metadata.run.AspectRowSummaryArray; import com.linkedin.metadata.run.IngestionRunSummary; @@ -18,14 +20,16 @@ import com.linkedin.restli.server.annotations.RestLiCollection; import com.linkedin.restli.server.resources.CollectionResourceTaskTemplate; import 
io.opentelemetry.extension.annotations.WithSpan; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; + import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Named; -import lombok.extern.slf4j.Slf4j; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** @@ -37,6 +41,7 @@ public class BatchIngestionRunResource extends CollectionResourceTaskTemplate rollback(@ActionParam("runId") @Nonnull String runId, - @ActionParam("dryRun") @Optional Boolean dryRun, @ActionParam("hardDelete") @Optional Boolean hardDelete) { + @ActionParam("dryRun") @Optional Boolean dryRun, + @Deprecated @ActionParam("hardDelete") @Optional Boolean hardDelete, + @ActionParam("safe") @Optional Boolean safe) { log.info("ROLLBACK RUN runId: {} dry run: {}", runId, dryRun); - boolean doHardDelete = hardDelete != null ? hardDelete : DEFAULT_HARD_DELETE; + boolean doHardDelete = safe != null ? !safe : hardDelete != null ? 
hardDelete : DEFAULT_HARD_DELETE; + + if (safe != null && hardDelete != null) { + log.warn("Both Safe & hardDelete flags were defined, honouring safe flag as hardDelete is deprecated"); + } return RestliUtil.toTask(() -> { if (runId.equals(EntityService.DEFAULT_RUN_ID)) { @@ -75,22 +86,59 @@ public Task rollback(@ActionParam("runId") @Nonnull String run log.info("found {} rows to delete...", stringifyRowCount(aspectRowsToDelete.size())); if (dryRun) { + final Map> aspectsSplitByIsKeyAspects = aspectRowsToDelete.stream() + .collect(Collectors.partitioningBy(AspectRowSummary::isKeyAspect)); + + final List keyAspects = aspectsSplitByIsKeyAspects.get(true); + + long entitiesDeleted = keyAspects.size(); + long aspectsReverted = aspectRowsToDelete.size(); + + final long affectedEntities = aspectRowsToDelete.stream() + .collect(Collectors.groupingBy(AspectRowSummary::getUrn)).keySet().size(); + + final AspectRowSummaryArray rowSummaries = new AspectRowSummaryArray( + aspectRowsToDelete.subList(0, Math.min(100, aspectRowsToDelete.size()))); + + // If we are soft deleting, remove key aspects from count of aspects being deleted if (!doHardDelete) { - aspectRowsToDelete.removeIf(AspectRowSummary::isKeyAspect); + aspectsReverted -= keyAspects.size(); + rowSummaries.removeIf(AspectRowSummary::isKeyAspect); } - - response.setAspectsAffected(aspectRowsToDelete.size()); - response.setEntitiesAffected( - aspectRowsToDelete.stream().collect(Collectors.groupingBy(AspectRowSummary::getUrn)).keySet().size()); - response.setEntitiesDeleted(aspectRowsToDelete.stream().filter(row -> row.isKeyAspect()).count()); - response.setAspectRowSummaries( - new AspectRowSummaryArray(aspectRowsToDelete.subList(0, Math.min(100, aspectRowsToDelete.size())))); - return response; + // Compute the aspects that exist referencing the key aspects we are deleting + final List affectedAspectsList = keyAspects.stream() + .map((AspectRowSummary urn) -> _systemMetadataService.findByUrn(urn.getUrn(), false)) + 
.flatMap(List::stream) + .filter(row -> !row.getRunId().equals(runId)) + .collect(Collectors.toList()); + + long affectedAspects = affectedAspectsList.size(); + long unsafeEntitiesCount = affectedAspectsList.stream() + .collect(Collectors.groupingBy(AspectRowSummary::getUrn)).keySet().size(); + + final List unsafeEntityInfos = affectedAspectsList.stream().map(AspectRowSummary::getUrn) + .distinct() + .map(urn -> { + UnsafeEntityInfo unsafeEntityInfo = new UnsafeEntityInfo(); + unsafeEntityInfo.setUrn(urn); + return unsafeEntityInfo; + }) + // Return at most 1 million rows + .limit(DEFAULT_UNSAFE_ENTITIES_PAGE_SIZE) + .collect(Collectors.toList()); + + return response.setAspectsAffected(affectedAspects) + .setAspectsReverted(aspectsReverted) + .setEntitiesAffected(affectedEntities) + .setEntitiesDeleted(entitiesDeleted) + .setUnsafeEntitiesCount(unsafeEntitiesCount) + .setUnsafeEntities(new UnsafeEntityInfoArray(unsafeEntityInfos)) + .setAspectRowSummaries(rowSummaries); } RollbackRunResult rollbackRunResult = _entityService.rollbackRun(aspectRowsToDelete, runId, doHardDelete); - List deletedRows = rollbackRunResult.getRowsRolledBack(); - Integer rowsDeletedFromEntityDeletion = rollbackRunResult.getRowsDeletedFromEntityDeletion(); + final List deletedRows = rollbackRunResult.getRowsRolledBack(); + int rowsDeletedFromEntityDeletion = rollbackRunResult.getRowsDeletedFromEntityDeletion(); // since elastic limits how many rows we can access at once, we need to iteratively delete while (aspectRowsToDelete.size() >= ELASTIC_MAX_PAGE_SIZE) { @@ -104,11 +152,52 @@ public Task rollback(@ActionParam("runId") @Nonnull String run } log.info("finished deleting {} rows", deletedRows.size()); - response.setAspectsAffected(deletedRows.size() + rowsDeletedFromEntityDeletion); - response.setEntitiesAffected(deletedRows.stream().filter(row -> row.isKeyAspect()).count()); - response.setAspectRowSummaries( - new AspectRowSummaryArray(deletedRows.subList(0, Math.min(100, 
deletedRows.size())))); - return response; + int aspectsReverted = deletedRows.size() + rowsDeletedFromEntityDeletion; + + final Map> aspectsSplitByIsKeyAspects = aspectRowsToDelete.stream() + .collect(Collectors.partitioningBy(AspectRowSummary::isKeyAspect)); + + final List keyAspects = aspectsSplitByIsKeyAspects.get(true); + + final long entitiesDeleted = keyAspects.size(); + final long affectedEntities = deletedRows.stream() + .collect(Collectors.groupingBy(AspectRowSummary::getUrn)).keySet().size(); + + final AspectRowSummaryArray rowSummaries = new AspectRowSummaryArray( + aspectRowsToDelete.subList(0, Math.min(100, aspectRowsToDelete.size()))); + + log.info("computing aspects affected by this rollback..."); + // Compute the aspects that exist referencing the key aspects we are deleting + final List affectedAspectsList = keyAspects.stream() + .map((AspectRowSummary urn) -> _systemMetadataService.findByUrn(urn.getUrn(), false)) + .flatMap(List::stream) + .filter(row -> !row.getRunId().equals(runId)) + .collect(Collectors.toList()); + + long affectedAspects = affectedAspectsList.size(); + long unsafeEntitiesCount = affectedAspectsList.stream() + .collect(Collectors.groupingBy(AspectRowSummary::getUrn)).keySet().size(); + + final List unsafeEntityInfos = affectedAspectsList.stream().map(AspectRowSummary::getUrn) + .distinct() + .map(urn -> { + UnsafeEntityInfo unsafeEntityInfo = new UnsafeEntityInfo(); + unsafeEntityInfo.setUrn(urn); + return unsafeEntityInfo; + }) + // Return at most 1 million rows + .limit(DEFAULT_UNSAFE_ENTITIES_PAGE_SIZE) + .collect(Collectors.toList()); + + log.info("calculation done."); + + return response.setAspectsAffected(affectedAspects) + .setAspectsReverted(aspectsReverted) + .setEntitiesAffected(affectedEntities) + .setEntitiesDeleted(entitiesDeleted) + .setUnsafeEntitiesCount(unsafeEntitiesCount) + .setUnsafeEntities(new UnsafeEntityInfoArray(unsafeEntityInfos)) + .setAspectRowSummaries(rowSummaries); }, 
MetricRegistry.name(this.getClass(), "rollback")); } @@ -116,7 +205,7 @@ private String stringifyRowCount(int size) { if (size < ELASTIC_MAX_PAGE_SIZE) { return String.valueOf(size); } else { - return "at least " + String.valueOf(size); + return "at least " + size; } }