diff --git a/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua b/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua
index 874923c5f04..62c44326a93 100644
--- a/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua
+++ b/pkg/actions/lua/lakefs/catalogexport/delta_exporter.lua
@@ -5,6 +5,12 @@ local utils = require("lakefs/catalogexport/internal")
 local extractor = require("lakefs/catalogexport/table_extractor")
 local strings = require("strings")
 local url = require("net/url")
+
+-- isTableNotEmpty reports whether table t holds at least one key/value pair.
+local function isTableNotEmpty(t)
+    return next(t) ~= nil
+end
+
 --[[
     delta_log_entry_key_generator returns a closure that returns a Delta Lake version key according to the Delta Lake
     protocol: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#delta-log-entries
@@ -40,7 +46,7 @@ end
 
     delta_client:
         - get_table: function(repo, ref, prefix)
-    
+
     path_transformer: function(path) used for transforming path scheme (ex: Azure https to abfss)
 
 ]]
@@ -87,6 +93,7 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli
         ]]
         local table_log = {}
         local keyGenerator = delta_log_entry_key_generator()
+        local unfound_paths = {}
         for _, key in ipairs(sortedKeys) do
             local content = t[key]
             local entry_log = {}
@@ -128,6 +135,16 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli
                         elseif entry.remove ~= nil then
                             entry.remove.path = physical_path
                         end
+                    elseif code == 404 then
+                        if entry.remove ~= nil then
+                            -- If the object is not found, and the entry is a remove entry, we can assume it was vacuumed.
+                            -- Clearing the key also drops a "missing" mark recorded earlier for the same path.
+                            print(string.format("Object with path '%s' of a `remove` entry wasn't found. Assuming vacuum.", unescaped_path))
+                            unfound_paths[unescaped_path] = nil
+                        else
+                            -- Defer the failure so that all missing objects are reported together in one error.
+                            unfound_paths[unescaped_path] = true
+                        end
                     else
                         error("failed stat_object with code: " .. tostring(code) .. ", and path: " .. unescaped_path)
                     end
@@ -138,6 +155,16 @@ local function export_delta_log(action, table_def_names, write_object, delta_cli
             table_log[keyGenerator()] = entry_log
         end
 
+        if isTableNotEmpty(unfound_paths) then
+            -- unfound_paths is a set (path -> true); flatten and sort it so the message is deterministic.
+            local missing = {}
+            for p in pairs(unfound_paths) do
+                missing[#missing + 1] = p
+            end
+            table.sort(missing)
+            error("The following objects were not found: " .. table.concat(missing, ", "))
+        end
+
         local table_export_prefix = utils.get_storage_uri_prefix(ns, commit_id, action)
         local table_physical_path = pathlib.join("/", table_export_prefix, table_name)
         local table_log_physical_path = pathlib.join("/", table_physical_path, "_delta_log")