From 8f16dbdd7b10d586266e8042eb5b51c08808c8f2 Mon Sep 17 00:00:00 2001 From: Jonathan Rosenberg Date: Mon, 9 Dec 2024 12:41:33 +0200 Subject: [PATCH 1/2] assume unfound removed entries were vacuumed --- pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua b/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua index 874923c5f04..fb2f8641b33 100644 --- a/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua +++ b/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua @@ -40,7 +40,7 @@ end delta_client: - get_table: function(repo, ref, prefix) - + path_transformer: function(path) used for transforming path scheme (ex: Azure https to abfss) ]] @@ -128,6 +128,9 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli elseif entry.remove ~= nil then entry.remove.path = physical_path end + elseif code == 404 and entry.remove ~= nil then + -- If the object is not found, and the entry is a remove entry, we can assume it was vacuumed + print(string.format("REMOVE entry object `%s` wasn't found. Assuming vacuum.", unescaped_path)) else error("failed stat_object with code: " .. tostring(code) .. ", and path: " .. unescaped_path) end From a1a0729fe9daaf446eee64472ab91e4f62833ac1 Mon Sep 17 00:00:00 2001 From: Jonathan Rosenberg Date: Mon, 9 Dec 2024 14:01:49 +0200 Subject: [PATCH 2/2] handle 'add' entries accordingly --- .../lakefs/catalogexport/delta_exporter.lua | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua b/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua index fb2f8641b33..62c44326a93 100644 --- a/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua +++ b/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua @@ -5,6 +5,11 @@ local utils = require("lakefs/catalogexport/internal") local extractor = require("lakefs/catalogexport/table_extractor") local strings = require("strings") local url = require("net/url") + +local function isTableNotEmpty(t) + return next(t) ~= nil +end + --[[ delta_log_entry_key_generator returns a closure that returns a Delta Lake version key according to the Delta Lake protocol: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#delta-log-entries @@ -87,6 +92,7 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli ]] local table_log = {} local keyGenerator = delta_log_entry_key_generator() + local unfound_paths = {} for _, key in ipairs(sortedKeys) do local content = t[key] local entry_log = {} @@ -128,9 +134,14 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli elseif entry.remove ~= nil then entry.remove.path = physical_path end - elseif code == 404 and entry.remove ~= nil then - -- If the object is not found, and the entry is a remove entry, we can assume it was vacuumed - print(string.format("REMOVE entry object `%s` wasn't found. Assuming vacuum.", unescaped_path)) + elseif code == 404 then + if entry.remove ~= nil then + -- If the object is not found, and the entry is a remove entry, we can assume it was vacuumed + print(string.format("Object with path '%s' of a `remove` entry wasn't found. Assuming vacuum.", unescaped_path)) + unfound_paths[unescaped_path] = nil + else + unfound_paths[unescaped_path] = true + end else error("failed stat_object with code: " .. tostring(code) .. ", and path: " .. unescaped_path) end @@ -141,6 +152,17 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli table_log[keyGenerator()] = entry_log end + if isTableNotEmpty(unfound_paths) then + local unfound_paths_str = "" + for p, v in pairs(unfound_paths) do + if v ~= nil then + unfound_paths_str = pathlib.join(" ", unfound_paths_str, p) + print(p) + end + end + error("The following objects were not found: " .. unfound_paths) + end + local table_export_prefix = utils.get_storage_uri_prefix(ns, commit_id, action) local table_physical_path = pathlib.join("/", table_export_prefix, table_name) local table_log_physical_path = pathlib.join("/", table_physical_path, "_delta_log")