Skip to content

Commit

Permalink
Add a Wrapper to allow use by an external IO implementation
Browse files Browse the repository at this point in the history
And rename Loop -> IO for better wording

Signed-off-by: Francis Bouvier <francis@lightpanda.io>
  • Loading branch information
francisbouvier committed Nov 20, 2024
1 parent 2fc7a24 commit 6324fdb
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 62 deletions.
67 changes: 67 additions & 0 deletions src/Blocking.zig
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
const std = @import("std");

// Blocking is an example implementation of an IO API
// following the zig-async-io model.
// As it name suggests in this implementation all operations are
// in fact blocking, the async API is just faked.
pub const Blocking = @This();

pub const Completion = void;

pub const ConnectError = std.posix.ConnectError;
pub const SendError = std.posix.WriteError;
pub const RecvError = std.posix.ReadError;

pub fn connect(
_: *Blocking,
comptime CtxT: type,
ctx: *CtxT,
_: *Completion,
comptime cbk: fn (ctx: *CtxT, _: *Completion, res: ConnectError!void) void,
socket: std.posix.socket_t,
address: std.net.Address,
) void {
std.posix.connect(socket, &address.any, address.getOsSockLen()) catch |err| {
cbk(ctx, @constCast(&{}), err);
return;
};
cbk(ctx, @constCast(&{}), {});
}

pub fn onConnect(_: *Blocking, _: ConnectError!void) void {}

pub fn send(
_: *Blocking,
comptime CtxT: type,
ctx: *CtxT,
_: *Completion,
comptime cbk: fn (ctx: *CtxT, _: *Completion, res: SendError!usize) void,
socket: std.posix.socket_t,
buf: []const u8,
) void {
const len = std.posix.write(socket, buf) catch |err| {
cbk(ctx, @constCast(&{}), err);
return;
};
cbk(ctx, @constCast(&{}), len);
}

pub fn onSend(_: *Blocking, _: SendError!usize) void {}

pub fn recv(
_: *Blocking,
comptime CtxT: type,
ctx: *CtxT,
_: *Completion,
comptime cbk: fn (ctx: *CtxT, _: *Completion, res: RecvError!usize) void,
socket: std.posix.socket_t,
buf: []u8,
) void {
const len = std.posix.read(socket, buf) catch |err| {
cbk(ctx, @constCast(&{}), err);
return;
};
cbk(ctx, @constCast(&{}), len);
}

pub fn onRecv(_: *Blocking, _: RecvError!usize) void {}
192 changes: 140 additions & 52 deletions src/io.zig
Original file line number Diff line number Diff line change
@@ -1,59 +1,147 @@
const std = @import("std");

pub const Ctx = @import("std/http/Client.zig").Ctx;
pub const Cbk = @import("std/http/Client.zig").Cbk;

