Skip to content

Commit

Permalink
Make fetch() faster at uploading files over http:// (oven-sh#16303)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarred-Sumner authored Jan 10, 2025
1 parent c69aa3c commit 138cf7e
Showing 1 changed file with 224 additions and 100 deletions.
324 changes: 224 additions & 100 deletions src/http.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1048,8 +1048,69 @@ pub const HTTPThread = struct {

has_awoken: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
timer: std.time.Timer,

lazy_libdeflater: ?*LibdeflateState = null,
lazy_request_body_buffer: ?*HeapRequestBodyBuffer = null,

pub const HeapRequestBodyBuffer = struct {
    // 512 KB scratch buffer used to serialize large request payloads (headers + body).
    buffer: [512 * 1024]u8 = undefined,
    // Bump allocator handing out slices of `buffer`; reset when the buffer is recycled.
    fixed_buffer_allocator: std.heap.FixedBufferAllocator,

    pub usingnamespace bun.New(@This());

    // Heap-allocate the buffer, then wire the FixedBufferAllocator to it.
    // Two-step initialization is required because the allocator must point at
    // the buffer's final heap address, which is only known after `new` returns.
    pub fn init() *@This() {
        var this = HeapRequestBodyBuffer.new(.{
            .fixed_buffer_allocator = undefined,
        });
        this.fixed_buffer_allocator = std.heap.FixedBufferAllocator.init(&this.buffer);
        return this;
    }

    // Return this buffer to the HTTP thread's single-slot pool, or free it if
    // the slot is already occupied.
    pub fn put(this: *@This()) void {
        if (http_thread.lazy_request_body_buffer == null) {
            // Slot is free: reset the allocator and park the buffer for reuse.
            // NOTE(review): the original comment here read "This case
            // hypothetically should never happen", but this branch looks like
            // the common recycle path — buffers are taken out of the slot
            // before use (see getRequestBodySendBuffer), so the slot is
            // normally null on return. The remark likely belongs on the
            // `else` branch; confirm intent with the author.
            this.fixed_buffer_allocator.reset();
            http_thread.lazy_request_body_buffer = this;
        } else {
            // Another buffer is already parked — destroy this one instead of
            // growing the pool beyond a single cached buffer.
            this.deinit();
        }
    }

    pub fn deinit(this: *@This()) void {
        this.destroy();
    }
};

/// Buffer used to serialize an outgoing request: either the HTTP thread's
/// pooled 512 KB heap buffer (large payloads) or a 32 KB stack-fallback
/// buffer (small payloads).
pub const RequestBodyBuffer = union(enum) {
    heap: *HeapRequestBodyBuffer,
    stack: std.heap.StackFallbackAllocator(request_body_send_stack_buffer_size),

    /// Release the buffer. Heap buffers go back to the HTTP thread's
    /// single-slot pool; stack buffers need no cleanup.
    pub fn deinit(this: *@This()) void {
        switch (this.*) {
            .stack => {},
            .heap => |heap_buffer| heap_buffer.put(),
        }
    }

    /// The full backing byte slice, regardless of variant.
    pub fn allocatedSlice(this: *@This()) []u8 {
        switch (this.*) {
            .heap => |heap_buffer| return &heap_buffer.buffer,
            .stack => |*stack_buffer| return &stack_buffer.buffer,
        }
    }

    /// Allocator that carves out of this buffer (falling back to the general
    /// allocator for the stack variant when it overflows).
    pub fn allocator(this: *@This()) std.mem.Allocator {
        switch (this.*) {
            .heap => |heap_buffer| return heap_buffer.fixed_buffer_allocator.allocator(),
            .stack => |*stack_buffer| return stack_buffer.get(),
        }
    }

    /// View the entire backing buffer as an empty ArrayList with full
    /// capacity already reserved, so writers can append without reallocating.
    pub fn toArrayList(this: *@This()) std.ArrayList(u8) {
        var list = std.ArrayList(u8).fromOwnedSlice(this.allocator(), this.allocatedSlice());
        list.items.len = 0;
        return list;
    }
};

// Scoped debug logger for this file (.HTTPThread scope); the second argument
// presumably controls default visibility — TODO confirm Output.scoped semantics.
const threadlog = Output.scoped(.HTTPThread, true);
const WriteMessage = struct {
Expand All @@ -1072,6 +1133,24 @@ pub const HTTPThread = struct {
pub usingnamespace bun.New(@This());
};

// Size of the stack request-body send buffer (32 KB). Requests whose
// estimated size reaches this threshold use the 512 KB heap buffer instead
// (see getRequestBodySendBuffer).
const request_body_send_stack_buffer_size = 32 * 1024;

/// Hand out a buffer sized for serializing a request of roughly
/// `estimated_size` bytes: a 32 KB stack buffer for small requests, and the
/// thread's cached (or freshly allocated) 512 KB heap buffer for large ones.
pub inline fn getRequestBodySendBuffer(this: *@This(), estimated_size: usize) RequestBodyBuffer {
    // Small request: stack buffer with fallback to the general allocator.
    if (estimated_size < request_body_send_stack_buffer_size) {
        return .{
            .stack = std.heap.stackFallback(request_body_send_stack_buffer_size, bun.default_allocator),
        };
    }

    // Large request: reuse the single cached heap buffer when available.
    if (this.lazy_request_body_buffer) |cached| {
        this.lazy_request_body_buffer = null;
        return .{ .heap = cached };
    }

    log("Allocating HeapRequestBodyBuffer due to {d} bytes request body", .{estimated_size});
    return .{ .heap = HeapRequestBodyBuffer.init() };
}

pub fn deflater(this: *@This()) *LibdeflateState {
if (this.lazy_libdeflater == null) {
this.lazy_libdeflater = LibdeflateState.new(.{
Expand Down Expand Up @@ -1667,6 +1746,23 @@ pub inline fn getAllocator() std.mem.Allocator {
return default_allocator;
}

const max_tls_record_size = 16 * 1024;

/// Pick the buffer used to write this request to the network.
///
/// Large uploads over plain http:// benefit from fewer, larger socket writes,
/// so:
/// 1. Small requests use a 32 KB stack buffer.
/// 2. Large requests use a pooled 512 KB heap buffer.
///
/// Over https:// every TLS record is capped at 16 KB, so the estimate is
/// clamped to that and the stack buffer always suffices.
inline fn getRequestBodySendBuffer(this: *@This()) HTTPThread.RequestBodyBuffer {
    const headers_plus_body = this.state.request_body.len + this.estimatedRequestHeaderByteLength();
    // http:// doubles the estimate — presumably headroom so the serialized
    // headers and body fit in one buffer; TODO confirm the intent of the
    // factor of 2 with the author.
    const estimated_size = if (this.isHTTPS())
        @min(headers_plus_body, max_tls_record_size)
    else
        headers_plus_body * 2;
    return http_thread.getRequestBodySendBuffer(estimated_size);
}

/// Run garbage collection on the default arena. `force` presumably collects
/// regardless of internal thresholds — TODO confirm against default_arena.gc.
pub inline fn cleanup(force: bool) void {
    default_arena.gc(force);
}
Expand Down Expand Up @@ -3058,6 +3154,114 @@ pub fn onPreconnect(this: *HTTPClient, comptime is_ssl: bool, socket: NewHTTPCon
this.result_callback.run(@fieldParentPtr("client", this), HTTPClientResult{ .fail = null, .metadata = null, .has_more = false });
}

/// Rough byte count of the serialized request headers: the sum of every
/// header name and value length. NOTE(review): separators (": ", "\r\n") and
/// the request line are not counted — acceptable for a sizing estimate.
fn estimatedRequestHeaderByteLength(this: *const HTTPClient) usize {
    const entries = this.header_entries.slice();
    var total: usize = 0;
    // `items(.name)` and `items(.value)` come from the same slice, so the
    // parallel loop lengths always match.
    for (entries.items(.name), entries.items(.value)) |name, value| {
        total += @as(usize, name.length) + @as(usize, value.length);
    }
    return total;
}

// Outcome of sendInitialRequestPayload: how far the first socket write got.
const InitialRequestPayloadResult = struct {
    // True once request_sent_len covers the serialized header bytes.
    has_sent_headers: bool,
    // True when an in-memory (.bytes) request body has been fully written;
    // always false for non-.bytes (streaming) bodies.
    has_sent_body: bool,
    // True when the socket accepted the entire chunk but headers or body
    // remain — the caller should invoke onWritable again to keep sending.
    try_sending_more_data: bool,
};

// This exists as a separate function to reduce the amount of time the request body buffer is kept around.
//
// Serializes the request headers (or a proxy CONNECT / proxied request) into
// a single buffer, packs in as much of the request body as fits, and performs
// the first socket write. Returns progress flags so onWritable can decide
// whether to keep sending, start TLS tunneling, etc.
//
// Errors: error.WriteFailed on a negative socket write; writer errors
// (e.g. error.OutOfMemory) propagate from the serialization helpers.
noinline fn sendInitialRequestPayload(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) !InitialRequestPayloadResult {
    // 32 KB stack buffer for small requests, pooled 512 KB heap buffer for
    // large ones; released (returned to the pool) when this function exits.
    var request_body_buffer = this.getRequestBodySendBuffer();
    defer request_body_buffer.deinit();
    var temporary_send_buffer = request_body_buffer.toArrayList();
    defer temporary_send_buffer.deinit();

    const writer = &temporary_send_buffer.writer();

    const request = this.buildRequest(this.state.original_request_body.len());

    if (this.http_proxy) |_| {
        if (this.url.isHTTPS()) {
            // HTTPS through a proxy: emit a CONNECT request and mark that we
            // are tunneling; the real request is sent after the tunnel opens.
            this.flags.proxy_tunneling = true;
            try writeProxyConnect(@TypeOf(writer), writer, this);
        } else {
            // Plain HTTP through a proxy needs no CONNECT — just a slightly
            // different form of the request (absolute URI, proxy headers).
            try writeProxyRequest(
                @TypeOf(writer),
                writer,
                request,
                this,
            );
        }
    } else {
        try writeRequest(
            @TypeOf(writer),
            writer,
            request,
        );
    }

    const headers_len = temporary_send_buffer.items.len;
    assert(temporary_send_buffer.items.len == writer.context.items.len);
    // Pack the start of the request body into the leftover buffer capacity so
    // headers and body can go out in one socket write. Skipped while
    // tunneling, since only the CONNECT may be sent before the tunnel exists.
    if (this.state.request_body.len > 0 and temporary_send_buffer.capacity - temporary_send_buffer.items.len > 0 and !this.flags.proxy_tunneling) {
        var remain = temporary_send_buffer.items.ptr[temporary_send_buffer.items.len..temporary_send_buffer.capacity];
        const wrote = @min(remain.len, this.state.request_body.len);
        assert(wrote > 0);
        @memcpy(remain[0..wrote], this.state.request_body[0..wrote]);
        temporary_send_buffer.items.len += wrote;
    }

    // On a retry (request_sent_len > 0) skip what the socket already took.
    const to_send = temporary_send_buffer.items[this.state.request_sent_len..];
    if (comptime Environment.allow_assert) {
        assert(!socket.isShutdown());
        assert(!socket.isClosed());
    }
    const amount = socket.write(
        to_send,
        false,
    );
    if (comptime is_first_call) {
        if (amount == 0) {
            // don't worry about it — the socket took nothing on the first
            // attempt; a later onWritable will retry the send.
            return .{
                .has_sent_headers = this.state.request_sent_len >= headers_len,
                .has_sent_body = false,
                .try_sending_more_data = false,
            };
        }
    }

    if (amount < 0) {
        return error.WriteFailed;
    }

    this.state.request_sent_len += @as(usize, @intCast(amount));
    const has_sent_headers = this.state.request_sent_len >= headers_len;

    if (has_sent_headers and this.verbose != .none) {
        printRequest(request, this.url.href, !this.flags.reject_unauthorized, this.state.request_body, this.verbose == .curl);
    }

    // Once headers are fully out, advance request_body past the prefix that
    // was packed into the buffer and already written.
    if (has_sent_headers and this.state.request_body.len > 0) {
        this.state.request_body = this.state.request_body[this.state.request_sent_len - headers_len ..];
    }

    // Only an in-memory body can be known complete here; streaming bodies
    // (non-.bytes) report false and are handled in later stages.
    const has_sent_body = if (this.state.original_request_body == .bytes)
        this.state.request_body.len == 0
    else
        false;

    return .{
        .has_sent_headers = has_sent_headers,
        .has_sent_body = has_sent_body,
        // The socket accepted everything we offered but more remains — the
        // caller should immediately try another write.
        .try_sending_more_data = amount == @as(c_int, @intCast(to_send.len)) and (!has_sent_body or !has_sent_headers),
    };
}

pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_ssl: bool, socket: NewHTTPContext(is_ssl).HTTPSocket) void {
if (this.signals.get(.aborted)) {
this.closeAndAbort(is_ssl, socket);
Expand All @@ -3077,95 +3281,14 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s

switch (this.state.request_stage) {
.pending, .headers => {
var stack_fallback = std.heap.stackFallback(16384, default_allocator);
const allocator = stack_fallback.get();
var list = std.ArrayList(u8).initCapacity(allocator, stack_fallback.buffer.len) catch unreachable;
defer if (list.capacity > stack_fallback.buffer.len) list.deinit();
const writer = &list.writer();

this.setTimeout(socket, 5);

const request = this.buildRequest(this.state.original_request_body.len());

if (this.http_proxy) |_| {
if (this.url.isHTTPS()) {

//DO the tunneling!
this.flags.proxy_tunneling = true;
writeProxyConnect(@TypeOf(writer), writer, this) catch {
this.closeAndFail(error.OutOfMemory, is_ssl, socket);
return;
};
} else {
//HTTP do not need tunneling with CONNECT just a slightly different version of the request

writeProxyRequest(
@TypeOf(writer),
writer,
request,
this,
) catch {
this.closeAndFail(error.OutOfMemory, is_ssl, socket);
return;
};
}
} else {
writeRequest(
@TypeOf(writer),
writer,
request,
) catch {
this.closeAndFail(error.OutOfMemory, is_ssl, socket);
return;
};
}

const headers_len = list.items.len;
assert(list.items.len == writer.context.items.len);
if (this.state.request_body.len > 0 and list.capacity - list.items.len > 0 and !this.flags.proxy_tunneling) {
var remain = list.items.ptr[list.items.len..list.capacity];
const wrote = @min(remain.len, this.state.request_body.len);
assert(wrote > 0);
@memcpy(remain[0..wrote], this.state.request_body[0..wrote]);
list.items.len += wrote;
}

const to_send = list.items[this.state.request_sent_len..];
if (comptime Environment.allow_assert) {
assert(!socket.isShutdown());
assert(!socket.isClosed());
}
const amount = socket.write(
to_send,
false,
);
if (comptime is_first_call) {
if (amount == 0) {
// don't worry about it
return;
}
}

if (amount < 0) {
this.closeAndFail(error.WriteFailed, is_ssl, socket);
const result = sendInitialRequestPayload(this, is_first_call, is_ssl, socket) catch |err| {
this.closeAndFail(err, is_ssl, socket);
return;
}

this.state.request_sent_len += @as(usize, @intCast(amount));
const has_sent_headers = this.state.request_sent_len >= headers_len;

if (has_sent_headers and this.verbose != .none) {
printRequest(request, this.url.href, !this.flags.reject_unauthorized, this.state.request_body, this.verbose == .curl);
}

if (has_sent_headers and this.state.request_body.len > 0) {
this.state.request_body = this.state.request_body[this.state.request_sent_len - headers_len ..];
}

const has_sent_body = if (this.state.original_request_body == .bytes)
this.state.request_body.len == 0
else
false;
};
const has_sent_headers = result.has_sent_headers;
const has_sent_body = result.has_sent_body;
const try_sending_more_data = result.try_sending_more_data;

if (has_sent_headers and has_sent_body) {
if (this.flags.proxy_tunneling) {
Expand Down Expand Up @@ -3197,7 +3320,7 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
);

// we sent everything, but there's some body leftover
if (amount == @as(c_int, @intCast(to_send.len))) {
if (try_sending_more_data) {
this.onWritable(false, is_ssl, socket);
}
} else {
Expand Down Expand Up @@ -3320,11 +3443,12 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
.proxy_headers => {
if (this.proxy_tunnel) |proxy| {
this.setTimeout(socket, 5);
var stack_fallback = std.heap.stackFallback(16384, default_allocator);
const allocator = stack_fallback.get();
var list = std.ArrayList(u8).initCapacity(allocator, stack_fallback.buffer.len) catch unreachable;
defer if (list.capacity > stack_fallback.buffer.len) list.deinit();
const writer = &list.writer();
var stack_buffer = std.heap.stackFallback(1024 * 16, bun.default_allocator);
const allocator = stack_buffer.get();
var temporary_send_buffer = std.ArrayList(u8).fromOwnedSlice(allocator, &stack_buffer.buffer);
temporary_send_buffer.items.len = 0;
defer temporary_send_buffer.deinit();
const writer = &temporary_send_buffer.writer();

const request = this.buildRequest(this.state.request_body.len);
writeRequest(
Expand All @@ -3336,17 +3460,17 @@ pub fn onWritable(this: *HTTPClient, comptime is_first_call: bool, comptime is_s
return;
};

const headers_len = list.items.len;
assert(list.items.len == writer.context.items.len);
if (this.state.request_body.len > 0 and list.capacity - list.items.len > 0) {
var remain = list.items.ptr[list.items.len..list.capacity];
const headers_len = temporary_send_buffer.items.len;
assert(temporary_send_buffer.items.len == writer.context.items.len);
if (this.state.request_body.len > 0 and temporary_send_buffer.capacity - temporary_send_buffer.items.len > 0) {
var remain = temporary_send_buffer.items.ptr[temporary_send_buffer.items.len..temporary_send_buffer.capacity];
const wrote = @min(remain.len, this.state.request_body.len);
assert(wrote > 0);
@memcpy(remain[0..wrote], this.state.request_body[0..wrote]);
list.items.len += wrote;
temporary_send_buffer.items.len += wrote;
}

const to_send = list.items[this.state.request_sent_len..];
const to_send = temporary_send_buffer.items[this.state.request_sent_len..];
if (comptime Environment.allow_assert) {
assert(!socket.isShutdown());
assert(!socket.isClosed());
Expand Down

0 comments on commit 138cf7e

Please sign in to comment.