async: add intrusive queue

all code contributed by @steeve

* add intrusive queue
* change the constructor of Channel with default AsyncThread executor

---------

Co-authored-by: Steeve Morin <steeve@zml.ai>
This commit is contained in:
Tarry Singh 2023-10-24 14:36:22 +00:00
parent 98b512c495
commit 27c8309424
4 changed files with 191 additions and 28 deletions

View File

@ -3,6 +3,7 @@ load("@rules_zig//zig:defs.bzl", "zig_library")
zig_library(
name = "async",
srcs = [
"queue.zig",
"queue_mpsc.zig",
],
import_name = "async",

101
async/queue.zig Normal file
View File

@ -0,0 +1,101 @@
const std = @import("std");
const assert = std.debug.assert;
/// An intrusive queue implementation. The type T must have a field
/// "next" of type `?*T`.
///
/// For those unaware, an intrusive variant of a data structure is one in which
/// the data type in the list has the pointer to the next element, rather
/// than a higher level "node" or "container" type. The primary benefit
/// of this (and the reason we implement this) is that it defers all memory
/// management to the caller: the data structure implementation doesn't need
/// to allocate "nodes" to contain each element. Instead, the caller provides
/// the element and how its allocated is up to them.
pub fn Intrusive(comptime T: type) type {
return struct {
const Self = @This();
/// Head is the front of the queue and tail is the back of the queue.
head: ?*T = null,
tail: ?*T = null,
/// Enqueue a new element to the back of the queue.
pub fn push(self: *Self, v: *T) void {
assert(v.next == null);
if (self.tail) |tail| {
// If we have elements in the queue, then we add a new tail.
tail.next = v;
self.tail = v;
} else {
// No elements in the queue we setup the initial state.
self.head = v;
self.tail = v;
}
}
/// Dequeue the next element from the queue.
pub fn pop(self: *Self) ?*T {
// The next element is in "head".
const next = self.head orelse return null;
// If the head and tail are equal this is the last element
// so we also set tail to null so we can now be empty.
if (self.head == self.tail) self.tail = null;
// Head is whatever is next (if we're the last element,
// this will be null);
self.head = next.next;
// We set the "next" field to null so that this element
// can be inserted again.
next.next = null;
return next;
}
/// Returns true if the queue is empty.
pub fn empty(self: *const Self) bool {
return self.head == null;
}
};
}
test Intrusive {
const testing = std.testing;
// Types
const Elem = struct {
const Self = @This();
next: ?*Self = null,
};
const Queue = Intrusive(Elem);
var q: Queue = .{};
try testing.expect(q.empty());
// Elems
var elems: [10]Elem = .{.{}} ** 10;
// One
try testing.expect(q.pop() == null);
q.push(&elems[0]);
try testing.expect(!q.empty());
try testing.expect(q.pop().? == &elems[0]);
try testing.expect(q.pop() == null);
try testing.expect(q.empty());
// Two
try testing.expect(q.pop() == null);
q.push(&elems[0]);
q.push(&elems[1]);
try testing.expect(q.pop().? == &elems[0]);
try testing.expect(q.pop().? == &elems[1]);
try testing.expect(q.pop() == null);
// Interleaved
try testing.expect(q.pop() == null);
q.push(&elems[0]);
try testing.expect(q.pop().? == &elems[0]);
q.push(&elems[1]);
try testing.expect(q.pop().? == &elems[1]);
try testing.expect(q.pop() == null);
}

View File

@ -5,6 +5,51 @@ const libcoro = @import("libcoro");
const aio = libcoro.asyncio;
const queue_mpsc = @import("queue_mpsc.zig");
pub const Queue = @import("queue.zig").Intrusive;
pub const Condition = struct {
const CoroResume = struct {
coro: libcoro.Frame,
fn init() CoroResume {
return .{ .coro = libcoro.xframe() };
}
fn func(self: *CoroResume) libcoro.Executor.Func {
return .{ .func = CoroResume.cb, .userdata = self };
}
fn cb(ud: ?*anyopaque) void {
const self: *CoroResume = @ptrCast(@alignCast(ud));
libcoro.xresume(self.coro);
}
};
exec: *libcoro.Executor,
waiters: Queue(libcoro.Executor.Func) = .{},
pub fn init() Condition {
return .{ .exec = &AsyncThread.current.executor.exec };
}
pub fn broadcast(self: *Condition) void {
while (self.waiters.pop()) |waiter| {
self.exec.runSoon(waiter);
}
}
pub fn signal(self: *Condition) void {
if (self.waiters.pop()) |waiter| self.exec.runSoon(waiter);
}
pub fn wait(self: *Condition) void {
var res = CoroResume.init();
var cb = res.func();
self.waiters.push(&cb);
libcoro.xsuspend();
}
};
pub fn Frame(comptime func: anytype) type {
const Signature = stdx.meta.FnSignature(func, null);
return FrameExx(func, Signature.ArgsT, Signature.ReturnT);
@ -482,6 +527,31 @@ pub const Socket = struct {
};
};
pub fn Channel(comptime T: type, capacity: usize) type {
return struct {
const Self = @This();
const Inner = libcoro.Channel(T, .{ .capacity = capacity });
inner: Inner,
pub fn init() Self {
return .{ .inner = Inner.init(&AsyncThread.current.executor.exec) };
}
pub fn close(self: *Self) void {
self.inner.close();
}
pub fn send(self: *Self, val: T) void {
self.inner.send(val) catch unreachable;
}
pub fn recv(self: *Self) ?T {
return self.inner.recv();
}
};
}
pub const Mutex = struct {
const VoidChannel = libcoro.Channel(void, .{ .capacity = 1 });

View File

@ -47,37 +47,28 @@ pub fn ArgsTuple(comptime funcT: anytype, comptime ArgsT: ?type) type {
});
}
pub fn FnSignature(comptime func: anytype, comptime ArgsT: ?type) type {
const n_params = switch (@typeInfo(@TypeOf(func))) {
.Fn => |fn_info| fn_info.params.len,
else => compileError("FnSignature expects a function as first argument got: {}", .{@TypeOf(func)}),
};
if (ArgsT != null) {
const n_args = switch (@typeInfo(ArgsT.?)) {
.Struct => |struct_info| struct_info.fields.len,
else => compileError("function {} need to be called with a tuple of args", .{@TypeOf(func)}),
};
if (n_params != n_args) {
compileError("function {} expected {} args, got {}", .{ @TypeOf(func), n_params, n_args });
}
}
return FnSignatureX(func, ArgsTuple(@TypeOf(func), ArgsT));
}
pub const Signature = struct {
FuncT: type,
ArgsT: type,
ReturnT: type,
ReturnPayloadT: type,
ReturnErrorSet: ?type,
};
// TODO: I think this should return a struct instead of returing at type
// this gives a better error stacktrace because here the error is delayed to when the fields are read.
fn FnSignatureX(comptime func: anytype, comptime ArgsT_: type) type {
return struct {
pub const FuncT = @TypeOf(func);
pub const ArgsT = ArgsT_;
pub const ReturnT = @TypeOf(@call(.auto, func, @as(ArgsT_, undefined)));
pub const ReturnPayloadT = switch (@typeInfo(ReturnT)) {
pub fn FnSignature(comptime func: anytype, comptime argsT_: ?type) Signature {
const argsT = ArgsTuple(@TypeOf(func), argsT_);
const return_type = @TypeOf(@call(.auto, func, @as(argsT, undefined)));
return Signature{
.FuncT = @TypeOf(func),
.ArgsT = argsT,
.ReturnT = return_type,
.ReturnPayloadT = switch (@typeInfo(return_type)) {
.ErrorUnion => |u| u.payload,
else => ReturnT,
};
pub const ReturnErrorSet: ?type = switch (@typeInfo(ReturnT)) {
else => return_type,
},
.ReturnErrorSet = switch (@typeInfo(return_type)) {
.ErrorUnion => |u| u.error_set,
else => null,
};
},
};
}