diff --git a/async/BUILD.bazel b/async/BUILD.bazel index 933b166..8eb40dc 100644 --- a/async/BUILD.bazel +++ b/async/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_zig//zig:defs.bzl", "zig_library") zig_library( name = "async", srcs = [ + "queue.zig", "queue_mpsc.zig", ], import_name = "async", diff --git a/async/queue.zig b/async/queue.zig new file mode 100644 index 0000000..aa3dfe5 --- /dev/null +++ b/async/queue.zig @@ -0,0 +1,101 @@ +const std = @import("std"); +const assert = std.debug.assert; + +/// An intrusive queue implementation. The type T must have a field +/// "next" of type `?*T`. +/// +/// For those unaware, an intrusive variant of a data structure is one in which +/// the data type in the list has the pointer to the next element, rather +/// than a higher level "node" or "container" type. The primary benefit +/// of this (and the reason we implement this) is that it defers all memory +/// management to the caller: the data structure implementation doesn't need +/// to allocate "nodes" to contain each element. Instead, the caller provides +/// the element and how its allocated is up to them. +pub fn Intrusive(comptime T: type) type { + return struct { + const Self = @This(); + + /// Head is the front of the queue and tail is the back of the queue. + head: ?*T = null, + tail: ?*T = null, + + /// Enqueue a new element to the back of the queue. + pub fn push(self: *Self, v: *T) void { + assert(v.next == null); + + if (self.tail) |tail| { + // If we have elements in the queue, then we add a new tail. + tail.next = v; + self.tail = v; + } else { + // No elements in the queue we setup the initial state. + self.head = v; + self.tail = v; + } + } + + /// Dequeue the next element from the queue. + pub fn pop(self: *Self) ?*T { + // The next element is in "head". + const next = self.head orelse return null; + + // If the head and tail are equal this is the last element + // so we also set tail to null so we can now be empty. + if (self.head == self.tail) self.tail = null; + + // Head is whatever is next (if we're the last element, + // this will be null); + self.head = next.next; + + // We set the "next" field to null so that this element + // can be inserted again. + next.next = null; + return next; + } + + /// Returns true if the queue is empty. + pub fn empty(self: *const Self) bool { + return self.head == null; + } + }; +} + +test Intrusive { + const testing = std.testing; + + // Types + const Elem = struct { + const Self = @This(); + next: ?*Self = null, + }; + const Queue = Intrusive(Elem); + var q: Queue = .{}; + try testing.expect(q.empty()); + + // Elems + var elems: [10]Elem = .{.{}} ** 10; + + // One + try testing.expect(q.pop() == null); + q.push(&elems[0]); + try testing.expect(!q.empty()); + try testing.expect(q.pop().? == &elems[0]); + try testing.expect(q.pop() == null); + try testing.expect(q.empty()); + + // Two + try testing.expect(q.pop() == null); + q.push(&elems[0]); + q.push(&elems[1]); + try testing.expect(q.pop().? == &elems[0]); + try testing.expect(q.pop().? == &elems[1]); + try testing.expect(q.pop() == null); + + // Interleaved + try testing.expect(q.pop() == null); + q.push(&elems[0]); + try testing.expect(q.pop().? == &elems[0]); + q.push(&elems[1]); + try testing.expect(q.pop().? == &elems[1]); + try testing.expect(q.pop() == null); +} diff --git a/async/zigcoro.zig b/async/zigcoro.zig index 13fdbf3..1f8fe93 100644 --- a/async/zigcoro.zig +++ b/async/zigcoro.zig @@ -5,6 +5,51 @@ const libcoro = @import("libcoro"); const aio = libcoro.asyncio; const queue_mpsc = @import("queue_mpsc.zig"); +pub const Queue = @import("queue.zig").Intrusive; + +pub const Condition = struct { + const CoroResume = struct { + coro: libcoro.Frame, + + fn init() CoroResume { + return .{ .coro = libcoro.xframe() }; + } + + fn func(self: *CoroResume) libcoro.Executor.Func { + return .{ .func = CoroResume.cb, .userdata = self }; + } + + fn cb(ud: ?*anyopaque) void { + const self: *CoroResume = @ptrCast(@alignCast(ud)); + libcoro.xresume(self.coro); + } + }; + + exec: *libcoro.Executor, + waiters: Queue(libcoro.Executor.Func) = .{}, + + pub fn init() Condition { + return .{ .exec = &AsyncThread.current.executor.exec }; + } + + pub fn broadcast(self: *Condition) void { + while (self.waiters.pop()) |waiter| { + self.exec.runSoon(waiter); + } + } + + pub fn signal(self: *Condition) void { + if (self.waiters.pop()) |waiter| self.exec.runSoon(waiter); + } + + pub fn wait(self: *Condition) void { + var res = CoroResume.init(); + var cb = res.func(); + self.waiters.push(&cb); + libcoro.xsuspend(); + } +}; + pub fn Frame(comptime func: anytype) type { const Signature = stdx.meta.FnSignature(func, null); return FrameExx(func, Signature.ArgsT, Signature.ReturnT); @@ -482,6 +527,31 @@ pub const Socket = struct { }; }; +pub fn Channel(comptime T: type, capacity: usize) type { + return struct { + const Self = @This(); + const Inner = libcoro.Channel(T, .{ .capacity = capacity }); + + inner: Inner, + + pub fn init() Self { + return .{ .inner = Inner.init(&AsyncThread.current.executor.exec) }; + } + + pub fn close(self: *Self) void { + self.inner.close(); + } + + pub fn send(self: *Self, val: T) void { + self.inner.send(val) catch unreachable; + } + + pub fn recv(self: *Self) ?T { + return self.inner.recv(); + } + }; +} + pub const Mutex = struct { const VoidChannel = libcoro.Channel(void, .{ .capacity = 1 }); diff --git a/stdx/signature.zig b/stdx/signature.zig index 7058efd..6a4e06f 100644 --- a/stdx/signature.zig +++ b/stdx/signature.zig @@ -47,37 +47,28 @@ pub fn ArgsTuple(comptime funcT: anytype, comptime ArgsT: ?type) type { }); } -pub fn FnSignature(comptime func: anytype, comptime ArgsT: ?type) type { - const n_params = switch (@typeInfo(@TypeOf(func))) { - .Fn => |fn_info| fn_info.params.len, - else => compileError("FnSignature expects a function as first argument got: {}", .{@TypeOf(func)}), - }; - if (ArgsT != null) { - const n_args = switch (@typeInfo(ArgsT.?)) { - .Struct => |struct_info| struct_info.fields.len, - else => compileError("function {} need to be called with a tuple of args", .{@TypeOf(func)}), - }; - if (n_params != n_args) { - compileError("function {} expected {} args, got {}", .{ @TypeOf(func), n_params, n_args }); - } - } - return FnSignatureX(func, ArgsTuple(@TypeOf(func), ArgsT)); -} +pub const Signature = struct { + FuncT: type, + ArgsT: type, + ReturnT: type, + ReturnPayloadT: type, + ReturnErrorSet: ?type, +}; -// TODO: I think this should return a struct instead of returing at type -// this gives a better error stacktrace because here the error is delayed to when the fields are read. -fn FnSignatureX(comptime func: anytype, comptime ArgsT_: type) type { - return struct { - pub const FuncT = @TypeOf(func); - pub const ArgsT = ArgsT_; - pub const ReturnT = @TypeOf(@call(.auto, func, @as(ArgsT_, undefined))); - pub const ReturnPayloadT = switch (@typeInfo(ReturnT)) { +pub fn FnSignature(comptime func: anytype, comptime argsT_: ?type) Signature { + const argsT = ArgsTuple(@TypeOf(func), argsT_); + const return_type = @TypeOf(@call(.auto, func, @as(argsT, undefined))); + return Signature{ + .FuncT = @TypeOf(func), + .ArgsT = argsT, + .ReturnT = return_type, + .ReturnPayloadT = switch (@typeInfo(return_type)) { .ErrorUnion => |u| u.payload, - else => ReturnT, - }; - pub const ReturnErrorSet: ?type = switch (@typeInfo(ReturnT)) { + else => return_type, + }, + .ReturnErrorSet = switch (@typeInfo(return_type)) { .ErrorUnion => |u| u.error_set, else => null, - }; + }, }; }