Vendor zigcoro and unify APIs; rework internals for stdx.meta compatibility, add Channel.try_send/try_recv methods, support dynamically sized channels with comptime capacity, and introduce PoolStackAllocator for coroutine stack allocation.
This commit is contained in:
parent
68dbc290e9
commit
5b8e42f9a9
@ -103,7 +103,6 @@ bazel_dep(name = "xla", version = "20250103.0-5f1fe6a")
|
||||
tsl = use_extension("@xla//:tsl.bzl", "tsl")
|
||||
use_repo(tsl, "tsl")
|
||||
|
||||
bazel_dep(name = "zigcoro", version = "20240829.0-fc1db29")
|
||||
bazel_dep(name = "sentencepiece", version = "20240618.0-d7ace0a")
|
||||
bazel_dep(name = "zig-protobuf", version = "20240722.0-c644d11")
|
||||
bazel_dep(name = "zig-yaml", version = "20240903.0-83d5fdf")
|
||||
|
||||
@ -3,15 +3,18 @@ load("@rules_zig//zig:defs.bzl", "zig_library")
|
||||
zig_library(
|
||||
name = "async",
|
||||
srcs = [
|
||||
"queue.zig",
|
||||
"queue_mpsc.zig",
|
||||
"asyncio.zig",
|
||||
"channel.zig",
|
||||
"coro.zig",
|
||||
"coro_base.zig",
|
||||
"executor.zig",
|
||||
"stack.zig",
|
||||
],
|
||||
import_name = "async",
|
||||
main = "zigcoro.zig",
|
||||
extra_srcs = glob(["asm/*.s"]),
|
||||
main = "async.zig",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//stdx",
|
||||
"@libxev//:xev",
|
||||
"@zigcoro//:libcoro",
|
||||
],
|
||||
)
|
||||
|
||||
49
async/asm/coro_aarch64.s
Normal file
49
async/asm/coro_aarch64.s
Normal file
@ -0,0 +1,49 @@
|
||||
# See arm64 procedure call standard
|
||||
# https://github.com/ARM-software/abi-aa/releases
|
||||
.global _libcoro_stack_swap
|
||||
_libcoro_stack_swap:
|
||||
.global libcoro_stack_swap
|
||||
libcoro_stack_swap:
|
||||
|
||||
# Store caller registers on the current stack
|
||||
# Each register requires 8 bytes, there are 20 registers to save
|
||||
sub sp, sp, 0xa0
|
||||
# d* are the 128-bit floating point registers, the lower 64 bits are preserved
|
||||
stp d8, d9, [sp, 0x00]
|
||||
stp d10, d11, [sp, 0x10]
|
||||
stp d12, d13, [sp, 0x20]
|
||||
stp d14, d15, [sp, 0x30]
|
||||
# x* are the scratch registers
|
||||
stp x19, x20, [sp, 0x40]
|
||||
stp x21, x22, [sp, 0x50]
|
||||
stp x23, x24, [sp, 0x60]
|
||||
stp x25, x26, [sp, 0x70]
|
||||
stp x27, x28, [sp, 0x80]
|
||||
# fp=frame pointer, lr=link register
|
||||
stp fp, lr, [sp, 0x90]
|
||||
|
||||
# Modify stack pointer of current coroutine (x0, first argument)
|
||||
mov x2, sp
|
||||
str x2, [x0, 0]
|
||||
|
||||
# Load stack pointer from target coroutine (x1, second argument)
|
||||
ldr x9, [x1, 0]
|
||||
mov sp, x9
|
||||
|
||||
# Restore target registers
|
||||
ldp d8, d9, [sp, 0x00]
|
||||
ldp d10, d11, [sp, -0x10]
|
||||
ldp d12, d13, [sp, 0x20]
|
||||
ldp d14, d15, [sp, 0x30]
|
||||
ldp x19, x20, [sp, 0x40]
|
||||
ldp x21, x22, [sp, 0x50]
|
||||
ldp x23, x24, [sp, 0x60]
|
||||
ldp x25, x26, [sp, 0x70]
|
||||
ldp x27, x28, [sp, 0x80]
|
||||
ldp fp, lr, [sp, 0x90]
|
||||
|
||||
# Pop stack frame
|
||||
add sp, sp, 0xa0
|
||||
|
||||
# jump to lr
|
||||
ret
|
||||
77
async/asm/coro_riscv64.s
Normal file
77
async/asm/coro_riscv64.s
Normal file
@ -0,0 +1,77 @@
|
||||
# See riscv procedure calling convention
|
||||
# https://github.com/riscv-non-isa/riscv-elf-psabi-doc
|
||||
.global libcoro_stack_swap
|
||||
libcoro_stack_swap:
|
||||
|
||||
# Store caller registers on the current stack
|
||||
# Each register requires 8 bytes, there are 25 registers to save
|
||||
addi sp, sp, -0xc8
|
||||
# s* are integer callee-saved registers
|
||||
sd s0, 0x00(sp)
|
||||
sd s1, 0x08(sp)
|
||||
sd s2, 0x10(sp)
|
||||
sd s3, 0x18(sp)
|
||||
sd s4, 0x20(sp)
|
||||
sd s5, 0x28(sp)
|
||||
sd s6, 0x30(sp)
|
||||
sd s7, 0x38(sp)
|
||||
sd s8, 0x40(sp)
|
||||
sd s9, 0x48(sp)
|
||||
sd s10, 0x50(sp)
|
||||
sd s11, 0x58(sp)
|
||||
# fs* are float callee-saved registers
|
||||
fsd fs0, 0x60(sp)
|
||||
fsd fs1, 0x68(sp)
|
||||
fsd fs2, 0x70(sp)
|
||||
fsd fs3, 0x78(sp)
|
||||
fsd fs4, 0x80(sp)
|
||||
fsd fs5, 0x88(sp)
|
||||
fsd fs6, 0x90(sp)
|
||||
fsd fs7, 0x98(sp)
|
||||
fsd fs8, 0xa0(sp)
|
||||
fsd fs9, 0xa8(sp)
|
||||
fsd fs10, 0xb0(sp)
|
||||
fsd fs11, 0xb8(sp)
|
||||
# ra=return address
|
||||
sd ra, 0xc0(sp)
|
||||
|
||||
# Modify stack pointer of current coroutine (a0, first argument)
|
||||
mv t0, sp
|
||||
sd t0, 0(a0)
|
||||
|
||||
# Load stack pointer from target coroutine (a1, second argument)
|
||||
ld t1, 0(a1)
|
||||
mv sp, t1
|
||||
|
||||
# Restore
|
||||
ld s0, 0x00(sp)
|
||||
ld s1, 0x08(sp)
|
||||
ld s2, 0x10(sp)
|
||||
ld s3, 0x18(sp)
|
||||
ld s4, 0x20(sp)
|
||||
ld s5, 0x28(sp)
|
||||
ld s6, 0x30(sp)
|
||||
ld s7, 0x38(sp)
|
||||
ld s8, 0x40(sp)
|
||||
ld s9, 0x48(sp)
|
||||
ld s10, 0x50(sp)
|
||||
ld s11, 0x58(sp)
|
||||
fld fs0, 0x60(sp)
|
||||
fld fs1, 0x68(sp)
|
||||
fld fs2, 0x70(sp)
|
||||
fld fs3, 0x78(sp)
|
||||
fld fs4, 0x80(sp)
|
||||
fld fs5, 0x88(sp)
|
||||
fld fs6, 0x90(sp)
|
||||
fld fs7, 0x98(sp)
|
||||
fld fs8, 0xa0(sp)
|
||||
fld fs9, 0xa8(sp)
|
||||
fld fs10, 0xb0(sp)
|
||||
fld fs11, 0xb8(sp)
|
||||
ld ra, 0xc0(sp)
|
||||
|
||||
# Pop stack frame
|
||||
addi sp, sp, 0xc8
|
||||
|
||||
# jump to ra
|
||||
ret
|
||||
30
async/asm/coro_x86_64.s
Normal file
30
async/asm/coro_x86_64.s
Normal file
@ -0,0 +1,30 @@
|
||||
# See System V x86-64 calling convention
|
||||
# https://gitlab.com/x86-psABIs/x86-64-ABI
|
||||
.global _libcoro_stack_swap
|
||||
_libcoro_stack_swap:
|
||||
.global libcoro_stack_swap
|
||||
libcoro_stack_swap:
|
||||
# Store caller registers on the current stack
|
||||
pushq %rbp
|
||||
pushq %rbx
|
||||
pushq %r12
|
||||
pushq %r13
|
||||
pushq %r14
|
||||
pushq %r15
|
||||
|
||||
# Modify stack pointer of current coroutine (rdi, first argument)
|
||||
movq %rsp, (%rdi)
|
||||
|
||||
# Load stack pointer from target coroutine (rsi, second argument)
|
||||
movq (%rsi), %rsp
|
||||
|
||||
# Restore target registers
|
||||
popq %r15
|
||||
popq %r14
|
||||
popq %r13
|
||||
popq %r12
|
||||
popq %rbx
|
||||
popq %rbp
|
||||
|
||||
# jump
|
||||
retq
|
||||
65
async/asm/coro_x86_64_windows.s
Normal file
65
async/asm/coro_x86_64_windows.s
Normal file
@ -0,0 +1,65 @@
|
||||
# See Microsoft x86-64 calling convention
|
||||
# https://learn.microsoft.com/en-us/cpp/build/x64-calling-convention
|
||||
.global libcoro_stack_swap
|
||||
libcoro_stack_swap:
|
||||
# Store Windows stack information
|
||||
pushq %gs:0x10
|
||||
pushq %gs:0x08
|
||||
|
||||
# Store caller registers
|
||||
pushq %rbp
|
||||
pushq %rbx
|
||||
pushq %rdi
|
||||
pushq %rsi
|
||||
pushq %r12
|
||||
pushq %r13
|
||||
pushq %r14
|
||||
pushq %r15
|
||||
|
||||
# Store caller simd/float registers
|
||||
subq $0xa0, %rsp
|
||||
movups %xmm6, 0x00(%rsp)
|
||||
movups %xmm7, 0x10(%rsp)
|
||||
movups %xmm8, 0x20(%rsp)
|
||||
movups %xmm9, 0x30(%rsp)
|
||||
movups %xmm10, 0x40(%rsp)
|
||||
movups %xmm11, 0x50(%rsp)
|
||||
movups %xmm12, 0x60(%rsp)
|
||||
movups %xmm13, 0x70(%rsp)
|
||||
movups %xmm14, 0x80(%rsp)
|
||||
movups %xmm15, 0x90(%rsp)
|
||||
|
||||
# Modify stack pointer of current coroutine (rcx, first argument)
|
||||
movq %rsp, (%rcx)
|
||||
|
||||
# Load stack pointer from target coroutine (rdx, second argument)
|
||||
movq (%rdx), %rsp
|
||||
|
||||
# Restore target simd/float registers
|
||||
movups 0x00(%rsp), %xmm6
|
||||
movups 0x10(%rsp), %xmm7
|
||||
movups 0x20(%rsp), %xmm8
|
||||
movups 0x30(%rsp), %xmm9
|
||||
movups 0x40(%rsp), %xmm10
|
||||
movups 0x50(%rsp), %xmm11
|
||||
movups 0x60(%rsp), %xmm12
|
||||
movups 0x70(%rsp), %xmm13
|
||||
movups 0x80(%rsp), %xmm14
|
||||
movups 0x90(%rsp), %xmm15
|
||||
addq $0xa0, %rsp
|
||||
|
||||
# Restore target registers
|
||||
popq %r15
|
||||
popq %r14
|
||||
popq %r13
|
||||
popq %r12
|
||||
popq %rsi
|
||||
popq %rdi
|
||||
popq %rbx
|
||||
popq %rbp
|
||||
|
||||
# Restore Windows stack information
|
||||
popq %gs:0x08
|
||||
popq %gs:0x10
|
||||
|
||||
retq
|
||||
@ -1,52 +1,29 @@
|
||||
const std = @import("std");
|
||||
const stdx = @import("stdx");
|
||||
const xev = @import("xev");
|
||||
const libcoro = @import("libcoro");
|
||||
const aio = libcoro.asyncio;
|
||||
const queue_mpsc = @import("queue_mpsc.zig");
|
||||
|
||||
pub const Queue = @import("queue.zig").Intrusive;
|
||||
const coro = @import("coro.zig");
|
||||
const executor = @import("executor.zig");
|
||||
const channel_mod = @import("channel.zig");
|
||||
const aio = @import("asyncio.zig");
|
||||
const stack = @import("stack.zig");
|
||||
|
||||
pub const Condition = struct {
|
||||
const CoroResume = struct {
|
||||
coro: libcoro.Frame,
|
||||
|
||||
fn init() CoroResume {
|
||||
return .{ .coro = libcoro.xframe() };
|
||||
}
|
||||
|
||||
fn func(self: *CoroResume) libcoro.Executor.Func {
|
||||
return .{ .func = CoroResume.cb, .userdata = self };
|
||||
}
|
||||
|
||||
fn cb(ud: ?*anyopaque) void {
|
||||
const self: *CoroResume = @ptrCast(@alignCast(ud));
|
||||
libcoro.xresume(self.coro);
|
||||
}
|
||||
};
|
||||
|
||||
exec: *libcoro.Executor,
|
||||
waiters: Queue(libcoro.Executor.Func) = .{},
|
||||
inner: executor.Condition,
|
||||
|
||||
pub fn init() Condition {
|
||||
return .{ .exec = &AsyncThread.current.executor.exec };
|
||||
return .{ .inner = executor.Condition.init(&AsyncThread.current.executor.exec) };
|
||||
}
|
||||
|
||||
pub fn broadcast(self: *Condition) void {
|
||||
while (self.waiters.pop()) |waiter| {
|
||||
self.exec.runSoon(waiter);
|
||||
}
|
||||
self.inner.broadcast();
|
||||
}
|
||||
|
||||
pub fn signal(self: *Condition) void {
|
||||
if (self.waiters.pop()) |waiter| self.exec.runSoon(waiter);
|
||||
self.inner.signal();
|
||||
}
|
||||
|
||||
pub fn wait(self: *Condition) void {
|
||||
var res = CoroResume.init();
|
||||
var cb = res.func();
|
||||
self.waiters.push(&cb);
|
||||
libcoro.xsuspend();
|
||||
self.inner.wait();
|
||||
}
|
||||
};
|
||||
|
||||
@ -63,7 +40,7 @@ pub fn FrameEx(comptime func: anytype, comptime argsT: type) type {
|
||||
fn FrameExx(comptime func: anytype, comptime argsT: type, comptime returnT: type) type {
|
||||
return struct {
|
||||
const Self = @This();
|
||||
const FrameT = libcoro.FrameT(func, .{ .ArgsT = argsT });
|
||||
const FrameT = coro.FrameT(func, argsT);
|
||||
|
||||
inner: FrameT,
|
||||
|
||||
@ -71,18 +48,20 @@ fn FrameExx(comptime func: anytype, comptime argsT: type, comptime returnT: type
|
||||
pub const await_ = awaitt;
|
||||
pub fn awaitt(self: *Self) returnT {
|
||||
defer {
|
||||
AsyncThread.current.stack_allocator.destroy(&self.inner._frame.stack);
|
||||
self.inner.deinit();
|
||||
self.* = undefined;
|
||||
}
|
||||
return libcoro.xawait(self.inner);
|
||||
return coro.xawait(self.inner);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub fn asyncc(comptime func: anytype, args: anytype) !FrameEx(func, @TypeOf(args)) {
|
||||
const Signature = stdx.meta.FnSignature(func, @TypeOf(args));
|
||||
const new_stack = try AsyncThread.current.stack_allocator.create();
|
||||
return .{
|
||||
.inner = try aio.xasync(func, @as(Signature.ArgsT, args), null),
|
||||
.inner = try aio.xasync(func, @as(Signature.ArgsT, args), new_stack),
|
||||
};
|
||||
}
|
||||
|
||||
@ -120,12 +99,12 @@ pub fn sleep(ms: u64) !void {
|
||||
|
||||
pub const threading = struct {
|
||||
const Waiter = struct {
|
||||
frame: libcoro.Frame,
|
||||
frame: coro.Frame,
|
||||
thread: *const AsyncThread,
|
||||
next: ?*Waiter = null,
|
||||
};
|
||||
|
||||
const WaiterQueue = queue_mpsc.Intrusive(Waiter);
|
||||
const WaiterQueue = stdx.queue.MPSC(Waiter);
|
||||
|
||||
pub const ResetEventSingle = struct {
|
||||
const State = union(enum) {
|
||||
@ -140,7 +119,7 @@ pub const threading = struct {
|
||||
waiter: std.atomic.Value(*const State) = std.atomic.Value(*const State).init(&State.unset_state),
|
||||
|
||||
pub fn isSet(self: *ResetEventSingle) bool {
|
||||
return self.waiter.load(&State.set_state, .monotonic) == &State.set_state;
|
||||
return self.waiter.load(.monotonic) == &State.set_state;
|
||||
}
|
||||
|
||||
pub fn reset(self: *ResetEventSingle) void {
|
||||
@ -159,72 +138,26 @@ pub const threading = struct {
|
||||
|
||||
pub fn wait(self: *ResetEventSingle) void {
|
||||
var waiter: Waiter = .{
|
||||
.frame = libcoro.xframe(),
|
||||
.frame = coro.xframe(),
|
||||
.thread = AsyncThread.current,
|
||||
};
|
||||
var new_state: State = .{
|
||||
.waiting = &waiter,
|
||||
};
|
||||
if (self.waiter.cmpxchgStrong(&State.unset_state, &new_state, .monotonic, .monotonic) == null) {
|
||||
libcoro.xsuspend();
|
||||
while (self.isSet() == false) {
|
||||
coro.xsuspend();
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
pub const FrameAllocator = struct {
|
||||
const Item = [1 * 1024 * 1024]u8;
|
||||
const FramePool = std.heap.MemoryPool(Item);
|
||||
|
||||
pool: FramePool,
|
||||
|
||||
pub fn init(allocator_: std.mem.Allocator) !FrameAllocator {
|
||||
return .{
|
||||
.pool = FramePool.init(allocator_),
|
||||
};
|
||||
}
|
||||
|
||||
pub fn allocator(self: *FrameAllocator) std.mem.Allocator {
|
||||
return .{
|
||||
.ptr = self,
|
||||
.vtable = &.{
|
||||
.alloc = alloc,
|
||||
.resize = resize,
|
||||
.free = free,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
fn alloc(ctx: *anyopaque, len: usize, ptr_align: u8, ret_addr: usize) ?[*]u8 {
|
||||
_ = ptr_align;
|
||||
_ = ret_addr;
|
||||
stdx.debug.assert(len <= Item.len, "Should always pass a length of less than {d} bytes", .{Item.len});
|
||||
const self: *FrameAllocator = @ptrCast(@alignCast(ctx));
|
||||
const stack = self.pool.create() catch return null;
|
||||
return @ptrCast(stack);
|
||||
}
|
||||
|
||||
fn resize(ctx: *anyopaque, buf: []u8, buf_align: u8, new_len: usize, ret_addr: usize) bool {
|
||||
_ = ctx;
|
||||
_ = buf;
|
||||
_ = buf_align;
|
||||
_ = ret_addr;
|
||||
return new_len <= Item.len;
|
||||
}
|
||||
|
||||
fn free(ctx: *anyopaque, buf: []u8, buf_align: u8, ret_addr: usize) void {
|
||||
_ = buf_align;
|
||||
_ = ret_addr;
|
||||
const self: *FrameAllocator = @ptrCast(@alignCast(ctx));
|
||||
const v: *align(8) Item = @ptrCast(@alignCast(buf.ptr));
|
||||
self.pool.destroy(v);
|
||||
}
|
||||
};
|
||||
|
||||
pub const AsyncThread = struct {
|
||||
threadlocal var current: *const AsyncThread = undefined;
|
||||
|
||||
executor: *aio.Executor,
|
||||
stack_allocator: *stack.StackAllocator,
|
||||
loop: *xev.Loop,
|
||||
thread_pool: *xev.ThreadPool,
|
||||
async_notifier: *xev.Async,
|
||||
@ -236,7 +169,7 @@ pub const AsyncThread = struct {
|
||||
|
||||
fn waker_cb(q: ?*threading.WaiterQueue, _: *xev.Loop, _: *xev.Completion, _: xev.Async.WaitError!void) xev.CallbackAction {
|
||||
while (q.?.pop()) |waiter| {
|
||||
libcoro.xresume(waiter.frame);
|
||||
coro.xresume(waiter.frame);
|
||||
}
|
||||
return .rearm;
|
||||
}
|
||||
@ -251,10 +184,9 @@ pub const AsyncThread = struct {
|
||||
var loop = try xev.Loop.init(.{
|
||||
.thread_pool = &thread_pool,
|
||||
});
|
||||
|
||||
defer loop.deinit();
|
||||
|
||||
var executor = aio.Executor.init(&loop);
|
||||
var executor_ = aio.Executor.init(&loop);
|
||||
|
||||
var async_notifier = try xev.Async.init();
|
||||
defer async_notifier.deinit();
|
||||
@ -265,20 +197,23 @@ pub const AsyncThread = struct {
|
||||
var c: xev.Completion = undefined;
|
||||
async_notifier.wait(&loop, &c, threading.WaiterQueue, &waiters_queue, &waker_cb);
|
||||
|
||||
aio.initEnv(.{
|
||||
.stack_allocator = allocator,
|
||||
.default_stack_size = 1 * 1024 * 1024,
|
||||
});
|
||||
var stack_allocator = stack.StackAllocator.init(allocator);
|
||||
defer stack_allocator.deinit();
|
||||
|
||||
AsyncThread.current = &.{
|
||||
.executor = &executor,
|
||||
.executor = &executor_,
|
||||
.stack_allocator = &stack_allocator,
|
||||
.loop = &loop,
|
||||
.thread_pool = &thread_pool,
|
||||
.async_notifier = &async_notifier,
|
||||
.waiters_queue = &waiters_queue,
|
||||
};
|
||||
|
||||
return try aio.run(&executor, mainFunc, .{}, null);
|
||||
// allocate the main coroutine stack, on the current thread's stack!
|
||||
var mainStackData: stack.Stack.Data = undefined;
|
||||
const mainStack = stack.Stack.init(&mainStackData);
|
||||
|
||||
return try aio.run(&executor_, mainFunc, .{}, mainStack);
|
||||
}
|
||||
};
|
||||
|
||||
@ -530,7 +465,7 @@ pub const Socket = struct {
|
||||
pub fn Channel(comptime T: type, capacity: usize) type {
|
||||
return struct {
|
||||
const Self = @This();
|
||||
const Inner = libcoro.Channel(T, .{ .capacity = capacity });
|
||||
const Inner = channel_mod.Channel(T, capacity);
|
||||
|
||||
inner: Inner,
|
||||
|
||||
@ -538,10 +473,18 @@ pub fn Channel(comptime T: type, capacity: usize) type {
|
||||
return .{ .inner = Inner.init(&AsyncThread.current.executor.exec) };
|
||||
}
|
||||
|
||||
pub fn initWithLen(len: usize) Self {
|
||||
return .{ .inner = Inner.initWithLen(&AsyncThread.current.executor.exec, len) };
|
||||
}
|
||||
|
||||
pub fn close(self: *Self) void {
|
||||
self.inner.close();
|
||||
}
|
||||
|
||||
pub fn try_send(self: *Self, val: T) bool {
|
||||
return self.inner.try_send(val);
|
||||
}
|
||||
|
||||
pub fn send(self: *Self, val: T) void {
|
||||
self.inner.send(val) catch unreachable;
|
||||
}
|
||||
@ -549,11 +492,19 @@ pub fn Channel(comptime T: type, capacity: usize) type {
|
||||
pub fn recv(self: *Self) ?T {
|
||||
return self.inner.recv();
|
||||
}
|
||||
|
||||
pub fn try_recv(self: *Self) ?T {
|
||||
return self.inner.try_recv();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub fn channel(comptime T: type, len: usize, comptime capacity: usize) Channel(T, capacity) {
|
||||
return Channel(T, capacity).initWithLen(len);
|
||||
}
|
||||
|
||||
pub const Mutex = struct {
|
||||
const VoidChannel = libcoro.Channel(void, .{ .capacity = 1 });
|
||||
const VoidChannel = coro.Channel(void, 1);
|
||||
|
||||
inner: VoidChannel,
|
||||
|
||||
624
async/asyncio.zig
Normal file
624
async/asyncio.zig
Normal file
@ -0,0 +1,624 @@
|
||||
const std = @import("std");
|
||||
const stdx = @import("stdx");
|
||||
const xev = @import("xev");
|
||||
const libcoro = @import("coro.zig");
|
||||
const CoroExecutor = @import("executor.zig").Executor;
|
||||
|
||||
pub const xasync = libcoro.xasync;
|
||||
pub const xawait = libcoro.xawait;
|
||||
|
||||
const Frame = libcoro.Frame;
|
||||
|
||||
const Env = struct {
|
||||
exec: ?*Executor = null,
|
||||
};
|
||||
|
||||
pub const EnvArg = struct {
|
||||
executor: ?*Executor = null,
|
||||
stack_allocator: ?std.mem.Allocator = null,
|
||||
default_stack_size: ?usize = null,
|
||||
};
|
||||
|
||||
threadlocal var env: Env = .{};
|
||||
|
||||
pub fn initEnv(e: EnvArg) void {
|
||||
env = .{ .exec = e.executor };
|
||||
libcoro.initEnv(.{
|
||||
.stack_allocator = e.stack_allocator,
|
||||
.default_stack_size = e.default_stack_size,
|
||||
.executor = if (e.executor) |ex| &ex.exec else null,
|
||||
});
|
||||
}
|
||||
|
||||
pub const Executor = struct {
|
||||
loop: *xev.Loop,
|
||||
exec: CoroExecutor = .{},
|
||||
|
||||
pub fn init(loop: *xev.Loop) Executor {
|
||||
return .{ .loop = loop };
|
||||
}
|
||||
|
||||
fn tick(self: *Executor) !void {
|
||||
try self.loop.run(.once);
|
||||
_ = self.exec.tick();
|
||||
}
|
||||
};
|
||||
|
||||
/// Run a coroutine to completion.
|
||||
/// Must be called from "root", outside of any created coroutine.
|
||||
pub fn run(
|
||||
exec: *Executor,
|
||||
comptime func: anytype,
|
||||
args: anytype,
|
||||
stack: anytype,
|
||||
) !stdx.meta.FnSignature(func, @TypeOf(args)).ReturnPayloadT {
|
||||
stdx.debug.assert(libcoro.inCoro() == false, "Not in a corouine", .{});
|
||||
const frame = try xasync(func, args, stack);
|
||||
defer frame.deinit();
|
||||
try runCoro(exec, frame);
|
||||
return xawait(frame);
|
||||
}
|
||||
|
||||
/// Run a coroutine to completion.
|
||||
/// Must be called from "root", outside of any created coroutine.
|
||||
fn runCoro(exec: *Executor, frame: anytype) !void {
|
||||
const f = frame.frame();
|
||||
if (f.status == .Start) {
|
||||
libcoro.xresume(f);
|
||||
}
|
||||
while (f.status != .Done) {
|
||||
try exec.tick();
|
||||
}
|
||||
}
|
||||
|
||||
const SleepResult = xev.Timer.RunError!void;
|
||||
|
||||
pub fn sleep(exec: *Executor, ms: u64) !void {
|
||||
const loop = exec.loop;
|
||||
const Data = XCallback(SleepResult);
|
||||
|
||||
var data = Data.init();
|
||||
const w = try xev.Timer.init();
|
||||
defer w.deinit();
|
||||
var c: xev.Completion = .{};
|
||||
w.run(loop, &c, ms, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(exec, &c);
|
||||
|
||||
return data.result;
|
||||
}
|
||||
|
||||
fn waitForCompletionOutsideCoro(exec: *Executor, c: *xev.Completion) !void {
|
||||
@setCold(true);
|
||||
while (c.state() != .dead) {
|
||||
try exec.tick();
|
||||
}
|
||||
}
|
||||
|
||||
fn waitForCompletionInCoro(c: *xev.Completion) void {
|
||||
while (c.state() != .dead) {
|
||||
libcoro.xsuspend();
|
||||
}
|
||||
}
|
||||
|
||||
fn waitForCompletion(exec: *Executor, c: *xev.Completion) !void {
|
||||
if (libcoro.inCoro()) {
|
||||
waitForCompletionInCoro(c);
|
||||
} else {
|
||||
try waitForCompletionOutsideCoro(exec, c);
|
||||
}
|
||||
}
|
||||
|
||||
pub const TCP = struct {
|
||||
const Self = @This();
|
||||
|
||||
exec: *Executor,
|
||||
tcp: xev.TCP,
|
||||
|
||||
pub usingnamespace Stream(Self, xev.TCP, .{
|
||||
.close = true,
|
||||
.read = .recv,
|
||||
.write = .send,
|
||||
});
|
||||
|
||||
pub fn init(exec: *Executor, tcp: xev.TCP) Self {
|
||||
return .{ .exec = exec, .tcp = tcp };
|
||||
}
|
||||
|
||||
fn stream(self: Self) xev.TCP {
|
||||
return self.tcp;
|
||||
}
|
||||
|
||||
pub fn accept(self: Self) !Self {
|
||||
const AcceptResult = xev.TCP.AcceptError!xev.TCP;
|
||||
const Data = XCallback(AcceptResult);
|
||||
|
||||
const loop = self.exec.loop;
|
||||
|
||||
var data = Data.init();
|
||||
var c: xev.Completion = .{};
|
||||
self.tcp.accept(loop, &c, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(self.exec, &c);
|
||||
|
||||
const result = try data.result;
|
||||
return .{ .exec = self.exec, .tcp = result };
|
||||
}
|
||||
|
||||
const ConnectResult = xev.TCP.ConnectError!void;
|
||||
pub fn connect(self: Self, addr: std.net.Address) !void {
|
||||
const ResultT = ConnectResult;
|
||||
const Data = struct {
|
||||
result: ResultT = undefined,
|
||||
frame: ?Frame = null,
|
||||
|
||||
fn callback(
|
||||
userdata: ?*@This(),
|
||||
l: *xev.Loop,
|
||||
c: *xev.Completion,
|
||||
s: xev.TCP,
|
||||
result: ResultT,
|
||||
) xev.CallbackAction {
|
||||
_ = l;
|
||||
_ = c;
|
||||
_ = s;
|
||||
const data = userdata.?;
|
||||
data.result = result;
|
||||
if (data.frame != null) libcoro.xresume(data.frame.?);
|
||||
return .disarm;
|
||||
}
|
||||
};
|
||||
|
||||
var data: Data = .{ .frame = libcoro.xframe() };
|
||||
const loop = self.exec.loop;
|
||||
var c: xev.Completion = .{};
|
||||
self.tcp.connect(loop, &c, addr, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(self.exec, &c);
|
||||
|
||||
return data.result;
|
||||
}
|
||||
|
||||
const ShutdownResult = xev.TCP.ShutdownError!void;
|
||||
pub fn shutdown(self: Self) ShutdownResult {
|
||||
const ResultT = ShutdownResult;
|
||||
const Data = struct {
|
||||
result: ResultT = undefined,
|
||||
frame: ?Frame = null,
|
||||
|
||||
fn callback(
|
||||
userdata: ?*@This(),
|
||||
l: *xev.Loop,
|
||||
c: *xev.Completion,
|
||||
s: xev.TCP,
|
||||
result: ResultT,
|
||||
) xev.CallbackAction {
|
||||
_ = l;
|
||||
_ = c;
|
||||
_ = s;
|
||||
const data = userdata.?;
|
||||
data.result = result;
|
||||
if (data.frame != null) libcoro.xresume(data.frame.?);
|
||||
return .disarm;
|
||||
}
|
||||
};
|
||||
|
||||
var data: Data = .{ .frame = libcoro.xframe() };
|
||||
const loop = self.exec.loop;
|
||||
var c: xev.Completion = .{};
|
||||
self.tcp.shutdown(loop, &c, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(self.exec, &c);
|
||||
|
||||
return data.result;
|
||||
}
|
||||
};
|
||||
|
||||
fn Stream(comptime T: type, comptime StreamT: type, comptime options: xev.stream.Options) type {
|
||||
return struct {
|
||||
pub usingnamespace if (options.close) Closeable(T, StreamT) else struct {};
|
||||
pub usingnamespace if (options.read != .none) Readable(T, StreamT) else struct {};
|
||||
pub usingnamespace if (options.write != .none) Writeable(T, StreamT) else struct {};
|
||||
};
|
||||
}
|
||||
|
||||
fn Closeable(comptime T: type, comptime StreamT: type) type {
|
||||
return struct {
|
||||
const Self = T;
|
||||
const CloseResult = xev.CloseError!void;
|
||||
pub fn close(self: Self) !void {
|
||||
const ResultT = CloseResult;
|
||||
const Data = struct {
|
||||
result: ResultT = undefined,
|
||||
frame: ?Frame = null,
|
||||
|
||||
fn callback(
|
||||
userdata: ?*@This(),
|
||||
l: *xev.Loop,
|
||||
c: *xev.Completion,
|
||||
s: StreamT,
|
||||
result: ResultT,
|
||||
) xev.CallbackAction {
|
||||
_ = l;
|
||||
_ = c;
|
||||
_ = s;
|
||||
const data = userdata.?;
|
||||
data.result = result;
|
||||
if (data.frame != null) libcoro.xresume(data.frame.?);
|
||||
return .disarm;
|
||||
}
|
||||
};
|
||||
|
||||
var data: Data = .{ .frame = libcoro.xframe() };
|
||||
|
||||
const loop = self.exec.loop;
|
||||
var c: xev.Completion = .{};
|
||||
self.stream().close(loop, &c, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(self.exec, &c);
|
||||
|
||||
return data.result;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn Readable(comptime T: type, comptime StreamT: type) type {
|
||||
return struct {
|
||||
const Self = T;
|
||||
const ReadResult = xev.ReadError!usize;
|
||||
pub fn read(self: Self, buf: xev.ReadBuffer) !usize {
|
||||
const ResultT = ReadResult;
|
||||
const Data = struct {
|
||||
result: ResultT = undefined,
|
||||
frame: ?Frame = null,
|
||||
|
||||
fn callback(
|
||||
userdata: ?*@This(),
|
||||
l: *xev.Loop,
|
||||
c: *xev.Completion,
|
||||
s: StreamT,
|
||||
b: xev.ReadBuffer,
|
||||
result: ResultT,
|
||||
) xev.CallbackAction {
|
||||
_ = l;
|
||||
_ = c;
|
||||
_ = s;
|
||||
_ = b;
|
||||
const data = userdata.?;
|
||||
data.result = result;
|
||||
if (data.frame != null) libcoro.xresume(data.frame.?);
|
||||
return .disarm;
|
||||
}
|
||||
};
|
||||
|
||||
var data: Data = .{ .frame = libcoro.xframe() };
|
||||
|
||||
const loop = self.exec.loop;
|
||||
var c: xev.Completion = .{};
|
||||
self.stream().read(loop, &c, buf, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(self.exec, &c);
|
||||
|
||||
return data.result;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
fn Writeable(comptime T: type, comptime StreamT: type) type {
|
||||
return struct {
|
||||
const Self = T;
|
||||
const WriteResult = xev.WriteError!usize;
|
||||
pub fn write(self: Self, buf: xev.WriteBuffer) !usize {
|
||||
const ResultT = WriteResult;
|
||||
const Data = struct {
|
||||
result: ResultT = undefined,
|
||||
frame: ?Frame = null,
|
||||
|
||||
fn callback(
|
||||
userdata: ?*@This(),
|
||||
l: *xev.Loop,
|
||||
c: *xev.Completion,
|
||||
s: StreamT,
|
||||
b: xev.WriteBuffer,
|
||||
result: ResultT,
|
||||
) xev.CallbackAction {
|
||||
_ = l;
|
||||
_ = c;
|
||||
_ = s;
|
||||
_ = b;
|
||||
const data = userdata.?;
|
||||
data.result = result;
|
||||
if (data.frame != null) libcoro.xresume(data.frame.?);
|
||||
return .disarm;
|
||||
}
|
||||
};
|
||||
|
||||
var data: Data = .{ .frame = libcoro.xframe() };
|
||||
|
||||
const loop = self.exec.loop;
|
||||
var c: xev.Completion = .{};
|
||||
self.stream().write(loop, &c, buf, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(self.exec, &c);
|
||||
return data.result;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub const File = struct {
|
||||
const Self = @This();
|
||||
|
||||
exec: *Executor,
|
||||
file: xev.File,
|
||||
|
||||
pub usingnamespace Stream(Self, xev.File, .{
|
||||
.close = true,
|
||||
.read = .read,
|
||||
.write = .write,
|
||||
.threadpool = true,
|
||||
});
|
||||
|
||||
pub fn init(exec: *Executor, file: xev.File) Self {
|
||||
return .{ .exec = exec, .file = file };
|
||||
}
|
||||
|
||||
fn stream(self: Self) xev.File {
|
||||
return self.file;
|
||||
}
|
||||
|
||||
const PReadResult = xev.ReadError!usize;
|
||||
pub fn pread(self: Self, buf: xev.ReadBuffer, offset: u64) !usize {
|
||||
const ResultT = PReadResult;
|
||||
const Data = struct {
|
||||
result: ResultT = undefined,
|
||||
frame: ?Frame = null,
|
||||
|
||||
fn callback(
|
||||
userdata: ?*@This(),
|
||||
l: *xev.Loop,
|
||||
c: *xev.Completion,
|
||||
s: xev.File,
|
||||
b: xev.ReadBuffer,
|
||||
result: ResultT,
|
||||
) xev.CallbackAction {
|
||||
_ = l;
|
||||
_ = c;
|
||||
_ = s;
|
||||
_ = b;
|
||||
const data = userdata.?;
|
||||
data.result = result;
|
||||
if (data.frame != null) libcoro.xresume(data.frame.?);
|
||||
return .disarm;
|
||||
}
|
||||
};
|
||||
|
||||
var data: Data = .{ .frame = libcoro.xframe() };
|
||||
|
||||
const loop = self.exec.loop;
|
||||
var c: xev.Completion = .{};
|
||||
self.file.pread(loop, &c, buf, offset, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(self.exec, &c);
|
||||
|
||||
return data.result;
|
||||
}
|
||||
|
||||
const PWriteResult = xev.WriteError!usize;
|
||||
pub fn pwrite(self: Self, buf: xev.WriteBuffer, offset: u64) !usize {
|
||||
const ResultT = PWriteResult;
|
||||
const Data = struct {
|
||||
result: ResultT = undefined,
|
||||
frame: ?Frame = null,
|
||||
|
||||
fn callback(
|
||||
userdata: ?*@This(),
|
||||
l: *xev.Loop,
|
||||
c: *xev.Completion,
|
||||
s: xev.File,
|
||||
b: xev.WriteBuffer,
|
||||
result: ResultT,
|
||||
) xev.CallbackAction {
|
||||
_ = l;
|
||||
_ = c;
|
||||
_ = s;
|
||||
_ = b;
|
||||
const data = userdata.?;
|
||||
data.result = result;
|
||||
if (data.frame != null) libcoro.xresume(data.frame.?);
|
||||
return .disarm;
|
||||
}
|
||||
};
|
||||
|
||||
var data: Data = .{ .frame = libcoro.xframe() };
|
||||
|
||||
const loop = self.exec.loop;
|
||||
var c: xev.Completion = .{};
|
||||
self.file.pwrite(loop, &c, buf, offset, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(self.exec, &c);
|
||||
|
||||
return data.result;
|
||||
}
|
||||
};
|
||||
|
||||
pub const Process = struct {
|
||||
const Self = @This();
|
||||
|
||||
exec: *Executor,
|
||||
p: xev.Process,
|
||||
|
||||
pub fn init(exec: *Executor, p: xev.Process) Self {
|
||||
return .{ .exec = exec, .p = p };
|
||||
}
|
||||
|
||||
const WaitResult = xev.Process.WaitError!u32;
|
||||
pub fn wait(self: Self) !u32 {
|
||||
const Data = XCallback(WaitResult);
|
||||
var c: xev.Completion = .{};
|
||||
var data = Data.init();
|
||||
const loop = self.exec.loop;
|
||||
self.p.wait(loop, &c, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(self.exec, &c);
|
||||
|
||||
return data.result;
|
||||
}
|
||||
};
|
||||
|
||||
pub const AsyncNotification = struct {
|
||||
const Self = @This();
|
||||
|
||||
exec: *Executor,
|
||||
notif: xev.Async,
|
||||
|
||||
pub fn init(exec: *Executor, notif: xev.Async) Self {
|
||||
return .{ .exec = exec, .notif = notif };
|
||||
}
|
||||
|
||||
const WaitResult = xev.Async.WaitError!void;
|
||||
pub fn wait(self: Self) !void {
|
||||
const Data = XCallback(WaitResult);
|
||||
|
||||
const loop = self.exec.loop;
|
||||
var c: xev.Completion = .{};
|
||||
var data = Data.init();
|
||||
|
||||
self.notif.wait(loop, &c, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(self.exec, &c);
|
||||
|
||||
return data.result;
|
||||
}
|
||||
};
|
||||
|
||||
pub const UDP = struct {
|
||||
const Self = @This();
|
||||
|
||||
exec: *Executor,
|
||||
udp: xev.UDP,
|
||||
|
||||
pub usingnamespace Stream(Self, xev.UDP, .{
|
||||
.close = true,
|
||||
.read = .none,
|
||||
.write = .none,
|
||||
});
|
||||
|
||||
pub fn init(exec: *Executor, udp: xev.UDP) Self {
|
||||
return .{ .exec = exec, .udp = udp };
|
||||
}
|
||||
|
||||
pub fn stream(self: Self) xev.UDP {
|
||||
return self.udp;
|
||||
}
|
||||
|
||||
const ReadResult = xev.ReadError!usize;
|
||||
pub fn read(self: Self, buf: xev.ReadBuffer) !usize {
|
||||
const ResultT = ReadResult;
|
||||
const Data = struct {
|
||||
result: ResultT = undefined,
|
||||
frame: ?Frame = null,
|
||||
|
||||
fn callback(
|
||||
userdata: ?*@This(),
|
||||
l: *xev.Loop,
|
||||
c: *xev.Completion,
|
||||
s: *xev.UDP.State,
|
||||
addr: std.net.Address,
|
||||
udp: xev.UDP,
|
||||
b: xev.ReadBuffer,
|
||||
result: ResultT,
|
||||
) xev.CallbackAction {
|
||||
_ = l;
|
||||
_ = c;
|
||||
_ = s;
|
||||
_ = addr;
|
||||
_ = udp;
|
||||
_ = b;
|
||||
const data = userdata.?;
|
||||
data.result = result;
|
||||
if (data.frame != null) libcoro.xresume(data.frame.?);
|
||||
return .disarm;
|
||||
}
|
||||
};
|
||||
|
||||
const loop = self.exec.loop;
|
||||
var s: xev.UDP.State = undefined;
|
||||
var c: xev.Completion = .{};
|
||||
var data: Data = .{ .frame = libcoro.xframe() };
|
||||
self.udp.read(loop, &c, &s, buf, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(self.exec, &c);
|
||||
|
||||
return data.result;
|
||||
}
|
||||
|
||||
const WriteResult = xev.WriteError!usize;
|
||||
pub fn write(self: Self, addr: std.net.Address, buf: xev.WriteBuffer) !usize {
|
||||
const ResultT = WriteResult;
|
||||
const Data = struct {
|
||||
result: ResultT = undefined,
|
||||
frame: ?Frame = null,
|
||||
|
||||
fn callback(
|
||||
userdata: ?*@This(),
|
||||
l: *xev.Loop,
|
||||
c: *xev.Completion,
|
||||
s: *xev.UDP.State,
|
||||
udp: xev.UDP,
|
||||
b: xev.WriteBuffer,
|
||||
result: ResultT,
|
||||
) xev.CallbackAction {
|
||||
_ = l;
|
||||
_ = c;
|
||||
_ = s;
|
||||
_ = udp;
|
||||
_ = b;
|
||||
const data = userdata.?;
|
||||
data.result = result;
|
||||
if (data.frame != null) libcoro.xresume(data.frame.?);
|
||||
return .disarm;
|
||||
}
|
||||
};
|
||||
|
||||
const loop = self.exec.loop;
|
||||
var s: xev.UDP.State = undefined;
|
||||
var c: xev.Completion = .{};
|
||||
var data: Data = .{ .frame = libcoro.xframe() };
|
||||
self.udp.write(loop, &c, &s, addr, buf, Data, &data, &Data.callback);
|
||||
|
||||
try waitForCompletion(self.exec, &c);
|
||||
|
||||
return data.result;
|
||||
}
|
||||
};
|
||||
|
||||
fn RunT(comptime Func: anytype) type {
|
||||
const T = @typeInfo(@TypeOf(Func)).Fn.return_type.?;
|
||||
return switch (@typeInfo(T)) {
|
||||
.ErrorUnion => |E| E.payload,
|
||||
else => T,
|
||||
};
|
||||
}
|
||||
|
||||
fn XCallback(comptime ResultT: type) type {
|
||||
return struct {
|
||||
frame: ?Frame = null,
|
||||
result: ResultT = undefined,
|
||||
|
||||
fn init() @This() {
|
||||
return .{ .frame = libcoro.xframe() };
|
||||
}
|
||||
|
||||
fn callback(
|
||||
userdata: ?*@This(),
|
||||
_: *xev.Loop,
|
||||
_: *xev.Completion,
|
||||
result: ResultT,
|
||||
) xev.CallbackAction {
|
||||
const data = userdata.?;
|
||||
data.result = result;
|
||||
if (data.frame != null) libcoro.xresume(data.frame.?);
|
||||
return .disarm;
|
||||
}
|
||||
};
|
||||
}
|
||||
75
async/channel.zig
Normal file
75
async/channel.zig
Normal file
@ -0,0 +1,75 @@
|
||||
const std = @import("std");
|
||||
const stdx = @import("stdx");
|
||||
const executor = @import("executor.zig");
|
||||
|
||||
pub fn Channel(comptime T: type, comptime capacity: usize) type {
|
||||
const Storage = stdx.queue.ArrayQueue(T, capacity);
|
||||
|
||||
return struct {
|
||||
const Self = @This();
|
||||
|
||||
q: Storage = .{},
|
||||
closed: bool = false,
|
||||
len: usize = capacity,
|
||||
space_notif: executor.Condition,
|
||||
value_notif: executor.Condition,
|
||||
|
||||
pub fn init(exec: *executor.Executor) Self {
|
||||
return initWithLen(exec, capacity);
|
||||
}
|
||||
|
||||
pub fn initWithLen(exec: *executor.Executor, len: usize) Self {
|
||||
return .{
|
||||
.len = len,
|
||||
.space_notif = executor.Condition.init(exec),
|
||||
.value_notif = executor.Condition.init(exec),
|
||||
};
|
||||
}
|
||||
|
||||
pub fn close(self: *Self) void {
|
||||
self.closed = true;
|
||||
self.value_notif.signal();
|
||||
}
|
||||
|
||||
pub fn try_send(self: *Self, val: T) bool {
|
||||
stdx.debug.assert(self.closed == false, "cannot send on closed Channel", .{});
|
||||
if (self.q.len() >= self.len) {
|
||||
return false;
|
||||
}
|
||||
self.q.push(val) catch stdx.debug.panic("tried to send on full Channel. This shouldn't happen.", .{});
|
||||
self.value_notif.signal();
|
||||
return true;
|
||||
}
|
||||
|
||||
pub fn send(self: *Self, val: T) void {
|
||||
stdx.debug.assert(self.closed == false, "cannot send on closed Channel", .{});
|
||||
while (self.q.len() >= self.len) {
|
||||
self.space_notif.wait();
|
||||
}
|
||||
self.q.push(val) catch stdx.debug.panic("tried to send on full Channel. This shouldn't happen.", .{});
|
||||
self.value_notif.signal();
|
||||
}
|
||||
|
||||
pub fn recv(self: *Self) ?T {
|
||||
while (self.closed == false or self.q.len() > 0) {
|
||||
if (self.q.pop()) |val| {
|
||||
self.space_notif.signal();
|
||||
return val;
|
||||
}
|
||||
self.value_notif.wait();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
pub fn try_recv(self: *Self) ?T {
|
||||
if (self.closed == true) {
|
||||
return null;
|
||||
}
|
||||
if (self.q.pop()) |val| {
|
||||
self.space_notif.signal();
|
||||
return val;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
557
async/coro.zig
Normal file
557
async/coro.zig
Normal file
@ -0,0 +1,557 @@
|
||||
//! libcoro mutable state:
|
||||
//! * ThreadState
|
||||
//! * current_coro: set in ThreadState.switchTo
|
||||
//! * next_coro_id: set in ThreadState.nextCoroId
|
||||
//! * suspend_block: set in xsuspendBlock, cleared in ThreadState.switchIn
|
||||
//! * Coro
|
||||
//! * resumer: set in ThreadState.switchTo
|
||||
//! * status:
|
||||
//! * Active, Suspended: set in ThreadState.switchTo
|
||||
//! * Done: set in runcoro
|
||||
//! * id.invocation: incremented in ThreadState.switchTo
|
||||
const std = @import("std");
|
||||
const stdx = @import("stdx");
|
||||
const builtin = @import("builtin");
|
||||
const base = @import("coro_base.zig");
|
||||
const stack = @import("stack.zig");
|
||||
const Executor = @import("executor.zig").Executor;
|
||||
|
||||
const log = std.log.scoped(.@"zml/async");
|
||||
|
||||
// Public API
|
||||
// ============================================================================
|
||||
pub const Error = error{
|
||||
StackTooSmall,
|
||||
StackOverflow,
|
||||
SuspendFromMain,
|
||||
};
|
||||
pub const StackT = stack.Stack;
|
||||
|
||||
pub const Frame = *Coro;
|
||||
|
||||
/// Await the coroutine(s).
|
||||
/// frame: FrameT: runs the coroutine until done and returns its return value.
|
||||
pub fn xawait(frame: anytype) @TypeOf(frame).Signature.ReturnT {
|
||||
const f = frame.frame();
|
||||
while (f.status != .Done) xsuspend();
|
||||
std.debug.assert(f.status == .Done);
|
||||
return frame.xreturned();
|
||||
}
|
||||
|
||||
/// Create a coroutine and start it
|
||||
/// stack is {null, usize, StackT}. If null or usize, initEnv must have been
|
||||
/// called with a default stack allocator.
|
||||
pub fn xasync(func: anytype, args: anytype, stack_: stack.Stack) !FrameT(func, @TypeOf(args)) {
|
||||
const FrameType = CoroT.fromFunc(func, @TypeOf(args));
|
||||
const framet = try FrameType.init(args, stack_);
|
||||
const frame = framet.frame();
|
||||
xresume(frame);
|
||||
return FrameType.wrap(frame);
|
||||
}
|
||||
|
||||
pub const FrameT = CoroT.fromFunc;
|
||||
|
||||
/// True if within a coroutine, false if at top-level.
|
||||
pub fn inCoro() bool {
|
||||
return thread_state.inCoro();
|
||||
}
|
||||
|
||||
/// Returns the currently running coroutine
|
||||
pub fn xframe() Frame {
|
||||
return thread_state.current();
|
||||
}
|
||||
|
||||
/// Resume the passed coroutine, suspending the current coroutine.
|
||||
/// When the resumed coroutine suspends, this call will return.
|
||||
/// Note: When the resumed coroutine returns, control will switch to its parent
|
||||
/// (i.e. its original resumer).
|
||||
/// frame: Frame or FrameT
|
||||
pub fn xresume(frame: anytype) void {
|
||||
const f = frame.frame();
|
||||
thread_state.switchIn(f);
|
||||
}
|
||||
|
||||
/// Suspend the current coroutine, yielding control back to its
|
||||
/// resumer. Returns when the coroutine is resumed.
|
||||
/// Must be called from within a coroutine (i.e. not the top level).
|
||||
pub fn xsuspend() void {
|
||||
xsuspendSafe() catch |e| {
|
||||
log.err("{any}\n", .{e});
|
||||
@panic("xsuspend");
|
||||
};
|
||||
}
|
||||
|
||||
pub fn xsuspendBlock(comptime func: anytype, args: anytype) void {
|
||||
const Signature = stdx.meta.FnSignature(func, @TypeOf(args));
|
||||
const Callback = struct {
|
||||
func: *const Signature.FuncT,
|
||||
args: Signature.ArgsT,
|
||||
fn cb(ud: ?*anyopaque) void {
|
||||
const self: *@This() = @ptrCast(@alignCast(ud));
|
||||
@call(.auto, self.func, self.args);
|
||||
}
|
||||
};
|
||||
var cb = Callback{ .func = func, .args = args };
|
||||
thread_state.suspend_block = .{ .func = Callback.cb, .data = @ptrCast(&cb) };
|
||||
xsuspend();
|
||||
}
|
||||
|
||||
pub fn xsuspendSafe() Error!void {
|
||||
if (thread_state.current_coro == null) {
|
||||
return Error.SuspendFromMain;
|
||||
}
|
||||
const coro = thread_state.current_coro.?;
|
||||
try StackOverflow.check(coro);
|
||||
thread_state.switchOut(coro.resumer);
|
||||
}
|
||||
|
||||
const Coro = struct {
|
||||
/// Coroutine status
|
||||
const Status = enum {
|
||||
Start,
|
||||
Suspended,
|
||||
Active,
|
||||
Done,
|
||||
};
|
||||
const Signature = VoidSignature;
|
||||
|
||||
/// Function to run in the coroutine
|
||||
func: *const fn () void,
|
||||
/// Coroutine stack
|
||||
stack: stack.Stack,
|
||||
/// Architecture-specific implementation
|
||||
impl: base.Coro,
|
||||
/// The coroutine that will be yielded to upon suspend
|
||||
resumer: *Coro = undefined,
|
||||
/// Current status
|
||||
status: Status = .Start,
|
||||
/// Coro id, {thread, coro id, invocation id}
|
||||
id: CoroId.InvocationId,
|
||||
/// Caller-specified coro-local storage
|
||||
storage: ?*anyopaque = null,
|
||||
|
||||
fn init(func: *const fn () void, stack_: stack.Stack, storage: ?*anyopaque) !Frame {
|
||||
return initFromStack(func, stack_, storage);
|
||||
}
|
||||
|
||||
pub fn deinit(self: Coro) void {
|
||||
_ = self; // autofix
|
||||
}
|
||||
|
||||
fn initFromStack(func: *const fn () void, stack_: stack.Stack, storage: ?*anyopaque) !Frame {
|
||||
// try StackOverflow.setMagicNumber(stack.full);
|
||||
var stack__ = stack_;
|
||||
const coro = try stack__.push(Coro);
|
||||
const base_coro = try base.Coro.init(&runcoro, stack__.remaining());
|
||||
coro.* = .{
|
||||
.func = func,
|
||||
.impl = base_coro,
|
||||
.stack = stack__,
|
||||
.storage = storage,
|
||||
.id = thread_state.newCoroId(),
|
||||
};
|
||||
return coro;
|
||||
}
|
||||
|
||||
pub fn frame(self: *Coro) Frame {
|
||||
return self;
|
||||
}
|
||||
|
||||
fn runcoro(from: *base.Coro, this: *base.Coro) callconv(.C) noreturn {
|
||||
const from_coro: *Coro = @fieldParentPtr("impl", from);
|
||||
const this_coro: *Coro = @fieldParentPtr("impl", this);
|
||||
log.debug("coro start {any}\n", .{this_coro.id});
|
||||
@call(.auto, this_coro.func, .{});
|
||||
this_coro.status = .Done;
|
||||
log.debug("coro done {any}\n", .{this_coro.id});
|
||||
thread_state.switchOut(from_coro);
|
||||
|
||||
// Never returns
|
||||
stdx.debug.panic("Cannot resume an already completed coroutine {any}", .{this_coro.id});
|
||||
}
|
||||
|
||||
pub fn getStorage(self: Coro, comptime T: type) *T {
|
||||
return @ptrCast(@alignCast(self.storage));
|
||||
}
|
||||
|
||||
pub fn format(self: Coro, comptime fmt: []const u8, options: std.fmt.FormatOptions, writer: anytype) !void {
|
||||
_ = fmt;
|
||||
_ = options;
|
||||
try writer.print("Coro{{.id = {any}, .status = {s}}}", .{
|
||||
self.id,
|
||||
@tagName(self.status),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
const VoidSignature = CoroT.Signature.init((struct {
|
||||
fn func() void {}
|
||||
}).func, .{});
|
||||
|
||||
const CoroT = struct {
|
||||
fn fromFunc(comptime Func: anytype, comptime ArgsT: ?type) type {
|
||||
return fromSig(stdx.meta.FnSignature(Func, ArgsT));
|
||||
}
|
||||
|
||||
fn fromSig(comptime Sig: stdx.meta.Signature) type {
|
||||
// Stored in the coro stack
|
||||
const InnerStorage = struct {
|
||||
args: Sig.ArgsT,
|
||||
/// Values that are produced during coroutine execution
|
||||
retval: Sig.ReturnT = undefined,
|
||||
};
|
||||
|
||||
return struct {
|
||||
const Self = @This();
|
||||
pub const Signature = Sig;
|
||||
|
||||
_frame: Frame,
|
||||
|
||||
/// Create a Coro
|
||||
/// self and stack pointers must remain stable for the lifetime of
|
||||
/// the coroutine.
|
||||
fn init(args: Sig.ArgsT, stack_: StackT) !Self {
|
||||
var coro_stack = stack_;
|
||||
const inner = try coro_stack.push(InnerStorage);
|
||||
inner.* = .{
|
||||
.args = args,
|
||||
};
|
||||
return .{ ._frame = try Coro.initFromStack(wrapfn, coro_stack, inner) };
|
||||
}
|
||||
|
||||
pub fn wrap(_frame: Frame) Self {
|
||||
return .{ ._frame = _frame };
|
||||
}
|
||||
|
||||
pub fn deinit(self: Self) void {
|
||||
self._frame.deinit();
|
||||
}
|
||||
|
||||
pub fn status(self: Self) Coro.Status {
|
||||
return self._frame.status;
|
||||
}
|
||||
|
||||
pub fn frame(self: Self) Frame {
|
||||
return self._frame;
|
||||
}
|
||||
|
||||
// Coroutine functions.
|
||||
//
|
||||
// When considering basic coroutine execution, the coroutine state
|
||||
// machine is:
|
||||
// * Start
|
||||
// * Start->xresume->Active
|
||||
// * Active->xsuspend->Suspended
|
||||
// * Active->(fn returns)->Done
|
||||
// * Suspended->xresume->Active
|
||||
//
|
||||
// Note that actions in the Active* states are taken from within the
|
||||
// coroutine. All other actions act upon the coroutine from the
|
||||
// outside.
|
||||
|
||||
/// Returns the value the coroutine returned
|
||||
pub fn xreturned(self: Self) Sig.ReturnT {
|
||||
const storage = self._frame.getStorage(InnerStorage);
|
||||
return storage.retval;
|
||||
}
|
||||
|
||||
fn wrapfn() void {
|
||||
const storage = thread_state.currentStorage(InnerStorage);
|
||||
storage.retval = @call(
|
||||
.always_inline,
|
||||
Sig.Func.Value,
|
||||
storage.args,
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
/// Estimates the remaining stack size in the currently running coroutine
|
||||
pub noinline fn remainingStackSize() usize {
|
||||
var dummy: usize = 0;
|
||||
dummy += 1;
|
||||
const addr = @intFromPtr(&dummy);
|
||||
|
||||
// Check if the stack was already overflowed
|
||||
const current = xframe();
|
||||
StackOverflow.check(current) catch return 0;
|
||||
|
||||
// Check if the stack is currently overflowed
|
||||
const bottom = @intFromPtr(current.stack.ptr);
|
||||
if (addr < bottom) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Debug check that we're actually in the stack
|
||||
const top = @intFromPtr(current.stack.ptr + current.stack.len);
|
||||
std.debug.assert(addr < top); // should never have popped beyond the top
|
||||
|
||||
return addr - bottom;
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
|
||||
/// Thread-local coroutine runtime
|
||||
threadlocal var thread_state: ThreadState = .{};
|
||||
|
||||
const ThreadState = struct {
|
||||
root_coro: Coro = .{
|
||||
.func = undefined,
|
||||
.stack = undefined,
|
||||
.impl = undefined,
|
||||
.id = CoroId.InvocationId.root(),
|
||||
},
|
||||
current_coro: ?Frame = null,
|
||||
next_coro_id: usize = 1,
|
||||
suspend_block: ?SuspendBlock = null,
|
||||
|
||||
const SuspendBlock = struct {
|
||||
func: *const fn (?*anyopaque) void,
|
||||
data: ?*anyopaque,
|
||||
|
||||
fn run(self: SuspendBlock) void {
|
||||
@call(.auto, self.func, .{self.data});
|
||||
}
|
||||
};
|
||||
|
||||
/// Called from resume
|
||||
fn switchIn(self: *ThreadState, target: Frame) void {
|
||||
log.debug("coro resume {any} from {any}\n", .{ target.id, self.current().id });
|
||||
|
||||
// Switch to target, setting this coro as the resumer.
|
||||
self.switchTo(target, true);
|
||||
|
||||
// Suspend within target brings control back here
|
||||
// If a suspend block has been set, pop and run it.
|
||||
if (self.suspend_block) |block| {
|
||||
self.suspend_block = null;
|
||||
block.run();
|
||||
}
|
||||
}
|
||||
|
||||
/// Called from suspend
|
||||
fn switchOut(self: *ThreadState, target: Frame) void {
|
||||
log.debug("coro suspend {any} to {any}\n", .{ self.current().id, target.id });
|
||||
self.switchTo(target, false);
|
||||
}
|
||||
|
||||
fn switchTo(self: *ThreadState, target: Frame, set_resumer: bool) void {
|
||||
const suspender = self.current();
|
||||
if (suspender == target) {
|
||||
return;
|
||||
}
|
||||
if (suspender.status != .Done) {
|
||||
suspender.status = .Suspended;
|
||||
}
|
||||
if (set_resumer) {
|
||||
target.resumer = suspender;
|
||||
}
|
||||
target.status = .Active;
|
||||
target.id.incr();
|
||||
self.current_coro = target;
|
||||
target.impl.resumeFrom(&suspender.impl);
|
||||
}
|
||||
|
||||
fn newCoroId(self: *ThreadState) CoroId.InvocationId {
|
||||
const out = CoroId.InvocationId.init(.{
|
||||
.coro = self.next_coro_id,
|
||||
});
|
||||
self.next_coro_id += 1;
|
||||
return out;
|
||||
}
|
||||
|
||||
fn current(self: *ThreadState) Frame {
|
||||
return self.current_coro orelse &self.root_coro;
|
||||
}
|
||||
|
||||
fn inCoro(self: *ThreadState) bool {
|
||||
return self.current() != &self.root_coro;
|
||||
}
|
||||
|
||||
/// Returns the storage of the currently running coroutine
|
||||
fn currentStorage(self: *ThreadState, comptime T: type) *T {
|
||||
return self.current_coro.?.getStorage(T);
|
||||
}
|
||||
};
|
||||
|
||||
const CoroId = struct {
|
||||
coro: usize,
|
||||
|
||||
pub const InvocationId = if (builtin.mode == .Debug) DebugInvocationId else DummyInvocationId;
|
||||
|
||||
const DummyInvocationId = struct {
|
||||
fn init(id: CoroId) @This() {
|
||||
_ = id;
|
||||
return .{};
|
||||
}
|
||||
fn root() @This() {
|
||||
return .{};
|
||||
}
|
||||
fn incr(self: *@This()) void {
|
||||
_ = self;
|
||||
}
|
||||
};
|
||||
|
||||
const DebugInvocationId = struct {
|
||||
id: CoroId,
|
||||
invocation: i64 = -1,
|
||||
|
||||
fn init(id: CoroId) @This() {
|
||||
return .{ .id = id };
|
||||
}
|
||||
|
||||
fn root() @This() {
|
||||
return .{ .id = .{ .coro = 0 } };
|
||||
}
|
||||
|
||||
fn incr(self: *@This()) void {
|
||||
self.invocation += 1;
|
||||
}
|
||||
|
||||
pub fn format(self: @This(), comptime fmt: []const u8, options: std.fmt.FormatOptions, writer: anytype) !void {
|
||||
_ = fmt;
|
||||
_ = options;
|
||||
try writer.print("CoroId{{.cid={d}, .i={d}}}", .{
|
||||
self.id.coro,
|
||||
self.invocation,
|
||||
});
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
const StackOverflow = struct {
|
||||
const magic_number: usize = 0x5E574D6D;
|
||||
|
||||
fn check(coro: Frame) !void {
|
||||
_ = coro; // autofix
|
||||
// const stack = coro.stack.ptr;
|
||||
// const sp = coro.impl.stack_pointer;
|
||||
// const magic_number_ptr: *usize = @ptrCast(stack);
|
||||
// if (magic_number_ptr.* != magic_number or //
|
||||
// @intFromPtr(sp) < @intFromPtr(stack))
|
||||
// {
|
||||
// return Error.StackOverflow;
|
||||
// }
|
||||
}
|
||||
|
||||
fn setMagicNumber(stack_: stack.Stack) !void {
|
||||
_ = stack_; // autofix
|
||||
// if (stack.len <= @sizeOf(usize)) {
|
||||
// return Error.StackTooSmall;
|
||||
// }
|
||||
// const magic_number_ptr: *usize = @ptrCast(stack.ptr);
|
||||
// magic_number_ptr.* = magic_number;
|
||||
}
|
||||
};
|
||||
|
||||
var test_idx: usize = 0;
|
||||
var test_steps = [_]usize{0} ** 8;
|
||||
|
||||
fn testSetIdx(val: usize) void {
|
||||
test_steps[test_idx] = val;
|
||||
test_idx += 1;
|
||||
}
|
||||
|
||||
fn testFn() void {
|
||||
std.debug.assert(remainingStackSize() > 2048);
|
||||
testSetIdx(2);
|
||||
xsuspend();
|
||||
testSetIdx(4);
|
||||
xsuspend();
|
||||
testSetIdx(6);
|
||||
}
|
||||
|
||||
test "basic suspend and resume" {
|
||||
// const allocator = std.testing.allocator;
|
||||
|
||||
// const stack_size: usize = 1024 * 4;
|
||||
// const stack_ = try stackAlloc(allocator, stack_size);
|
||||
// defer allocator.free(stack);
|
||||
|
||||
// testSetIdx(0);
|
||||
// const test_coro = try Coro.init(testFn, stack, false, null);
|
||||
|
||||
// testSetIdx(1);
|
||||
// try std.testing.expectEqual(test_coro.status, .Start);
|
||||
// xresume(test_coro);
|
||||
// testSetIdx(3);
|
||||
// try std.testing.expectEqual(test_coro.status, .Suspended);
|
||||
// xresume(test_coro);
|
||||
// try std.testing.expectEqual(test_coro.status, .Suspended);
|
||||
// testSetIdx(5);
|
||||
// xresume(test_coro);
|
||||
// testSetIdx(7);
|
||||
|
||||
// try std.testing.expectEqual(test_coro.status, .Done);
|
||||
|
||||
// for (0..test_steps.len) |i| {
|
||||
// try std.testing.expectEqual(i, test_steps[i]);
|
||||
// }
|
||||
}
|
||||
|
||||
test "with values" {
|
||||
// const Test = struct {
|
||||
// const Storage = struct {
|
||||
// x: *usize,
|
||||
// };
|
||||
// fn coroInner(x: *usize) void {
|
||||
// x.* += 1;
|
||||
// xsuspend();
|
||||
// x.* += 3;
|
||||
// }
|
||||
// fn coroWrap() void {
|
||||
// const storage = xframe().getStorage(Storage);
|
||||
// const x = storage.x;
|
||||
// coroInner(x);
|
||||
// }
|
||||
// };
|
||||
// var x: usize = 0;
|
||||
// var storage = Test.Storage{ .x = &x };
|
||||
|
||||
// const allocator = std.testing.allocator;
|
||||
// const stack = try stackAlloc(allocator, null);
|
||||
// defer allocator.free(stack);
|
||||
// const coro = try Coro.init(Test.coroWrap, stack, false, @ptrCast(&storage));
|
||||
|
||||
// try std.testing.expectEqual(storage.x.*, 0);
|
||||
// xresume(coro);
|
||||
// try std.testing.expectEqual(storage.x.*, 1);
|
||||
// xresume(coro);
|
||||
// try std.testing.expectEqual(storage.x.*, 4);
|
||||
}
|
||||
|
||||
fn testCoroFnImpl(x: *usize) usize {
|
||||
x.* += 1;
|
||||
xsuspend();
|
||||
x.* += 3;
|
||||
xsuspend();
|
||||
return x.* + 10;
|
||||
}
|
||||
|
||||
test "with CoroT" {
|
||||
// const allocator = std.testing.allocator;
|
||||
// const stack = try stackAlloc(allocator, null);
|
||||
// defer allocator.free(stack);
|
||||
|
||||
// var x: usize = 0;
|
||||
|
||||
// const CoroFn = CoroT.fromFunc(testCoroFnImpl, .{});
|
||||
// var coro = try CoroFn.init(.{&x}, stack, false);
|
||||
|
||||
// try std.testing.expectEqual(x, 0);
|
||||
// xresume(coro);
|
||||
// try std.testing.expectEqual(x, 1);
|
||||
// xresume(coro);
|
||||
// try std.testing.expectEqual(x, 4);
|
||||
// xresume(coro);
|
||||
// try std.testing.expectEqual(x, 4);
|
||||
// try std.testing.expectEqual(coro.status(), .Done);
|
||||
// try std.testing.expectEqual(CoroFn.xreturned(coro), 14);
|
||||
// comptime try std.testing.expectEqual(CoroFn.Signature.ReturnT(), usize);
|
||||
}
|
||||
|
||||
test "resume self" {
|
||||
xresume(xframe());
|
||||
try std.testing.expectEqual(1, 1);
|
||||
}
|
||||
71
async/coro_base.zig
Normal file
71
async/coro_base.zig
Normal file
@ -0,0 +1,71 @@
|
||||
const std = @import("std");
|
||||
const stdx = @import("stdx");
|
||||
const builtin = @import("builtin");
|
||||
const Error = @import("coro.zig").Error;
|
||||
|
||||
const ArchInfo = struct {
|
||||
num_registers: usize,
|
||||
jump_idx: usize,
|
||||
assembly: []const u8,
|
||||
};
|
||||
|
||||
const arch_info: ArchInfo = switch (builtin.cpu.arch) {
|
||||
.aarch64 => .{
|
||||
.num_registers = 20,
|
||||
.jump_idx = 19,
|
||||
.assembly = @embedFile("asm/coro_aarch64.s"),
|
||||
},
|
||||
.x86_64 => switch (builtin.os.tag) {
|
||||
.windows => .{
|
||||
.num_registers = 32,
|
||||
.jump_idx = 30,
|
||||
.assembly = @embedFile("asm/coro_x86_64_windows.s"),
|
||||
},
|
||||
else => .{
|
||||
.num_registers = 8,
|
||||
.jump_idx = 6,
|
||||
.assembly = @embedFile("asm/coro_x86_64.s"),
|
||||
},
|
||||
},
|
||||
.riscv64 => .{
|
||||
.num_registers = 25,
|
||||
.jump_idx = 24,
|
||||
.assembly = @embedFile("asm/coro_riscv64.s"),
|
||||
},
|
||||
else => @compileError("Unsupported cpu architecture"),
|
||||
};
|
||||
|
||||
pub const stack_alignment = 16;
|
||||
|
||||
extern fn libcoro_stack_swap(current: *Coro, target: *Coro) void;
|
||||
comptime {
|
||||
asm (arch_info.assembly);
|
||||
}
|
||||
|
||||
pub const Coro = packed struct {
|
||||
stack_pointer: [*]u8,
|
||||
|
||||
const Self = @This();
|
||||
const Func = *const fn (
|
||||
from: *Coro,
|
||||
self: *Coro,
|
||||
) callconv(.C) noreturn;
|
||||
|
||||
pub fn init(func: Func, stack: []align(stack_alignment) u8) !Self {
|
||||
stdx.debug.assertComptime(@sizeOf(usize) == 8, "usize expected to take 8 bytes", .{});
|
||||
stdx.debug.assertComptime(@sizeOf(*Func) == 8, "function pointer expected to take 8 bytes", .{});
|
||||
|
||||
const register_bytes = arch_info.num_registers * 8;
|
||||
if (register_bytes > stack.len) {
|
||||
return Error.StackTooSmall;
|
||||
}
|
||||
const register_space = stack[stack.len - register_bytes ..];
|
||||
const jump_ptr: *Func = @ptrCast(@alignCast(®ister_space[arch_info.jump_idx * 8]));
|
||||
jump_ptr.* = func;
|
||||
return .{ .stack_pointer = register_space.ptr };
|
||||
}
|
||||
|
||||
pub inline fn resumeFrom(self: *Self, from: *Self) void {
|
||||
libcoro_stack_swap(from, self);
|
||||
}
|
||||
};
|
||||
209
async/executor.zig
Normal file
209
async/executor.zig
Normal file
@ -0,0 +1,209 @@
|
||||
const std = @import("std");
|
||||
const stdx = @import("stdx");
|
||||
const libcoro = @import("coro.zig");
|
||||
const libcoro_options = @import("libcoro_options");
|
||||
|
||||
const log = std.log.scoped(.@"zml/async");
|
||||
|
||||
pub const Executor = struct {
|
||||
const Self = @This();
|
||||
|
||||
pub const Func = struct {
|
||||
const FuncFn = *const fn (userdata: ?*anyopaque) void;
|
||||
|
||||
func: FuncFn,
|
||||
userdata: ?*anyopaque = null,
|
||||
next: ?*Func = null,
|
||||
|
||||
pub fn init(func: FuncFn, userdata: ?*anyopaque) Func {
|
||||
return .{ .func = func, .userdata = userdata };
|
||||
}
|
||||
|
||||
fn run(self: Func) void {
|
||||
@call(.auto, self.func, .{self.userdata});
|
||||
}
|
||||
};
|
||||
|
||||
readyq: stdx.queue.SPSC(Func) = .{},
|
||||
|
||||
pub fn init() Self {
|
||||
return .{};
|
||||
}
|
||||
|
||||
pub fn runSoon(self: *Self, func: *Func) void {
|
||||
self.readyq.push(func);
|
||||
}
|
||||
|
||||
pub fn runAllSoon(self: *Self, funcs: stdx.queue.SPSC(Func)) void {
|
||||
self.readyq.pushAll(funcs);
|
||||
}
|
||||
|
||||
pub fn tick(self: *Self) bool {
|
||||
// Reset readyq so that adds run on next tick.
|
||||
var now = self.readyq;
|
||||
self.readyq = .{};
|
||||
|
||||
log.debug("Executor.tick readyq_len={d}", .{now.len()});
|
||||
|
||||
var count: usize = 0;
|
||||
while (now.pop()) |func| : (count += 1) {
|
||||
func.run();
|
||||
}
|
||||
|
||||
log.debug("Executor.tick done", .{});
|
||||
|
||||
return count > 0;
|
||||
}
|
||||
};
|
||||
|
||||
pub const Condition = struct {
|
||||
const Waiter = struct {
|
||||
coro: libcoro.Frame,
|
||||
notified: bool = false,
|
||||
|
||||
fn init() Waiter {
|
||||
return .{ .coro = libcoro.xframe() };
|
||||
}
|
||||
|
||||
fn func(self: *Waiter) Executor.Func {
|
||||
return .{ .func = Waiter.cb, .userdata = self };
|
||||
}
|
||||
|
||||
fn cb(ud: ?*anyopaque) void {
|
||||
const self: *Waiter = @ptrCast(@alignCast(ud));
|
||||
libcoro.xresume(self.coro);
|
||||
}
|
||||
};
|
||||
|
||||
exec: *Executor,
|
||||
waiters: stdx.queue.SPSC(Executor.Func) = .{},
|
||||
|
||||
pub fn init(exec: *Executor) Condition {
|
||||
return .{ .exec = exec };
|
||||
}
|
||||
|
||||
pub fn broadcast(self: *Condition) void {
|
||||
var waiter_func = self.waiters.head;
|
||||
while (waiter_func) |wf| : (waiter_func = wf.next) {
|
||||
const waiter: *Waiter = @ptrCast(@alignCast(wf.userdata));
|
||||
waiter.notified = true;
|
||||
}
|
||||
self.exec.runAllSoon(self.waiters);
|
||||
}
|
||||
|
||||
pub fn signal(self: *Condition) void {
|
||||
if (self.waiters.pop()) |waiter_func| {
|
||||
const waiter: *Waiter = @ptrCast(@alignCast(waiter_func.userdata));
|
||||
waiter.notified = true;
|
||||
self.exec.runSoon(waiter_func);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wait(self: *Condition) void {
|
||||
var waiter = Waiter.init();
|
||||
var cb = waiter.func();
|
||||
self.waiters.push(&cb);
|
||||
while (waiter.notified == false) {
|
||||
libcoro.xsuspend();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
pub const CoroResume = struct {
|
||||
const Self = @This();
|
||||
|
||||
coro: libcoro.Frame,
|
||||
|
||||
pub fn init() Self {
|
||||
return .{ .coro = libcoro.xframe() };
|
||||
}
|
||||
|
||||
pub fn func(self: *Self) Executor.Func {
|
||||
return .{ .func = Self.cb, .userdata = self };
|
||||
}
|
||||
|
||||
fn cb(ud: ?*anyopaque) void {
|
||||
const self: *Self = @ptrCast(@alignCast(ud));
|
||||
libcoro.xresume(self.coro);
|
||||
}
|
||||
};
|
||||
|
||||
pub fn getExec(exec: ?*Executor) *Executor {
|
||||
if (exec != null) return exec.?;
|
||||
if (libcoro.getEnv().executor) |x| return x;
|
||||
@panic("No explicit Executor passed and no default Executor available");
|
||||
}
|
||||
|
||||
pub fn ArrayQueue(comptime T: type, comptime size: usize) type {
|
||||
return struct {
|
||||
const Self = @This();
|
||||
|
||||
vals: [size]T = undefined,
|
||||
head: ?usize = null,
|
||||
tail: ?usize = null,
|
||||
|
||||
fn init() Self {
|
||||
return .{};
|
||||
}
|
||||
|
||||
fn len(self: Self) usize {
|
||||
switch (self.state()) {
|
||||
.empty => return 0,
|
||||
.one => return 1,
|
||||
.many => {
|
||||
const head = self.head.?;
|
||||
const tail = self.tail.?;
|
||||
if (tail > head) {
|
||||
return tail - head + 1;
|
||||
}
|
||||
return size - head + tail + 1;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn space(self: Self) usize {
|
||||
return size - self.len();
|
||||
}
|
||||
|
||||
fn push(self: *@This(), val: T) !void {
|
||||
if (self.space() < 1) return error.QueueFull;
|
||||
switch (self.state()) {
|
||||
.empty => {
|
||||
self.head = 0;
|
||||
self.tail = 0;
|
||||
self.vals[0] = val;
|
||||
},
|
||||
.one, .many => {
|
||||
const tail = self.tail.?;
|
||||
const new_tail = (tail + 1) % size;
|
||||
self.vals[new_tail] = val;
|
||||
self.tail = new_tail;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn pop(self: *Self) ?T {
|
||||
switch (self.state()) {
|
||||
.empty => return null,
|
||||
.one => {
|
||||
const out = self.vals[self.head.?];
|
||||
self.head = null;
|
||||
self.tail = null;
|
||||
return out;
|
||||
},
|
||||
.many => {
|
||||
const out = self.vals[self.head.?];
|
||||
self.head = (self.head.? + 1) % size;
|
||||
return out;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
const State = enum { empty, one, many };
|
||||
inline fn state(self: Self) State {
|
||||
if (self.head == null) return .empty;
|
||||
if (self.head.? == self.tail.?) return .one;
|
||||
return .many;
|
||||
}
|
||||
};
|
||||
}
|
||||
101
async/queue.zig
101
async/queue.zig
@ -1,101 +0,0 @@
|
||||
const std = @import("std");
|
||||
const assert = std.debug.assert;
|
||||
|
||||
/// An intrusive queue implementation. The type T must have a field
|
||||
/// "next" of type `?*T`.
|
||||
///
|
||||
/// For those unaware, an intrusive variant of a data structure is one in which
|
||||
/// the data type in the list has the pointer to the next element, rather
|
||||
/// than a higher level "node" or "container" type. The primary benefit
|
||||
/// of this (and the reason we implement this) is that it defers all memory
|
||||
/// management to the caller: the data structure implementation doesn't need
|
||||
/// to allocate "nodes" to contain each element. Instead, the caller provides
|
||||
/// the element and how its allocated is up to them.
|
||||
pub fn Intrusive(comptime T: type) type {
|
||||
return struct {
|
||||
const Self = @This();
|
||||
|
||||
/// Head is the front of the queue and tail is the back of the queue.
|
||||
head: ?*T = null,
|
||||
tail: ?*T = null,
|
||||
|
||||
/// Enqueue a new element to the back of the queue.
|
||||
pub fn push(self: *Self, v: *T) void {
|
||||
assert(v.next == null);
|
||||
|
||||
if (self.tail) |tail| {
|
||||
// If we have elements in the queue, then we add a new tail.
|
||||
tail.next = v;
|
||||
self.tail = v;
|
||||
} else {
|
||||
// No elements in the queue we setup the initial state.
|
||||
self.head = v;
|
||||
self.tail = v;
|
||||
}
|
||||
}
|
||||
|
||||
/// Dequeue the next element from the queue.
|
||||
pub fn pop(self: *Self) ?*T {
|
||||
// The next element is in "head".
|
||||
const next = self.head orelse return null;
|
||||
|
||||
// If the head and tail are equal this is the last element
|
||||
// so we also set tail to null so we can now be empty.
|
||||
if (self.head == self.tail) self.tail = null;
|
||||
|
||||
// Head is whatever is next (if we're the last element,
|
||||
// this will be null);
|
||||
self.head = next.next;
|
||||
|
||||
// We set the "next" field to null so that this element
|
||||
// can be inserted again.
|
||||
next.next = null;
|
||||
return next;
|
||||
}
|
||||
|
||||
/// Returns true if the queue is empty.
|
||||
pub fn empty(self: *const Self) bool {
|
||||
return self.head == null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
test Intrusive {
|
||||
const testing = std.testing;
|
||||
|
||||
// Types
|
||||
const Elem = struct {
|
||||
const Self = @This();
|
||||
next: ?*Self = null,
|
||||
};
|
||||
const Queue = Intrusive(Elem);
|
||||
var q: Queue = .{};
|
||||
try testing.expect(q.empty());
|
||||
|
||||
// Elems
|
||||
var elems: [10]Elem = .{.{}} ** 10;
|
||||
|
||||
// One
|
||||
try testing.expect(q.pop() == null);
|
||||
q.push(&elems[0]);
|
||||
try testing.expect(!q.empty());
|
||||
try testing.expect(q.pop().? == &elems[0]);
|
||||
try testing.expect(q.pop() == null);
|
||||
try testing.expect(q.empty());
|
||||
|
||||
// Two
|
||||
try testing.expect(q.pop() == null);
|
||||
q.push(&elems[0]);
|
||||
q.push(&elems[1]);
|
||||
try testing.expect(q.pop().? == &elems[0]);
|
||||
try testing.expect(q.pop().? == &elems[1]);
|
||||
try testing.expect(q.pop() == null);
|
||||
|
||||
// Interleaved
|
||||
try testing.expect(q.pop() == null);
|
||||
q.push(&elems[0]);
|
||||
try testing.expect(q.pop().? == &elems[0]);
|
||||
q.push(&elems[1]);
|
||||
try testing.expect(q.pop().? == &elems[1]);
|
||||
try testing.expect(q.pop() == null);
|
||||
}
|
||||
@ -1,116 +0,0 @@
|
||||
const std = @import("std");
|
||||
const assert = std.debug.assert;
|
||||
|
||||
/// An intrusive MPSC (multi-provider, single consumer) queue implementation.
|
||||
/// The type T must have a field "next" of type `?*T`.
|
||||
///
|
||||
/// This is an implementatin of a Vyukov Queue[1].
|
||||
/// TODO(mitchellh): I haven't audited yet if I got all the atomic operations
|
||||
/// correct. I was short term more focused on getting something that seemed
|
||||
/// to work; I need to make sure it actually works.
|
||||
///
|
||||
/// For those unaware, an intrusive variant of a data structure is one in which
|
||||
/// the data type in the list has the pointer to the next element, rather
|
||||
/// than a higher level "node" or "container" type. The primary benefit
|
||||
/// of this (and the reason we implement this) is that it defers all memory
|
||||
/// management to the caller: the data structure implementation doesn't need
|
||||
/// to allocate "nodes" to contain each element. Instead, the caller provides
|
||||
/// the element and how its allocated is up to them.
|
||||
///
|
||||
/// [1]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
|
||||
pub fn Intrusive(comptime T: type) type {
|
||||
return struct {
|
||||
const Self = @This();
|
||||
|
||||
/// Head is the front of the queue and tail is the back of the queue.
|
||||
head: *T,
|
||||
tail: *T,
|
||||
stub: T,
|
||||
|
||||
/// Initialize the queue. This requires a stable pointer to itself.
|
||||
/// This must be called before the queue is used concurrently.
|
||||
pub fn init(self: *Self) void {
|
||||
self.head = &self.stub;
|
||||
self.tail = &self.stub;
|
||||
self.stub.next = null;
|
||||
}
|
||||
|
||||
/// Push an item onto the queue. This can be called by any number
|
||||
/// of producers.
|
||||
pub fn push(self: *Self, v: *T) void {
|
||||
@atomicStore(?*T, &v.next, null, .unordered);
|
||||
const prev = @atomicRmw(*T, &self.head, .Xchg, v, .acq_rel);
|
||||
@atomicStore(?*T, &prev.next, v, .release);
|
||||
}
|
||||
|
||||
/// Pop the first in element from the queue. This must be called
|
||||
/// by only a single consumer at any given time.
|
||||
pub fn pop(self: *Self) ?*T {
|
||||
var tail = @atomicLoad(*T, &self.tail, .unordered);
|
||||
var next_ = @atomicLoad(?*T, &tail.next, .acquire);
|
||||
if (tail == &self.stub) {
|
||||
const next = next_ orelse return null;
|
||||
@atomicStore(*T, &self.tail, next, .unordered);
|
||||
tail = next;
|
||||
next_ = @atomicLoad(?*T, &tail.next, .acquire);
|
||||
}
|
||||
|
||||
if (next_) |next| {
|
||||
@atomicStore(*T, &self.tail, next, .release);
|
||||
tail.next = null;
|
||||
return tail;
|
||||
}
|
||||
|
||||
const head = @atomicLoad(*T, &self.head, .unordered);
|
||||
if (tail != head) return null;
|
||||
self.push(&self.stub);
|
||||
|
||||
next_ = @atomicLoad(?*T, &tail.next, .acquire);
|
||||
if (next_) |next| {
|
||||
@atomicStore(*T, &self.tail, next, .unordered);
|
||||
tail.next = null;
|
||||
return tail;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
test Intrusive {
|
||||
const testing = std.testing;
|
||||
|
||||
// Types
|
||||
const Elem = struct {
|
||||
const Self = @This();
|
||||
next: ?*Self = null,
|
||||
};
|
||||
const Queue = Intrusive(Elem);
|
||||
var q: Queue = undefined;
|
||||
q.init();
|
||||
|
||||
// Elems
|
||||
var elems: [10]Elem = .{.{}} ** 10;
|
||||
|
||||
// One
|
||||
try testing.expect(q.pop() == null);
|
||||
q.push(&elems[0]);
|
||||
try testing.expect(q.pop().? == &elems[0]);
|
||||
try testing.expect(q.pop() == null);
|
||||
|
||||
// Two
|
||||
try testing.expect(q.pop() == null);
|
||||
q.push(&elems[0]);
|
||||
q.push(&elems[1]);
|
||||
try testing.expect(q.pop().? == &elems[0]);
|
||||
try testing.expect(q.pop().? == &elems[1]);
|
||||
try testing.expect(q.pop() == null);
|
||||
|
||||
// // Interleaved
|
||||
try testing.expect(q.pop() == null);
|
||||
q.push(&elems[0]);
|
||||
try testing.expect(q.pop().? == &elems[0]);
|
||||
q.push(&elems[1]);
|
||||
try testing.expect(q.pop().? == &elems[1]);
|
||||
try testing.expect(q.pop() == null);
|
||||
}
|
||||
83
async/stack.zig
Normal file
83
async/stack.zig
Normal file
@ -0,0 +1,83 @@
|
||||
const std = @import("std");
|
||||
const coro_base = @import("coro_base.zig");
|
||||
|
||||
pub const stack_size = 1 * 1024 * 1024;
|
||||
|
||||
pub const Stack = struct {
|
||||
pub const Data = struct {
|
||||
data: [stack_size]u8 align(coro_base.stack_alignment) = undefined,
|
||||
|
||||
pub fn ptr(self: *const Data) [*]u8 {
|
||||
return &self.data;
|
||||
}
|
||||
};
|
||||
|
||||
full: *Data,
|
||||
used: []u8,
|
||||
|
||||
pub fn init(full: *Data) Stack {
|
||||
return .{
|
||||
.full = full,
|
||||
.used = full.data[full.data.len..],
|
||||
};
|
||||
}
|
||||
|
||||
pub fn remaining(self: Stack) []align(coro_base.stack_alignment) u8 {
|
||||
return self.full.data[0 .. self.full.data.len - self.used.len];
|
||||
}
|
||||
|
||||
pub fn push(self: *Stack, comptime T: type) !*T {
|
||||
const ptr_i = std.mem.alignBackward(
|
||||
usize,
|
||||
@intFromPtr(self.used.ptr - @sizeOf(T)),
|
||||
coro_base.stack_alignment,
|
||||
);
|
||||
if (ptr_i <= @intFromPtr(&self.full.data[0])) {
|
||||
return error.StackTooSmall;
|
||||
}
|
||||
self.used = self.full.data[ptr_i - @intFromPtr(&self.full.data[0]) ..];
|
||||
return @ptrFromInt(ptr_i);
|
||||
}
|
||||
};
|
||||
|
||||
pub const PooledStackAllocator = struct {
|
||||
const Pool = std.heap.MemoryPoolAligned(Stack.Data, coro_base.stack_alignment);
|
||||
|
||||
pool: Pool,
|
||||
|
||||
pub fn init(allocator: std.mem.Allocator) PooledStackAllocator {
|
||||
return .{ .pool = Pool.init(allocator) };
|
||||
}
|
||||
|
||||
pub fn deinit(self: *PooledStackAllocator) void {
|
||||
self.pool.deinit();
|
||||
}
|
||||
|
||||
pub fn create(self: *PooledStackAllocator) !Stack {
|
||||
return Stack.init(try self.pool.create());
|
||||
}
|
||||
|
||||
pub fn destroy(self: *PooledStackAllocator, stack: *Stack) void {
|
||||
self.pool.destroy(stack.full);
|
||||
}
|
||||
};
|
||||
|
||||
pub const StackAllocator = struct {
|
||||
allocator: std.mem.Allocator,
|
||||
|
||||
pub fn init(allocator: std.mem.Allocator) StackAllocator {
|
||||
return .{ .allocator = allocator };
|
||||
}
|
||||
|
||||
pub fn deinit(self: *StackAllocator) void {
|
||||
_ = self; // autofix
|
||||
}
|
||||
|
||||
pub fn create(self: *StackAllocator) !Stack {
|
||||
return Stack.init(try self.allocator.create(Stack.Data));
|
||||
}
|
||||
|
||||
pub fn destroy(self: *StackAllocator, stack: *Stack) void {
|
||||
self.allocator.destroy(stack.full);
|
||||
}
|
||||
};
|
||||
@ -7,6 +7,7 @@ zig_library(
|
||||
"io.zig",
|
||||
"math.zig",
|
||||
"meta.zig",
|
||||
"queue.zig",
|
||||
"signature.zig",
|
||||
],
|
||||
main = "stdx.zig",
|
||||
|
||||
@ -4,6 +4,7 @@ const debug = @import("debug.zig");
|
||||
const compileError = debug.compileError;
|
||||
|
||||
pub const FnSignature = @import("signature.zig").FnSignature;
|
||||
pub const Signature = @import("signature.zig").Signature;
|
||||
|
||||
pub fn isStruct(comptime T: type) bool {
|
||||
return switch (@typeInfo(T)) {
|
||||
|
||||
305
stdx/queue.zig
Normal file
305
stdx/queue.zig
Normal file
@ -0,0 +1,305 @@
|
||||
const std = @import("std");
|
||||
const assert = std.debug.assert;
|
||||
|
||||
/// An intrusive queue implementation. The type T must have a field
|
||||
/// "next" of type `?*T`.
|
||||
///
|
||||
/// For those unaware, an intrusive variant of a data structure is one in which
|
||||
/// the data type in the list has the pointer to the next element, rather
|
||||
/// than a higher level "node" or "container" type. The primary benefit
|
||||
/// of this (and the reason we implement this) is that it defers all memory
|
||||
/// management to the caller: the data structure implementation doesn't need
|
||||
/// to allocate "nodes" to contain each element. Instead, the caller provides
|
||||
/// the element and how its allocated is up to them.
|
||||
pub fn SPSC(comptime T: type) type {
|
||||
return struct {
|
||||
const Self = @This();
|
||||
|
||||
/// Head is the front of the queue and tail is the back of the queue.
|
||||
head: ?*T = null,
|
||||
tail: ?*T = null,
|
||||
|
||||
/// Enqueue a new element to the back of the queue.
|
||||
pub fn push(self: *Self, v: *T) void {
|
||||
assert(v.next == null);
|
||||
|
||||
if (self.tail) |tail| {
|
||||
// If we have elements in the queue, then we add a new tail.
|
||||
tail.next = v;
|
||||
self.tail = v;
|
||||
} else {
|
||||
// No elements in the queue we setup the initial state.
|
||||
self.head = v;
|
||||
self.tail = v;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn pushAll(self: *Self, other: Self) void {
|
||||
if (self.tail) |tail| {
|
||||
tail.next = other.head;
|
||||
} else {
|
||||
self.head = other.head;
|
||||
}
|
||||
self.tail = other.tail;
|
||||
}
|
||||
|
||||
/// Dequeue the next element from the queue.
|
||||
pub fn pop(self: *Self) ?*T {
|
||||
// The next element is in "head".
|
||||
const next = self.head orelse return null;
|
||||
|
||||
// If the head and tail are equal this is the last element
|
||||
// so we also set tail to null so we can now be empty.
|
||||
if (self.head == self.tail) self.tail = null;
|
||||
|
||||
// Head is whatever is next (if we're the last element,
|
||||
// this will be null);
|
||||
self.head = next.next;
|
||||
|
||||
// We set the "next" field to null so that this element
|
||||
// can be inserted again.
|
||||
next.next = null;
|
||||
return next;
|
||||
}
|
||||
|
||||
pub fn len(self: Self) usize {
|
||||
var ret: usize = 0;
|
||||
var current = self.head;
|
||||
while (current) |elem| : (current = elem.next) {
|
||||
ret += 1;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/// Returns true if the queue is empty.
|
||||
pub fn empty(self: *const Self) bool {
|
||||
return self.head == null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
test SPSC {
|
||||
const testing = std.testing;
|
||||
|
||||
// Types
|
||||
const Elem = struct {
|
||||
const Self = @This();
|
||||
next: ?*Self = null,
|
||||
};
|
||||
const Queue = SPSC(Elem);
|
||||
var q: Queue = .{};
|
||||
try testing.expect(q.empty());
|
||||
|
||||
// Elems
|
||||
var elems: [10]Elem = .{.{}} ** 10;
|
||||
|
||||
// One
|
||||
try testing.expect(q.pop() == null);
|
||||
q.push(&elems[0]);
|
||||
try testing.expect(!q.empty());
|
||||
try testing.expect(q.pop().? == &elems[0]);
|
||||
try testing.expect(q.pop() == null);
|
||||
try testing.expect(q.empty());
|
||||
|
||||
// Two
|
||||
try testing.expect(q.pop() == null);
|
||||
q.push(&elems[0]);
|
||||
q.push(&elems[1]);
|
||||
try testing.expect(q.pop().? == &elems[0]);
|
||||
try testing.expect(q.pop().? == &elems[1]);
|
||||
try testing.expect(q.pop() == null);
|
||||
|
||||
// Interleaved
|
||||
try testing.expect(q.pop() == null);
|
||||
q.push(&elems[0]);
|
||||
try testing.expect(q.pop().? == &elems[0]);
|
||||
q.push(&elems[1]);
|
||||
try testing.expect(q.pop().? == &elems[1]);
|
||||
try testing.expect(q.pop() == null);
|
||||
}
|
||||
|
||||
/// An intrusive MPSC (multi-provider, single consumer) queue implementation.
|
||||
/// The type T must have a field "next" of type `?*T`.
|
||||
///
|
||||
/// This is an implementatin of a Vyukov Queue[1].
|
||||
/// TODO(mitchellh): I haven't audited yet if I got all the atomic operations
|
||||
/// correct. I was short term more focused on getting something that seemed
|
||||
/// to work; I need to make sure it actually works.
|
||||
///
|
||||
/// For those unaware, an intrusive variant of a data structure is one in which
|
||||
/// the data type in the list has the pointer to the next element, rather
|
||||
/// than a higher level "node" or "container" type. The primary benefit
|
||||
/// of this (and the reason we implement this) is that it defers all memory
|
||||
/// management to the caller: the data structure implementation doesn't need
|
||||
/// to allocate "nodes" to contain each element. Instead, the caller provides
|
||||
/// the element and how its allocated is up to them.
|
||||
///
|
||||
/// [1]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
|
||||
pub fn MPSC(comptime T: type) type {
|
||||
return struct {
|
||||
const Self = @This();
|
||||
|
||||
/// Head is the front of the queue and tail is the back of the queue.
|
||||
head: *T,
|
||||
tail: *T,
|
||||
stub: T,
|
||||
|
||||
/// Initialize the queue. This requires a stable pointer to itself.
|
||||
/// This must be called before the queue is used concurrently.
|
||||
pub fn init(self: *Self) void {
|
||||
self.head = &self.stub;
|
||||
self.tail = &self.stub;
|
||||
self.stub.next = null;
|
||||
}
|
||||
|
||||
/// Push an item onto the queue. This can be called by any number
|
||||
/// of producers.
|
||||
pub fn push(self: *Self, v: *T) void {
|
||||
@atomicStore(?*T, &v.next, null, .unordered);
|
||||
const prev = @atomicRmw(*T, &self.head, .Xchg, v, .acq_rel);
|
||||
@atomicStore(?*T, &prev.next, v, .release);
|
||||
}
|
||||
|
||||
/// Pop the first in element from the queue. This must be called
|
||||
/// by only a single consumer at any given time.
|
||||
pub fn pop(self: *Self) ?*T {
|
||||
var tail = @atomicLoad(*T, &self.tail, .unordered);
|
||||
var next_ = @atomicLoad(?*T, &tail.next, .acquire);
|
||||
if (tail == &self.stub) {
|
||||
const next = next_ orelse return null;
|
||||
@atomicStore(*T, &self.tail, next, .unordered);
|
||||
tail = next;
|
||||
next_ = @atomicLoad(?*T, &tail.next, .acquire);
|
||||
}
|
||||
|
||||
if (next_) |next| {
|
||||
@atomicStore(*T, &self.tail, next, .release);
|
||||
tail.next = null;
|
||||
return tail;
|
||||
}
|
||||
|
||||
const head = @atomicLoad(*T, &self.head, .unordered);
|
||||
if (tail != head) return null;
|
||||
self.push(&self.stub);
|
||||
|
||||
next_ = @atomicLoad(?*T, &tail.next, .acquire);
|
||||
if (next_) |next| {
|
||||
@atomicStore(*T, &self.tail, next, .unordered);
|
||||
tail.next = null;
|
||||
return tail;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
test MPSC {
|
||||
const testing = std.testing;
|
||||
|
||||
// Types
|
||||
const Elem = struct {
|
||||
const Self = @This();
|
||||
next: ?*Self = null,
|
||||
};
|
||||
const Queue = MPSC(Elem);
|
||||
var q: Queue = undefined;
|
||||
q.init();
|
||||
|
||||
// Elems
|
||||
var elems: [10]Elem = .{.{}} ** 10;
|
||||
|
||||
// One
|
||||
try testing.expect(q.pop() == null);
|
||||
q.push(&elems[0]);
|
||||
try testing.expect(q.pop().? == &elems[0]);
|
||||
try testing.expect(q.pop() == null);
|
||||
|
||||
// Two
|
||||
try testing.expect(q.pop() == null);
|
||||
q.push(&elems[0]);
|
||||
q.push(&elems[1]);
|
||||
try testing.expect(q.pop().? == &elems[0]);
|
||||
try testing.expect(q.pop().? == &elems[1]);
|
||||
try testing.expect(q.pop() == null);
|
||||
|
||||
// // Interleaved
|
||||
try testing.expect(q.pop() == null);
|
||||
q.push(&elems[0]);
|
||||
try testing.expect(q.pop().? == &elems[0]);
|
||||
q.push(&elems[1]);
|
||||
try testing.expect(q.pop().? == &elems[1]);
|
||||
try testing.expect(q.pop() == null);
|
||||
}
|
||||
|
||||
pub fn ArrayQueue(comptime T: type, comptime size: usize) type {
|
||||
return struct {
|
||||
const Self = @This();
|
||||
|
||||
vals: [size]T = undefined,
|
||||
head: ?usize = null,
|
||||
tail: ?usize = null,
|
||||
|
||||
pub fn len(self: Self) usize {
|
||||
switch (self.state()) {
|
||||
.empty => return 0,
|
||||
.one => return 1,
|
||||
.many => {
|
||||
const head = self.head.?;
|
||||
const tail = self.tail.?;
|
||||
if (tail > head) {
|
||||
return tail - head + 1;
|
||||
}
|
||||
return size - head + tail + 1;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn available(self: Self) usize {
|
||||
return size - self.len();
|
||||
}
|
||||
|
||||
pub fn push(self: *Self, val: T) !void {
|
||||
if (self.len() == size) {
|
||||
return error.QueueFull;
|
||||
}
|
||||
switch (self.state()) {
|
||||
.empty => {
|
||||
self.head = 0;
|
||||
self.tail = 0;
|
||||
self.vals[0] = val;
|
||||
},
|
||||
.one, .many => {
|
||||
const tail = self.tail.?;
|
||||
const new_tail = (tail + 1) % size;
|
||||
self.vals[new_tail] = val;
|
||||
self.tail = new_tail;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn pop(self: *Self) ?T {
|
||||
switch (self.state()) {
|
||||
.empty => return null,
|
||||
.one => {
|
||||
const out = self.vals[self.head.?];
|
||||
self.head = null;
|
||||
self.tail = null;
|
||||
return out;
|
||||
},
|
||||
.many => {
|
||||
const out = self.vals[self.head.?];
|
||||
self.head = (self.head.? + 1) % size;
|
||||
return out;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
const State = enum { empty, one, many };
|
||||
inline fn state(self: Self) State {
|
||||
if (self.head == null) return .empty;
|
||||
if (self.head.? == self.tail.?) return .one;
|
||||
return .many;
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -48,6 +48,7 @@ pub fn ArgsTuple(comptime funcT: anytype, comptime ArgsT: ?type) type {
|
||||
}
|
||||
|
||||
pub const Signature = struct {
|
||||
Func: type,
|
||||
FuncT: type,
|
||||
ArgsT: type,
|
||||
ReturnT: type,
|
||||
@ -59,6 +60,9 @@ pub fn FnSignature(comptime func: anytype, comptime argsT_: ?type) Signature {
|
||||
const argsT = ArgsTuple(@TypeOf(func), argsT_);
|
||||
const return_type = @TypeOf(@call(.auto, func, @as(argsT, undefined)));
|
||||
return Signature{
|
||||
.Func = struct {
|
||||
pub const Value = func;
|
||||
},
|
||||
.FuncT = @TypeOf(func),
|
||||
.ArgsT = argsT,
|
||||
.ReturnT = return_type,
|
||||
|
||||
@ -2,3 +2,4 @@ pub const debug = @import("debug.zig");
|
||||
pub const io = @import("io.zig");
|
||||
pub const math = @import("math.zig");
|
||||
pub const meta = @import("meta.zig");
|
||||
pub const queue = @import("queue.zig");
|
||||
|
||||
Loading…
Reference in New Issue
Block a user