pub const Blocking = struct {
pub fn connect(
_: *Blocking,
comptime ctxT: type,
ctx: *ctxT,
comptime cbk: Cbk,
socket: std.posix.socket_t,
address: std.net.Address,
) void {
std.posix.connect(socket, &address.any, address.getOsSockLen()) catch |err| {
std.posix.close(socket);
cbk(ctx, err) catch |e| {
ctx.setErr(e);
};
};
cbk(ctx, {}) catch |e| ctx.setErr(e);
// IO is a type defined via a root declaration.
// It must implements the following methods:
// - connect, onConnect
// - send, onSend
// - recv, onRecv
// It must also define the following types:
// - Completion
// - ConnectError
// - SendError
// - RecvError
// see Blocking.io for an implementation example.
pub const IO = blk: {
const root = @import("root");
if (@hasDecl(root, "IO")) {
break :blk root.IO;
}
@compileError("no IO API defined at root");
};

// Wrapper for a base IO API.
pub fn Wrapper(IO_T: type) type {
return struct {
io: *IO_T,
completion: IO_T.Completion,

const Self = @This();

pub fn init(io: *IO_T) Self {
return .{ .io = io, .completion = undefined };
}

// NOTE: Business methods connect, send, recv expect a Ctx
// who should reference the base IO API in Ctx.io field

// NOTE: Ctx is already none (ie. @import("std/http/Client.zig").Ctx)
// but we require to provide it's type (comptime) as argument
// to avoid dependancy loop
// ie. Wrapper requiring Ctx and Ctx requiring Wrapper

fn Cbk(comptime Ctx: type) type {
return *const fn (ctx: *Ctx, res: anyerror!void) anyerror!void;
}

pub fn send(
_: *Blocking,
comptime ctxT: type,
ctx: *ctxT,
comptime cbk: Cbk,
socket: std.posix.socket_t,
buf: []const u8,
) void {
const len = std.posix.write(socket, buf) catch |err| {
cbk(ctx, err) catch |e| {
return ctx.setErr(e);
pub fn connect(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
comptime cbk: Cbk(Ctx),
socket: std.posix.socket_t,
address: std.net.Address,
) void {
self.io.connect(Ctx, ctx, &self.completion, onConnect(Ctx, cbk), socket, address);
}

fn onConnectFn(comptime Ctx: type) type {
return fn (
ctx: *Ctx,
_: *IO_T.Completion,
result: IO_T.ConnectError!void,
) void;
}
fn onConnect(comptime Ctx: type, comptime cbk: Cbk(Ctx)) onConnectFn(Ctx) {
const s = struct {
fn on(
ctx: *Ctx,
_: *IO_T.Completion,
result: IO_T.ConnectError!void,
) void {
ctx.io.io.onConnect(result); // base IO callback
_ = result catch |err| return ctx.setErr(err);
cbk(ctx, {}) catch |err| return ctx.setErr(err);
}
};
return ctx.setErr(err);
};
ctx.setLen(len);
cbk(ctx, {}) catch |e| ctx.setErr(e);
}
return s.on;
}

pub fn recv(
_: *Blocking,
comptime ctxT: type,
ctx: *ctxT,
comptime cbk: Cbk,
socket: std.posix.socket_t,
buf: []u8,
) void {
const len = std.posix.read(socket, buf) catch |err| {
cbk(ctx, err) catch |e| {
return ctx.setErr(e);
pub fn send(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
comptime cbk: Cbk(Ctx),
socket: std.posix.socket_t,
buf: []const u8,
) void {
self.io.send(Ctx, ctx, &self.completion, onSend(Ctx, cbk), socket, buf);
}

fn onSendFn(comptime Ctx: type) type {
return fn (
ctx: *Ctx,
_: *IO_T.Completion,
result: IO_T.SendError!usize,
) void;
}
fn onSend(comptime Ctx: type, comptime cbk: Cbk(Ctx)) onSendFn(Ctx) {
const s = struct {
fn on(
ctx: *Ctx,
_: *IO_T.Completion,
result: IO_T.SendError!usize,
) void {
ctx.io.io.onSend(result); // base IO callback
const len = result catch |err| return ctx.setErr(err);
ctx.setLen(len);
cbk(ctx, {}) catch |e| ctx.setErr(e);
}
};
return ctx.setErr(err);
};
ctx.setLen(len);
cbk(ctx, {}) catch |e| ctx.setErr(e);
}
};
return s.on;
}

pub fn recv(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
comptime cbk: Cbk(Ctx),
socket: std.posix.socket_t,
buf: []u8,
) void {
self.io.recv(Ctx, ctx, &self.completion, onRecv(Ctx, cbk), socket, buf);
}

fn onRecvFn(comptime Ctx: type) type {
return fn (
ctx: *Ctx,
_: *IO_T.Completion,
result: IO_T.RecvError!usize,
) void;
}
fn onRecv(comptime Ctx: type, comptime cbk: Cbk(Ctx)) onRecvFn(Ctx) {
const s = struct {
fn do(
ctx: *Ctx,
_: *IO_T.Completion,
result: IO_T.RecvError!usize,
) void {
ctx.io.io.onRecv(result); // base IO callback
const len = result catch |err| return ctx.setErr(err);
ctx.setLen(len);
cbk(ctx, {}) catch |err| return ctx.setErr(err);
}
};
return s.do;
}
};
}
7 changes: 5 additions & 2 deletions src/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ const std = @import("std");
const stack = @import("stack.zig");
pub const Client = @import("std/http/Client.zig");
const Ctx = Client.Ctx;
const Loop = @import("io.zig").Blocking;
pub const Wrapper = @import("io.zig").Wrapper;
const Blocking = @import("blocking.zig").Blocking;
pub const IO = Wrapper(Blocking);

const root = @import("root");

Expand Down Expand Up @@ -53,7 +55,8 @@ pub fn run() !void {
};
const alloc = gpa.allocator();

var loop = Loop{};
var blocking = Blocking{};
var loop = IO.init(&blocking);

var client = Client{ .allocator = alloc };
defer client.deinit();
Expand Down
9 changes: 4 additions & 5 deletions src/std/http/Client.zig
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ const proto = @import("protocol.zig");
const tls23 = @import("../../tls.zig/main.zig");
const VecPut = @import("../../tls.zig/connection.zig").VecPut;
const GenericStack = @import("../../stack.zig").Stack;
const async_io = @import("../../io.zig");
const Loop = async_io.Blocking;
pub const IO = @import("../../io.zig").IO;

const cipher = @import("../../tls.zig/cipher.zig");

Expand Down Expand Up @@ -2390,7 +2389,7 @@ pub const Ctx = struct {

userData: *anyopaque = undefined,

loop: *Loop,
io: *IO,
data: Data,
stack: ?*Stack = null,
err: ?anyerror = null,
Expand Down Expand Up @@ -2419,7 +2418,7 @@ pub const Ctx = struct {
_tls_write_index: usize = 0,
_tls_write_buf: [cipher.max_ciphertext_record_len]u8 = undefined,

pub fn init(loop: *Loop, req: *Request) !Ctx {
pub fn init(io: *IO, req: *Request) !Ctx {
const connection = try req.client.allocator.create(Connection);
connection.* = .{
.stream = undefined,
Expand All @@ -2430,7 +2429,7 @@ pub const Ctx = struct {
};
return .{
.req = req,
.loop = loop,
.io = io,
.data = .{ .conn = connection },
};
}
Expand Down
6 changes: 3 additions & 3 deletions src/std/net.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1908,7 +1908,7 @@ pub const Stream = struct {
ctx: *Ctx,
comptime cbk: Cbk,
) !void {
return ctx.loop.recv(Ctx, ctx, cbk, self.handle, buffer);
return ctx.io.recv(Ctx, ctx, cbk, self.handle, buffer);
}

pub fn async_readv(
Expand All @@ -1924,7 +1924,7 @@ pub const Stream = struct {

// TODO: why not take a buffer here?
pub fn async_write(self: Stream, buffer: []const u8, ctx: *Ctx, comptime cbk: Cbk) void {
return ctx.loop.send(Ctx, ctx, cbk, self.handle, buffer);
return ctx.io.send(Ctx, ctx, cbk, self.handle, buffer);
}

fn onWriteAll(ctx: *Ctx, res: anyerror!void) anyerror!void {
Expand Down Expand Up @@ -2033,7 +2033,7 @@ pub fn async_tcpConnectToAddress(address: std.net.Address, ctx: *Ctx, comptime c
ctx.data.socket = sockfd;
ctx.push(cbk) catch |e| return ctx.pop(e);

ctx.loop.connect(
ctx.io.connect(
Ctx,
ctx,
setStream,
Expand Down

0 comments on commit 6324fdb

Please sign in to comment.