Package.Fetch: retry fetch on error
As noted in ziglang#17472, this should probably be configurable; however, I'm
not sure what the best approach for this is, and it's not necessary for
an MVP. For now, this is hardcoded to make 4 total attempts, with delays
of 200 ms, 800 ms, and 4000 ms between successive attempts.
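
To illustrate the schedule described above, here is a minimal, self-contained
sketch of the retry loop (not the exact code in this change; `fetchOnce` is a
hypothetical stand-in for a single fetch attempt):

    const std = @import("std");

    // Delays between attempts, in nanoseconds. The number of retries equals
    // the length of this list, so at most retry_delays.len + 1 = 4 attempts
    // are made in total.
    const retry_delays = [_]u64{
        200 * std.time.ns_per_ms,
        800 * std.time.ns_per_ms,
        4000 * std.time.ns_per_ms,
    };

    // Hypothetical stand-in for one fetch attempt; always fails here so the
    // retry path is exercised.
    fn fetchOnce() error{ConnectionRefused}!void {
        return error.ConnectionRefused;
    }

    pub fn main() error{ConnectionRefused}!void {
        var attempt: usize = 0;
        while (true) {
            if (fetchOnce()) |_| {
                return;
            } else |err| {
                if (attempt == retry_delays.len) return err; // retries exhausted
                std.time.sleep(retry_delays[attempt]);
                attempt += 1;
            }
        }
    }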

I've erred on the side of retrying unnecessarily here; there are
definitely errors we retry on which really ought to trigger the error
immediately. We can work to tighten this up a little in future.

Resolves: ziglang#17472
mlugg committed Oct 10, 2023
1 parent 7abf9b3 commit 0bad878
154 changes: 115 additions & 39 deletions src/Package/Fetch.zig
@@ -234,7 +234,44 @@ pub const RunError = error{
FetchFailed,
};

const RetryableRunError = RunError || error{
/// An error occurred, but it may be temporary (e.g. due to network conditions).
/// Also, this fetch has not yet been retried `retry_delays.len` times.
/// No error was added to the bundle. Attempt the fetch again after a short delay.
FetchFailedRetryable,
};

/// The delay between fetch attempts (after retryable errors) in ns.
/// The length of this list is the number of times a fetch will be retried.
const retry_delays = [_]u32{
200 * std.time.ns_per_ms,
800 * std.time.ns_per_ms,
4000 * std.time.ns_per_ms,
};

pub fn run(f: *Fetch) RunError!void {
var attempt_count: u32 = 0;
while (true) {
if (f.runInner(&attempt_count)) |_| {
return;
} else |err| switch (err) {
error.FetchFailedRetryable => {
// The function which triggered this error should have checked and updated attempt_count.
assert(attempt_count > 0);
assert(attempt_count <= retry_delays.len);

// Since we'll be sleeping, make sure the progress is up-to-date.
f.prog_node.context.refresh();

std.time.sleep(retry_delays[attempt_count - 1]);
continue;
},
else => |e| return e,
}
}
}

fn runInner(f: *Fetch, attempt_count: *u32) RetryableRunError!void {
const eb = &f.error_bundle;
const arena = f.arena.allocator();
const gpa = f.arena.child_allocator;
@@ -278,12 +315,12 @@ pub fn run(f: *Fetch) RunError!void {
.path_or_url => |path_or_url| {
if (fs.cwd().openIterableDir(path_or_url, .{})) |dir| {
var resource: Resource = .{ .dir = dir };
return runResource(f, path_or_url, &resource, null);
return runResource(f, path_or_url, &resource, null, attempt_count);
} else |dir_err| {
const file_err = if (dir_err == error.NotDir) e: {
if (fs.cwd().openFile(path_or_url, .{})) |file| {
var resource: Resource = .{ .file = file };
return runResource(f, path_or_url, &resource, null);
return runResource(f, path_or_url, &resource, null, attempt_count);
} else |err| break :e err;
} else dir_err;

@@ -293,8 +330,9 @@ pub fn run(f: *Fetch) RunError!void {
.{ path_or_url, @errorName(file_err), @errorName(uri_err) },
));
};
var resource = try f.initResource(uri);
return runResource(f, uri.path, &resource, null);

var resource = try f.initResource(uri, attempt_count);
return runResource(f, uri.path, &resource, null, attempt_count);
}
},
};
@@ -330,8 +368,8 @@ pub fn run(f: *Fetch) RunError!void {
f.location_tok,
try eb.printString("invalid URI: {s}", .{@errorName(err)}),
);
var resource = try f.initResource(uri);
return runResource(f, uri.path, &resource, remote.hash);
var resource = try f.initResource(uri, attempt_count);
return runResource(f, uri.path, &resource, remote.hash, attempt_count);
}

pub fn deinit(f: *Fetch) void {
Expand All @@ -345,7 +383,8 @@ fn runResource(
uri_path: []const u8,
resource: *Resource,
remote_hash: ?Manifest.MultiHashHexDigest,
) RunError!void {
attempt_count: *u32,
) RetryableRunError!void {
defer resource.deinit();
const arena = f.arena.allocator();
const eb = &f.error_bundle;
@@ -371,7 +410,7 @@ fn runResource(
};
defer tmp_directory.handle.close();

try unpackResource(f, resource, uri_path, tmp_directory);
try unpackResource(f, resource, uri_path, tmp_directory, attempt_count);

// Load, parse, and validate the unpacked build.zig.zon file. It is allowed
// for the file to be missing, in which case this fetched package is
@@ -707,6 +746,14 @@ fn fail(f: *Fetch, msg_tok: std.zig.Ast.TokenIndex, msg_str: u32) RunError {
return error.FetchFailed;
}

fn failRetryable(f: *Fetch, msg_tok: std.zig.Ast.TokenIndex, comptime msg_format: []const u8, msg_args: anytype, attempt_count: *u32) RetryableRunError {
if (attempt_count.* == retry_delays.len) {
return f.fail(msg_tok, try f.error_bundle.printString(msg_format, msg_args));
}
attempt_count.* += 1;
return error.FetchFailedRetryable;
}

const Resource = union(enum) {
file: fs.File,
http_request: std.http.Client.Request,
@@ -797,7 +844,7 @@ const FileType = enum {
}
};

fn initResource(f: *Fetch, uri: std.Uri) RunError!Resource {
fn initResource(f: *Fetch, uri: std.Uri, attempt_count: *u32) RetryableRunError!Resource {
const gpa = f.arena.child_allocator;
const arena = f.arena.allocator();
const eb = &f.error_bundle;
@@ -819,31 +866,39 @@ fn initResource(f: *Fetch, uri: std.Uri) RunError!Resource {
defer h.deinit();

var req = http_client.request(.GET, uri, h, .{}) catch |err| {
return f.fail(f.location_tok, try eb.printString(
return f.failRetryable(
f.location_tok,
"unable to connect to server: {s}",
.{@errorName(err)},
));
attempt_count,
);
};
errdefer req.deinit(); // releases more than memory

req.start(.{}) catch |err| {
return f.fail(f.location_tok, try eb.printString(
return f.failRetryable(
f.location_tok,
"HTTP request failed: {s}",
.{@errorName(err)},
));
attempt_count,
);
};
req.wait() catch |err| {
return f.fail(f.location_tok, try eb.printString(
return f.failRetryable(
f.location_tok,
"invalid HTTP response: {s}",
.{@errorName(err)},
));
attempt_count,
);
};

if (req.response.status != .ok) {
return f.fail(f.location_tok, try eb.printString(
return f.failRetryable(
f.location_tok,
"bad HTTP response code: '{d} {s}'",
.{ @intFromEnum(req.response.status), req.response.status.phrase() orelse "" },
));
attempt_count,
);
}

return .{ .http_request = req };
@@ -859,16 +914,19 @@ fn initResource(f: *Fetch, uri: std.Uri) RunError!Resource {
session.discoverCapabilities(gpa, &redirect_uri) catch |err| switch (err) {
error.Redirected => {
defer gpa.free(redirect_uri);
// We got a valid response, so this wasn't a network issue, so this is not retryable.
return f.fail(f.location_tok, try eb.printString(
"repository moved to {s}",
.{redirect_uri},
));
},
else => |e| {
return f.fail(f.location_tok, try eb.printString(
return f.failRetryable(
f.location_tok,
"unable to discover remote git server capabilities: {s}",
.{@errorName(e)},
));
attempt_count,
);
},
};

@@ -883,17 +941,21 @@ fn initResource(f: *Fetch, uri: std.Uri) RunError!Resource {
.ref_prefixes = &.{ want_ref, want_ref_head, want_ref_tag },
.include_peeled = true,
}) catch |err| {
return f.fail(f.location_tok, try eb.printString(
return f.failRetryable(
f.location_tok,
"unable to list refs: {s}",
.{@errorName(err)},
));
attempt_count,
);
};
defer ref_iterator.deinit();
while (ref_iterator.next() catch |err| {
return f.fail(f.location_tok, try eb.printString(
return f.failRetryable(
f.location_tok,
"unable to iterate refs: {s}",
.{@errorName(err)},
));
attempt_count,
);
}) |ref| {
if (std.mem.eql(u8, ref.name, want_ref) or
std.mem.eql(u8, ref.name, want_ref_head) or
@@ -902,6 +964,7 @@ fn initResource(f: *Fetch, uri: std.Uri) RunError!Resource {
break :want_oid ref.peeled orelse ref.oid;
}
}
// We successfully iterated the refs, so this wasn't a network issue, so this is not retryable.
return f.fail(f.location_tok, try eb.printString("ref not found: {s}", .{want_ref}));
};
if (uri.fragment == null) {
@@ -925,10 +988,12 @@ fn initResource(f: *Fetch, uri: std.Uri) RunError!Resource {
std.fmt.fmtSliceHexLower(&want_oid),
}) catch unreachable;
var fetch_stream = session.fetch(gpa, &.{&want_oid_buf}) catch |err| {
return f.fail(f.location_tok, try eb.printString(
return f.failRetryable(
f.location_tok,
"unable to create fetch stream: {s}",
.{@errorName(err)},
));
attempt_count,
);
};
errdefer fetch_stream.deinit();

Expand All @@ -949,7 +1014,8 @@ fn unpackResource(
resource: *Resource,
uri_path: []const u8,
tmp_directory: Cache.Directory,
) RunError!void {
attempt_count: *u32,
) RetryableRunError!void {
const eb = &f.error_bundle;
const file_type = switch (resource.*) {
.file => FileType.fromPath(uri_path) orelse
@@ -1010,16 +1076,19 @@ };
};

switch (file_type) {
.tar => try unpackTarball(f, tmp_directory.handle, resource.reader()),
.@"tar.gz" => try unpackTarballCompressed(f, tmp_directory.handle, resource, std.compress.gzip),
.@"tar.xz" => try unpackTarballCompressed(f, tmp_directory.handle, resource, std.compress.xz),
.tar => try unpackTarball(f, tmp_directory.handle, resource.reader(), attempt_count),
.@"tar.gz" => try unpackTarballCompressed(f, tmp_directory.handle, resource, std.compress.gzip, attempt_count),
.@"tar.xz" => try unpackTarballCompressed(f, tmp_directory.handle, resource, std.compress.xz, attempt_count),
.git_pack => unpackGitPack(f, tmp_directory.handle, resource) catch |err| switch (err) {
error.FetchFailed => return error.FetchFailed,
error.OutOfMemory => return error.OutOfMemory,
else => |e| return f.fail(f.location_tok, try eb.printString(
// TODO: don't mark actual git errors as retryable. We're only interested in reader errors.
else => |e| return f.failRetryable(
f.location_tok,
"unable to unpack git files: {s}",
.{@errorName(e)},
)),
attempt_count,
),
},
}
}
@@ -1029,7 +1098,8 @@ fn unpackTarballCompressed(
out_dir: fs.Dir,
resource: *Resource,
comptime Compression: type,
) RunError!void {
attempt_count: *u32,
) RetryableRunError!void {
const gpa = f.arena.child_allocator;
const eb = &f.error_bundle;
const reader = resource.reader();
@@ -1043,10 +1113,10 @@ fn unpackTarballCompressed(
};
defer decompress.deinit();

return unpackTarball(f, out_dir, decompress.reader());
return unpackTarball(f, out_dir, decompress.reader(), attempt_count);
}

fn unpackTarball(f: *Fetch, out_dir: fs.Dir, reader: anytype) RunError!void {
fn unpackTarball(f: *Fetch, out_dir: fs.Dir, reader: anytype, attempt_count: *u32) RetryableRunError!void {
const eb = &f.error_bundle;
const gpa = f.arena.child_allocator;

@@ -1063,10 +1133,15 @@ fn unpackTarball(f: *Fetch, out_dir: fs.Dir, reader: anytype) RunError!void {
// bit on Windows from the ACLs (see the isExecutable function).
.mode_mode = .ignore,
.exclude_empty_directories = true,
}) catch |err| return f.fail(f.location_tok, try eb.printString(
"unable to unpack tarball to temporary directory: {s}",
.{@errorName(err)},
));
}) catch |err| {
// TODO: don't mark actual tar errors as retryable. We're only interested in reader errors.
return f.failRetryable(
f.location_tok,
"unable to unpack tarball to temporary directory: {s}",
.{@errorName(err)},
attempt_count,
);
};

if (diagnostics.errors.items.len > 0) {
const notes_len: u32 = @intCast(diagnostics.errors.items.len);
@@ -1163,7 +1238,8 @@ fn unpackGitPack(f: *Fetch, out_dir: fs.Dir, resource: *Resource) anyerror!void
},
}
}
return error.InvalidGitPack;
// Use FetchFailed since we already added an error.
return error.FetchFailed;
}
}
}