From 5b8e42f9a92c7e94362da466ec989f58753e90fb Mon Sep 17 00:00:00 2001 From: Tarry Singh Date: Thu, 11 Jan 2024 15:40:15 +0000 Subject: [PATCH] Vendor zigcoro and unify APIs; rework internals for stdx.meta compatibility, add Channel.try_send/try_recv methods, support dynamically sized channels with comptime capacity, and introduce PoolStackAllocator for coroutine stack allocation. --- MODULE.bazel | 1 - async/BUILD.bazel | 13 +- async/asm/coro_aarch64.s | 49 +++ async/asm/coro_riscv64.s | 77 ++++ async/asm/coro_x86_64.s | 30 ++ async/asm/coro_x86_64_windows.s | 65 ++++ async/{zigcoro.zig => async.zig} | 153 +++----- async/asyncio.zig | 624 +++++++++++++++++++++++++++++++ async/channel.zig | 75 ++++ async/coro.zig | 557 +++++++++++++++++++++++++++ async/coro_base.zig | 71 ++++ async/executor.zig | 209 +++++++++++ async/queue.zig | 101 ----- async/queue_mpsc.zig | 116 ------ async/stack.zig | 83 ++++ stdx/BUILD.bazel | 1 + stdx/meta.zig | 1 + stdx/queue.zig | 305 +++++++++++++++ stdx/signature.zig | 4 + stdx/stdx.zig | 1 + 20 files changed, 2212 insertions(+), 324 deletions(-) create mode 100644 async/asm/coro_aarch64.s create mode 100644 async/asm/coro_riscv64.s create mode 100644 async/asm/coro_x86_64.s create mode 100644 async/asm/coro_x86_64_windows.s rename async/{zigcoro.zig => async.zig} (81%) create mode 100644 async/asyncio.zig create mode 100644 async/channel.zig create mode 100644 async/coro.zig create mode 100644 async/coro_base.zig create mode 100644 async/executor.zig delete mode 100644 async/queue.zig delete mode 100644 async/queue_mpsc.zig create mode 100644 async/stack.zig create mode 100644 stdx/queue.zig diff --git a/MODULE.bazel b/MODULE.bazel index 1dc10d8..77c4d9e 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -103,7 +103,6 @@ bazel_dep(name = "xla", version = "20250103.0-5f1fe6a") tsl = use_extension("@xla//:tsl.bzl", "tsl") use_repo(tsl, "tsl") -bazel_dep(name = "zigcoro", version = "20240829.0-fc1db29") bazel_dep(name = "sentencepiece", 
version = "20240618.0-d7ace0a") bazel_dep(name = "zig-protobuf", version = "20240722.0-c644d11") bazel_dep(name = "zig-yaml", version = "20240903.0-83d5fdf") diff --git a/async/BUILD.bazel b/async/BUILD.bazel index 8eb40dc..9d6efff 100644 --- a/async/BUILD.bazel +++ b/async/BUILD.bazel @@ -3,15 +3,18 @@ load("@rules_zig//zig:defs.bzl", "zig_library") zig_library( name = "async", srcs = [ - "queue.zig", - "queue_mpsc.zig", + "asyncio.zig", + "channel.zig", + "coro.zig", + "coro_base.zig", + "executor.zig", + "stack.zig", ], - import_name = "async", - main = "zigcoro.zig", + extra_srcs = glob(["asm/*.s"]), + main = "async.zig", visibility = ["//visibility:public"], deps = [ "//stdx", "@libxev//:xev", - "@zigcoro//:libcoro", ], ) diff --git a/async/asm/coro_aarch64.s b/async/asm/coro_aarch64.s new file mode 100644 index 0000000..99485f8 --- /dev/null +++ b/async/asm/coro_aarch64.s @@ -0,0 +1,49 @@ +# See arm64 procedure call standard +# https://github.com/ARM-software/abi-aa/releases +.global _libcoro_stack_swap +_libcoro_stack_swap: +.global libcoro_stack_swap +libcoro_stack_swap: + +# Store caller registers on the current stack +# Each register requires 8 bytes, there are 20 registers to save +sub sp, sp, 0xa0 +# d* are the 128-bit floating point registers, the lower 64 bits are preserved +stp d8, d9, [sp, 0x00] +stp d10, d11, [sp, 0x10] +stp d12, d13, [sp, 0x20] +stp d14, d15, [sp, 0x30] +# x* are the scratch registers +stp x19, x20, [sp, 0x40] +stp x21, x22, [sp, 0x50] +stp x23, x24, [sp, 0x60] +stp x25, x26, [sp, 0x70] +stp x27, x28, [sp, 0x80] +# fp=frame pointer, lr=link register +stp fp, lr, [sp, 0x90] + +# Modify stack pointer of current coroutine (x0, first argument) +mov x2, sp +str x2, [x0, 0] + +# Load stack pointer from target coroutine (x1, second argument) +ldr x9, [x1, 0] +mov sp, x9 + +# Restore target registers +ldp d8, d9, [sp, 0x00] +ldp d10, d11, [sp, 0x10] +ldp d12, d13, [sp, 0x20] +ldp d14, d15, [sp, 0x30] +ldp x19, x20, [sp, 0x40] +ldp x21,
x22, [sp, 0x50] +ldp x23, x24, [sp, 0x60] +ldp x25, x26, [sp, 0x70] +ldp x27, x28, [sp, 0x80] +ldp fp, lr, [sp, 0x90] + +# Pop stack frame +add sp, sp, 0xa0 + +# jump to lr +ret diff --git a/async/asm/coro_riscv64.s b/async/asm/coro_riscv64.s new file mode 100644 index 0000000..49cce75 --- /dev/null +++ b/async/asm/coro_riscv64.s @@ -0,0 +1,77 @@ +# See riscv procedure calling convention +# https://github.com/riscv-non-isa/riscv-elf-psabi-doc +.global libcoro_stack_swap +libcoro_stack_swap: + +# Store caller registers on the current stack +# Each register requires 8 bytes, there are 25 registers to save +addi sp, sp, -0xc8 +# s* are integer callee-saved registers +sd s0, 0x00(sp) +sd s1, 0x08(sp) +sd s2, 0x10(sp) +sd s3, 0x18(sp) +sd s4, 0x20(sp) +sd s5, 0x28(sp) +sd s6, 0x30(sp) +sd s7, 0x38(sp) +sd s8, 0x40(sp) +sd s9, 0x48(sp) +sd s10, 0x50(sp) +sd s11, 0x58(sp) +# fs* are float callee-saved registers +fsd fs0, 0x60(sp) +fsd fs1, 0x68(sp) +fsd fs2, 0x70(sp) +fsd fs3, 0x78(sp) +fsd fs4, 0x80(sp) +fsd fs5, 0x88(sp) +fsd fs6, 0x90(sp) +fsd fs7, 0x98(sp) +fsd fs8, 0xa0(sp) +fsd fs9, 0xa8(sp) +fsd fs10, 0xb0(sp) +fsd fs11, 0xb8(sp) +# ra=return address +sd ra, 0xc0(sp) + +# Modify stack pointer of current coroutine (a0, first argument) +mv t0, sp +sd t0, 0(a0) + +# Load stack pointer from target coroutine (a1, second argument) +ld t1, 0(a1) +mv sp, t1 + +# Restore +ld s0, 0x00(sp) +ld s1, 0x08(sp) +ld s2, 0x10(sp) +ld s3, 0x18(sp) +ld s4, 0x20(sp) +ld s5, 0x28(sp) +ld s6, 0x30(sp) +ld s7, 0x38(sp) +ld s8, 0x40(sp) +ld s9, 0x48(sp) +ld s10, 0x50(sp) +ld s11, 0x58(sp) +fld fs0, 0x60(sp) +fld fs1, 0x68(sp) +fld fs2, 0x70(sp) +fld fs3, 0x78(sp) +fld fs4, 0x80(sp) +fld fs5, 0x88(sp) +fld fs6, 0x90(sp) +fld fs7, 0x98(sp) +fld fs8, 0xa0(sp) +fld fs9, 0xa8(sp) +fld fs10, 0xb0(sp) +fld fs11, 0xb8(sp) +ld ra, 0xc0(sp) + +# Pop stack frame +addi sp, sp, 0xc8 + +# jump to ra +ret diff --git a/async/asm/coro_x86_64.s b/async/asm/coro_x86_64.s new file mode 100644 index 
0000000..d7cd3e8 --- /dev/null +++ b/async/asm/coro_x86_64.s @@ -0,0 +1,30 @@ +# See System V x86-64 calling convention +# https://gitlab.com/x86-psABIs/x86-64-ABI +.global _libcoro_stack_swap +_libcoro_stack_swap: +.global libcoro_stack_swap +libcoro_stack_swap: +# Store caller registers on the current stack +pushq %rbp +pushq %rbx +pushq %r12 +pushq %r13 +pushq %r14 +pushq %r15 + +# Modify stack pointer of current coroutine (rdi, first argument) +movq %rsp, (%rdi) + +# Load stack pointer from target coroutine (rsi, second argument) +movq (%rsi), %rsp + +# Restore target registers +popq %r15 +popq %r14 +popq %r13 +popq %r12 +popq %rbx +popq %rbp + +# jump +retq diff --git a/async/asm/coro_x86_64_windows.s b/async/asm/coro_x86_64_windows.s new file mode 100644 index 0000000..d38e6e6 --- /dev/null +++ b/async/asm/coro_x86_64_windows.s @@ -0,0 +1,65 @@ +# See Microsoft x86-64 calling convention +# https://learn.microsoft.com/en-us/cpp/build/x64-calling-convention +.global libcoro_stack_swap +libcoro_stack_swap: +# Store Windows stack information +pushq %gs:0x10 +pushq %gs:0x08 + +# Store caller registers +pushq %rbp +pushq %rbx +pushq %rdi +pushq %rsi +pushq %r12 +pushq %r13 +pushq %r14 +pushq %r15 + +# Store caller simd/float registers +subq $0xa0, %rsp +movups %xmm6, 0x00(%rsp) +movups %xmm7, 0x10(%rsp) +movups %xmm8, 0x20(%rsp) +movups %xmm9, 0x30(%rsp) +movups %xmm10, 0x40(%rsp) +movups %xmm11, 0x50(%rsp) +movups %xmm12, 0x60(%rsp) +movups %xmm13, 0x70(%rsp) +movups %xmm14, 0x80(%rsp) +movups %xmm15, 0x90(%rsp) + +# Modify stack pointer of current coroutine (rcx, first argument) +movq %rsp, (%rcx) + +# Load stack pointer from target coroutine (rdx, second argument) +movq (%rdx), %rsp + +# Restore target simd/float registers +movups 0x00(%rsp), %xmm6 +movups 0x10(%rsp), %xmm7 +movups 0x20(%rsp), %xmm8 +movups 0x30(%rsp), %xmm9 +movups 0x40(%rsp), %xmm10 +movups 0x50(%rsp), %xmm11 +movups 0x60(%rsp), %xmm12 +movups 0x70(%rsp), %xmm13 +movups 0x80(%rsp), %xmm14 
+movups 0x90(%rsp), %xmm15 +addq $0xa0, %rsp + +# Restore target registers +popq %r15 +popq %r14 +popq %r13 +popq %r12 +popq %rsi +popq %rdi +popq %rbx +popq %rbp + +# Restore Windows stack information +popq %gs:0x08 +popq %gs:0x10 + +retq diff --git a/async/zigcoro.zig b/async/async.zig similarity index 81% rename from async/zigcoro.zig rename to async/async.zig index 1f8fe93..20effdb 100644 --- a/async/zigcoro.zig +++ b/async/async.zig @@ -1,52 +1,29 @@ const std = @import("std"); const stdx = @import("stdx"); const xev = @import("xev"); -const libcoro = @import("libcoro"); -const aio = libcoro.asyncio; -const queue_mpsc = @import("queue_mpsc.zig"); - -pub const Queue = @import("queue.zig").Intrusive; +const coro = @import("coro.zig"); +const executor = @import("executor.zig"); +const channel_mod = @import("channel.zig"); +const aio = @import("asyncio.zig"); +const stack = @import("stack.zig"); pub const Condition = struct { - const CoroResume = struct { - coro: libcoro.Frame, - - fn init() CoroResume { - return .{ .coro = libcoro.xframe() }; - } - - fn func(self: *CoroResume) libcoro.Executor.Func { - return .{ .func = CoroResume.cb, .userdata = self }; - } - - fn cb(ud: ?*anyopaque) void { - const self: *CoroResume = @ptrCast(@alignCast(ud)); - libcoro.xresume(self.coro); - } - }; - - exec: *libcoro.Executor, - waiters: Queue(libcoro.Executor.Func) = .{}, + inner: executor.Condition, pub fn init() Condition { - return .{ .exec = &AsyncThread.current.executor.exec }; + return .{ .inner = executor.Condition.init(&AsyncThread.current.executor.exec) }; } pub fn broadcast(self: *Condition) void { - while (self.waiters.pop()) |waiter| { - self.exec.runSoon(waiter); - } + self.inner.broadcast(); } pub fn signal(self: *Condition) void { - if (self.waiters.pop()) |waiter| self.exec.runSoon(waiter); + self.inner.signal(); } pub fn wait(self: *Condition) void { - var res = CoroResume.init(); - var cb = res.func(); - self.waiters.push(&cb); - libcoro.xsuspend(); + 
self.inner.wait(); } }; @@ -63,7 +40,7 @@ pub fn FrameEx(comptime func: anytype, comptime argsT: type) type { fn FrameExx(comptime func: anytype, comptime argsT: type, comptime returnT: type) type { return struct { const Self = @This(); - const FrameT = libcoro.FrameT(func, .{ .ArgsT = argsT }); + const FrameT = coro.FrameT(func, argsT); inner: FrameT, @@ -71,18 +48,20 @@ fn FrameExx(comptime func: anytype, comptime argsT: type, comptime returnT: type pub const await_ = awaitt; pub fn awaitt(self: *Self) returnT { defer { + AsyncThread.current.stack_allocator.destroy(&self.inner._frame.stack); self.inner.deinit(); self.* = undefined; } - return libcoro.xawait(self.inner); + return coro.xawait(self.inner); } }; } pub fn asyncc(comptime func: anytype, args: anytype) !FrameEx(func, @TypeOf(args)) { const Signature = stdx.meta.FnSignature(func, @TypeOf(args)); + const new_stack = try AsyncThread.current.stack_allocator.create(); return .{ - .inner = try aio.xasync(func, @as(Signature.ArgsT, args), null), + .inner = try aio.xasync(func, @as(Signature.ArgsT, args), new_stack), }; } @@ -120,12 +99,12 @@ pub fn sleep(ms: u64) !void { pub const threading = struct { const Waiter = struct { - frame: libcoro.Frame, + frame: coro.Frame, thread: *const AsyncThread, next: ?*Waiter = null, }; - const WaiterQueue = queue_mpsc.Intrusive(Waiter); + const WaiterQueue = stdx.queue.MPSC(Waiter); pub const ResetEventSingle = struct { const State = union(enum) { @@ -140,7 +119,7 @@ pub const threading = struct { waiter: std.atomic.Value(*const State) = std.atomic.Value(*const State).init(&State.unset_state), pub fn isSet(self: *ResetEventSingle) bool { - return self.waiter.load(&State.set_state, .monotonic) == &State.set_state; + return self.waiter.load(.monotonic) == &State.set_state; } pub fn reset(self: *ResetEventSingle) void { @@ -159,72 +138,26 @@ pub const threading = struct { pub fn wait(self: *ResetEventSingle) void { var waiter: Waiter = .{ - .frame = libcoro.xframe(), + .frame 
= coro.xframe(), .thread = AsyncThread.current, }; var new_state: State = .{ .waiting = &waiter, }; if (self.waiter.cmpxchgStrong(&State.unset_state, &new_state, .monotonic, .monotonic) == null) { - libcoro.xsuspend(); + while (self.isSet() == false) { + coro.xsuspend(); + } } } }; }; -pub const FrameAllocator = struct { - const Item = [1 * 1024 * 1024]u8; - const FramePool = std.heap.MemoryPool(Item); - - pool: FramePool, - - pub fn init(allocator_: std.mem.Allocator) !FrameAllocator { - return .{ - .pool = FramePool.init(allocator_), - }; - } - - pub fn allocator(self: *FrameAllocator) std.mem.Allocator { - return .{ - .ptr = self, - .vtable = &.{ - .alloc = alloc, - .resize = resize, - .free = free, - }, - }; - } - - fn alloc(ctx: *anyopaque, len: usize, ptr_align: u8, ret_addr: usize) ?[*]u8 { - _ = ptr_align; - _ = ret_addr; - stdx.debug.assert(len <= Item.len, "Should always pass a length of less than {d} bytes", .{Item.len}); - const self: *FrameAllocator = @ptrCast(@alignCast(ctx)); - const stack = self.pool.create() catch return null; - return @ptrCast(stack); - } - - fn resize(ctx: *anyopaque, buf: []u8, buf_align: u8, new_len: usize, ret_addr: usize) bool { - _ = ctx; - _ = buf; - _ = buf_align; - _ = ret_addr; - return new_len <= Item.len; - } - - fn free(ctx: *anyopaque, buf: []u8, buf_align: u8, ret_addr: usize) void { - _ = buf_align; - _ = ret_addr; - const self: *FrameAllocator = @ptrCast(@alignCast(ctx)); - const v: *align(8) Item = @ptrCast(@alignCast(buf.ptr)); - self.pool.destroy(v); - } -}; - pub const AsyncThread = struct { threadlocal var current: *const AsyncThread = undefined; executor: *aio.Executor, + stack_allocator: *stack.StackAllocator, loop: *xev.Loop, thread_pool: *xev.ThreadPool, async_notifier: *xev.Async, @@ -236,7 +169,7 @@ pub const AsyncThread = struct { fn waker_cb(q: ?*threading.WaiterQueue, _: *xev.Loop, _: *xev.Completion, _: xev.Async.WaitError!void) xev.CallbackAction { while (q.?.pop()) |waiter| { - 
libcoro.xresume(waiter.frame); + coro.xresume(waiter.frame); } return .rearm; } @@ -251,10 +184,9 @@ pub const AsyncThread = struct { var loop = try xev.Loop.init(.{ .thread_pool = &thread_pool, }); - defer loop.deinit(); - var executor = aio.Executor.init(&loop); + var executor_ = aio.Executor.init(&loop); var async_notifier = try xev.Async.init(); defer async_notifier.deinit(); @@ -265,20 +197,23 @@ pub const AsyncThread = struct { var c: xev.Completion = undefined; async_notifier.wait(&loop, &c, threading.WaiterQueue, &waiters_queue, &waker_cb); - aio.initEnv(.{ - .stack_allocator = allocator, - .default_stack_size = 1 * 1024 * 1024, - }); + var stack_allocator = stack.StackAllocator.init(allocator); + defer stack_allocator.deinit(); AsyncThread.current = &.{ - .executor = &executor, + .executor = &executor_, + .stack_allocator = &stack_allocator, .loop = &loop, .thread_pool = &thread_pool, .async_notifier = &async_notifier, .waiters_queue = &waiters_queue, }; - return try aio.run(&executor, mainFunc, .{}, null); + // allocate the main coroutine stack, on the current thread's stack! 
+ var mainStackData: stack.Stack.Data = undefined; + const mainStack = stack.Stack.init(&mainStackData); + + return try aio.run(&executor_, mainFunc, .{}, mainStack); } }; @@ -530,7 +465,7 @@ pub const Socket = struct { pub fn Channel(comptime T: type, capacity: usize) type { return struct { const Self = @This(); - const Inner = libcoro.Channel(T, .{ .capacity = capacity }); + const Inner = channel_mod.Channel(T, capacity); inner: Inner, @@ -538,10 +473,18 @@ pub fn Channel(comptime T: type, capacity: usize) type { return .{ .inner = Inner.init(&AsyncThread.current.executor.exec) }; } + pub fn initWithLen(len: usize) Self { + return .{ .inner = Inner.initWithLen(&AsyncThread.current.executor.exec, len) }; + } + pub fn close(self: *Self) void { self.inner.close(); } + pub fn try_send(self: *Self, val: T) bool { + return self.inner.try_send(val); + } + pub fn send(self: *Self, val: T) void { self.inner.send(val) catch unreachable; } @@ -549,11 +492,19 @@ pub fn Channel(comptime T: type, capacity: usize) type { pub fn recv(self: *Self) ?T { return self.inner.recv(); } + + pub fn try_recv(self: *Self) ?T { + return self.inner.try_recv(); + } }; } +pub fn channel(comptime T: type, len: usize, comptime capacity: usize) Channel(T, capacity) { + return Channel(T, capacity).initWithLen(len); +} + pub const Mutex = struct { - const VoidChannel = libcoro.Channel(void, .{ .capacity = 1 }); + const VoidChannel = channel_mod.Channel(void, 1); inner: VoidChannel, diff --git a/async/asyncio.zig b/async/asyncio.zig new file mode 100644 index 0000000..d83fb6b --- /dev/null +++ b/async/asyncio.zig @@ -0,0 +1,624 @@ +const std = @import("std"); +const stdx = @import("stdx"); +const xev = @import("xev"); +const libcoro = @import("coro.zig"); +const CoroExecutor = @import("executor.zig").Executor; + +pub const xasync = libcoro.xasync; +pub const xawait = libcoro.xawait; + +const Frame = libcoro.Frame; + +const Env = struct { + exec: ?*Executor = null, +}; + +pub const EnvArg = struct { + 
executor: ?*Executor = null, + stack_allocator: ?std.mem.Allocator = null, + default_stack_size: ?usize = null, +}; + +threadlocal var env: Env = .{}; + +pub fn initEnv(e: EnvArg) void { + env = .{ .exec = e.executor }; + libcoro.initEnv(.{ + .stack_allocator = e.stack_allocator, + .default_stack_size = e.default_stack_size, + .executor = if (e.executor) |ex| &ex.exec else null, + }); +} + +pub const Executor = struct { + loop: *xev.Loop, + exec: CoroExecutor = .{}, + + pub fn init(loop: *xev.Loop) Executor { + return .{ .loop = loop }; + } + + fn tick(self: *Executor) !void { + try self.loop.run(.once); + _ = self.exec.tick(); + } +}; + +/// Run a coroutine to completion. +/// Must be called from "root", outside of any created coroutine. +pub fn run( + exec: *Executor, + comptime func: anytype, + args: anytype, + stack: anytype, +) !stdx.meta.FnSignature(func, @TypeOf(args)).ReturnPayloadT { + stdx.debug.assert(libcoro.inCoro() == false, "Not in a coroutine", .{}); + const frame = try xasync(func, args, stack); + defer frame.deinit(); + try runCoro(exec, frame); + return xawait(frame); +} + +/// Run a coroutine to completion. +/// Must be called from "root", outside of any created coroutine. 
+fn runCoro(exec: *Executor, frame: anytype) !void { + const f = frame.frame(); + if (f.status == .Start) { + libcoro.xresume(f); + } + while (f.status != .Done) { + try exec.tick(); + } +} + +const SleepResult = xev.Timer.RunError!void; + +pub fn sleep(exec: *Executor, ms: u64) !void { + const loop = exec.loop; + const Data = XCallback(SleepResult); + + var data = Data.init(); + const w = try xev.Timer.init(); + defer w.deinit(); + var c: xev.Completion = .{}; + w.run(loop, &c, ms, Data, &data, &Data.callback); + + try waitForCompletion(exec, &c); + + return data.result; +} + +fn waitForCompletionOutsideCoro(exec: *Executor, c: *xev.Completion) !void { + @setCold(true); + while (c.state() != .dead) { + try exec.tick(); + } +} + +fn waitForCompletionInCoro(c: *xev.Completion) void { + while (c.state() != .dead) { + libcoro.xsuspend(); + } +} + +fn waitForCompletion(exec: *Executor, c: *xev.Completion) !void { + if (libcoro.inCoro()) { + waitForCompletionInCoro(c); + } else { + try waitForCompletionOutsideCoro(exec, c); + } +} + +pub const TCP = struct { + const Self = @This(); + + exec: *Executor, + tcp: xev.TCP, + + pub usingnamespace Stream(Self, xev.TCP, .{ + .close = true, + .read = .recv, + .write = .send, + }); + + pub fn init(exec: *Executor, tcp: xev.TCP) Self { + return .{ .exec = exec, .tcp = tcp }; + } + + fn stream(self: Self) xev.TCP { + return self.tcp; + } + + pub fn accept(self: Self) !Self { + const AcceptResult = xev.TCP.AcceptError!xev.TCP; + const Data = XCallback(AcceptResult); + + const loop = self.exec.loop; + + var data = Data.init(); + var c: xev.Completion = .{}; + self.tcp.accept(loop, &c, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + + const result = try data.result; + return .{ .exec = self.exec, .tcp = result }; + } + + const ConnectResult = xev.TCP.ConnectError!void; + pub fn connect(self: Self, addr: std.net.Address) !void { + const ResultT = ConnectResult; + const Data = struct { + result: ResultT = 
undefined, + frame: ?Frame = null, + + fn callback( + userdata: ?*@This(), + l: *xev.Loop, + c: *xev.Completion, + s: xev.TCP, + result: ResultT, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = s; + const data = userdata.?; + data.result = result; + if (data.frame != null) libcoro.xresume(data.frame.?); + return .disarm; + } + }; + + var data: Data = .{ .frame = libcoro.xframe() }; + const loop = self.exec.loop; + var c: xev.Completion = .{}; + self.tcp.connect(loop, &c, addr, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + + return data.result; + } + + const ShutdownResult = xev.TCP.ShutdownError!void; + pub fn shutdown(self: Self) ShutdownResult { + const ResultT = ShutdownResult; + const Data = struct { + result: ResultT = undefined, + frame: ?Frame = null, + + fn callback( + userdata: ?*@This(), + l: *xev.Loop, + c: *xev.Completion, + s: xev.TCP, + result: ResultT, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = s; + const data = userdata.?; + data.result = result; + if (data.frame != null) libcoro.xresume(data.frame.?); + return .disarm; + } + }; + + var data: Data = .{ .frame = libcoro.xframe() }; + const loop = self.exec.loop; + var c: xev.Completion = .{}; + self.tcp.shutdown(loop, &c, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + + return data.result; + } +}; + +fn Stream(comptime T: type, comptime StreamT: type, comptime options: xev.stream.Options) type { + return struct { + pub usingnamespace if (options.close) Closeable(T, StreamT) else struct {}; + pub usingnamespace if (options.read != .none) Readable(T, StreamT) else struct {}; + pub usingnamespace if (options.write != .none) Writeable(T, StreamT) else struct {}; + }; +} + +fn Closeable(comptime T: type, comptime StreamT: type) type { + return struct { + const Self = T; + const CloseResult = xev.CloseError!void; + pub fn close(self: Self) !void { + const ResultT = CloseResult; + const Data = struct { + result: ResultT = undefined, + frame: 
?Frame = null, + + fn callback( + userdata: ?*@This(), + l: *xev.Loop, + c: *xev.Completion, + s: StreamT, + result: ResultT, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = s; + const data = userdata.?; + data.result = result; + if (data.frame != null) libcoro.xresume(data.frame.?); + return .disarm; + } + }; + + var data: Data = .{ .frame = libcoro.xframe() }; + + const loop = self.exec.loop; + var c: xev.Completion = .{}; + self.stream().close(loop, &c, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + + return data.result; + } + }; +} + +fn Readable(comptime T: type, comptime StreamT: type) type { + return struct { + const Self = T; + const ReadResult = xev.ReadError!usize; + pub fn read(self: Self, buf: xev.ReadBuffer) !usize { + const ResultT = ReadResult; + const Data = struct { + result: ResultT = undefined, + frame: ?Frame = null, + + fn callback( + userdata: ?*@This(), + l: *xev.Loop, + c: *xev.Completion, + s: StreamT, + b: xev.ReadBuffer, + result: ResultT, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = s; + _ = b; + const data = userdata.?; + data.result = result; + if (data.frame != null) libcoro.xresume(data.frame.?); + return .disarm; + } + }; + + var data: Data = .{ .frame = libcoro.xframe() }; + + const loop = self.exec.loop; + var c: xev.Completion = .{}; + self.stream().read(loop, &c, buf, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + + return data.result; + } + }; +} + +fn Writeable(comptime T: type, comptime StreamT: type) type { + return struct { + const Self = T; + const WriteResult = xev.WriteError!usize; + pub fn write(self: Self, buf: xev.WriteBuffer) !usize { + const ResultT = WriteResult; + const Data = struct { + result: ResultT = undefined, + frame: ?Frame = null, + + fn callback( + userdata: ?*@This(), + l: *xev.Loop, + c: *xev.Completion, + s: StreamT, + b: xev.WriteBuffer, + result: ResultT, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = s; + _ = b; + const data = userdata.?; 
+ data.result = result; + if (data.frame != null) libcoro.xresume(data.frame.?); + return .disarm; + } + }; + + var data: Data = .{ .frame = libcoro.xframe() }; + + const loop = self.exec.loop; + var c: xev.Completion = .{}; + self.stream().write(loop, &c, buf, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + return data.result; + } + }; +} + +pub const File = struct { + const Self = @This(); + + exec: *Executor, + file: xev.File, + + pub usingnamespace Stream(Self, xev.File, .{ + .close = true, + .read = .read, + .write = .write, + .threadpool = true, + }); + + pub fn init(exec: *Executor, file: xev.File) Self { + return .{ .exec = exec, .file = file }; + } + + fn stream(self: Self) xev.File { + return self.file; + } + + const PReadResult = xev.ReadError!usize; + pub fn pread(self: Self, buf: xev.ReadBuffer, offset: u64) !usize { + const ResultT = PReadResult; + const Data = struct { + result: ResultT = undefined, + frame: ?Frame = null, + + fn callback( + userdata: ?*@This(), + l: *xev.Loop, + c: *xev.Completion, + s: xev.File, + b: xev.ReadBuffer, + result: ResultT, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = s; + _ = b; + const data = userdata.?; + data.result = result; + if (data.frame != null) libcoro.xresume(data.frame.?); + return .disarm; + } + }; + + var data: Data = .{ .frame = libcoro.xframe() }; + + const loop = self.exec.loop; + var c: xev.Completion = .{}; + self.file.pread(loop, &c, buf, offset, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + + return data.result; + } + + const PWriteResult = xev.WriteError!usize; + pub fn pwrite(self: Self, buf: xev.WriteBuffer, offset: u64) !usize { + const ResultT = PWriteResult; + const Data = struct { + result: ResultT = undefined, + frame: ?Frame = null, + + fn callback( + userdata: ?*@This(), + l: *xev.Loop, + c: *xev.Completion, + s: xev.File, + b: xev.WriteBuffer, + result: ResultT, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = s; + _ = b; + const 
data = userdata.?; + data.result = result; + if (data.frame != null) libcoro.xresume(data.frame.?); + return .disarm; + } + }; + + var data: Data = .{ .frame = libcoro.xframe() }; + + const loop = self.exec.loop; + var c: xev.Completion = .{}; + self.file.pwrite(loop, &c, buf, offset, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + + return data.result; + } +}; + +pub const Process = struct { + const Self = @This(); + + exec: *Executor, + p: xev.Process, + + pub fn init(exec: *Executor, p: xev.Process) Self { + return .{ .exec = exec, .p = p }; + } + + const WaitResult = xev.Process.WaitError!u32; + pub fn wait(self: Self) !u32 { + const Data = XCallback(WaitResult); + var c: xev.Completion = .{}; + var data = Data.init(); + const loop = self.exec.loop; + self.p.wait(loop, &c, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + + return data.result; + } +}; + +pub const AsyncNotification = struct { + const Self = @This(); + + exec: *Executor, + notif: xev.Async, + + pub fn init(exec: *Executor, notif: xev.Async) Self { + return .{ .exec = exec, .notif = notif }; + } + + const WaitResult = xev.Async.WaitError!void; + pub fn wait(self: Self) !void { + const Data = XCallback(WaitResult); + + const loop = self.exec.loop; + var c: xev.Completion = .{}; + var data = Data.init(); + + self.notif.wait(loop, &c, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + + return data.result; + } +}; + +pub const UDP = struct { + const Self = @This(); + + exec: *Executor, + udp: xev.UDP, + + pub usingnamespace Stream(Self, xev.UDP, .{ + .close = true, + .read = .none, + .write = .none, + }); + + pub fn init(exec: *Executor, udp: xev.UDP) Self { + return .{ .exec = exec, .udp = udp }; + } + + pub fn stream(self: Self) xev.UDP { + return self.udp; + } + + const ReadResult = xev.ReadError!usize; + pub fn read(self: Self, buf: xev.ReadBuffer) !usize { + const ResultT = ReadResult; + const Data = struct { + result: 
ResultT = undefined, + frame: ?Frame = null, + + fn callback( + userdata: ?*@This(), + l: *xev.Loop, + c: *xev.Completion, + s: *xev.UDP.State, + addr: std.net.Address, + udp: xev.UDP, + b: xev.ReadBuffer, + result: ResultT, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = s; + _ = addr; + _ = udp; + _ = b; + const data = userdata.?; + data.result = result; + if (data.frame != null) libcoro.xresume(data.frame.?); + return .disarm; + } + }; + + const loop = self.exec.loop; + var s: xev.UDP.State = undefined; + var c: xev.Completion = .{}; + var data: Data = .{ .frame = libcoro.xframe() }; + self.udp.read(loop, &c, &s, buf, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + + return data.result; + } + + const WriteResult = xev.WriteError!usize; + pub fn write(self: Self, addr: std.net.Address, buf: xev.WriteBuffer) !usize { + const ResultT = WriteResult; + const Data = struct { + result: ResultT = undefined, + frame: ?Frame = null, + + fn callback( + userdata: ?*@This(), + l: *xev.Loop, + c: *xev.Completion, + s: *xev.UDP.State, + udp: xev.UDP, + b: xev.WriteBuffer, + result: ResultT, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = s; + _ = udp; + _ = b; + const data = userdata.?; + data.result = result; + if (data.frame != null) libcoro.xresume(data.frame.?); + return .disarm; + } + }; + + const loop = self.exec.loop; + var s: xev.UDP.State = undefined; + var c: xev.Completion = .{}; + var data: Data = .{ .frame = libcoro.xframe() }; + self.udp.write(loop, &c, &s, addr, buf, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + + return data.result; + } +}; + +fn RunT(comptime Func: anytype) type { + const T = @typeInfo(@TypeOf(Func)).Fn.return_type.?; + return switch (@typeInfo(T)) { + .ErrorUnion => |E| E.payload, + else => T, + }; +} + +fn XCallback(comptime ResultT: type) type { + return struct { + frame: ?Frame = null, + result: ResultT = undefined, + + fn init() @This() { + return .{ .frame = libcoro.xframe() }; + } 
+ + fn callback( + userdata: ?*@This(), + _: *xev.Loop, + _: *xev.Completion, + result: ResultT, + ) xev.CallbackAction { + const data = userdata.?; + data.result = result; + if (data.frame != null) libcoro.xresume(data.frame.?); + return .disarm; + } + }; +} diff --git a/async/channel.zig b/async/channel.zig new file mode 100644 index 0000000..2904d0f --- /dev/null +++ b/async/channel.zig @@ -0,0 +1,75 @@ +const std = @import("std"); +const stdx = @import("stdx"); +const executor = @import("executor.zig"); + +pub fn Channel(comptime T: type, comptime capacity: usize) type { + const Storage = stdx.queue.ArrayQueue(T, capacity); + + return struct { + const Self = @This(); + + q: Storage = .{}, + closed: bool = false, + len: usize = capacity, + space_notif: executor.Condition, + value_notif: executor.Condition, + + pub fn init(exec: *executor.Executor) Self { + return initWithLen(exec, capacity); + } + + pub fn initWithLen(exec: *executor.Executor, len: usize) Self { + return .{ + .len = len, + .space_notif = executor.Condition.init(exec), + .value_notif = executor.Condition.init(exec), + }; + } + + pub fn close(self: *Self) void { + self.closed = true; + self.value_notif.signal(); + } + + pub fn try_send(self: *Self, val: T) bool { + stdx.debug.assert(self.closed == false, "cannot send on closed Channel", .{}); + if (self.q.len() >= self.len) { + return false; + } + self.q.push(val) catch stdx.debug.panic("tried to send on full Channel. This shouldn't happen.", .{}); + self.value_notif.signal(); + return true; + } + + pub fn send(self: *Self, val: T) void { + stdx.debug.assert(self.closed == false, "cannot send on closed Channel", .{}); + while (self.q.len() >= self.len) { + self.space_notif.wait(); + } + self.q.push(val) catch stdx.debug.panic("tried to send on full Channel. 
This shouldn't happen.", .{}); + self.value_notif.signal(); + } + + pub fn recv(self: *Self) ?T { + while (self.closed == false or self.q.len() > 0) { + if (self.q.pop()) |val| { + self.space_notif.signal(); + return val; + } + self.value_notif.wait(); + } + return null; + } + + pub fn try_recv(self: *Self) ?T { + if (self.closed == true) { + return null; + } + if (self.q.pop()) |val| { + self.space_notif.signal(); + return val; + } + return null; + } + }; +} diff --git a/async/coro.zig b/async/coro.zig new file mode 100644 index 0000000..c77405d --- /dev/null +++ b/async/coro.zig @@ -0,0 +1,557 @@ +//! libcoro mutable state: +//! * ThreadState +//! * current_coro: set in ThreadState.switchTo +//! * next_coro_id: set in ThreadState.nextCoroId +//! * suspend_block: set in xsuspendBlock, cleared in ThreadState.switchIn +//! * Coro +//! * resumer: set in ThreadState.switchTo +//! * status: +//! * Active, Suspended: set in ThreadState.switchTo +//! * Done: set in runcoro +//! * id.invocation: incremented in ThreadState.switchTo +const std = @import("std"); +const stdx = @import("stdx"); +const builtin = @import("builtin"); +const base = @import("coro_base.zig"); +const stack = @import("stack.zig"); +const Executor = @import("executor.zig").Executor; + +const log = std.log.scoped(.@"zml/async"); + +// Public API +// ============================================================================ +pub const Error = error{ + StackTooSmall, + StackOverflow, + SuspendFromMain, +}; +pub const StackT = stack.Stack; + +pub const Frame = *Coro; + +/// Await the coroutine(s). +/// frame: FrameT: runs the coroutine until done and returns its return value. +pub fn xawait(frame: anytype) @TypeOf(frame).Signature.ReturnT { + const f = frame.frame(); + while (f.status != .Done) xsuspend(); + std.debug.assert(f.status == .Done); + return frame.xreturned(); +} + +/// Create a coroutine and start it +/// stack is {null, usize, StackT}. 
If null or usize, initEnv must have been +/// called with a default stack allocator. +pub fn xasync(func: anytype, args: anytype, stack_: stack.Stack) !FrameT(func, @TypeOf(args)) { + const FrameType = CoroT.fromFunc(func, @TypeOf(args)); + const framet = try FrameType.init(args, stack_); + const frame = framet.frame(); + xresume(frame); + return FrameType.wrap(frame); +} + +pub const FrameT = CoroT.fromFunc; + +/// True if within a coroutine, false if at top-level. +pub fn inCoro() bool { + return thread_state.inCoro(); +} + +/// Returns the currently running coroutine +pub fn xframe() Frame { + return thread_state.current(); +} + +/// Resume the passed coroutine, suspending the current coroutine. +/// When the resumed coroutine suspends, this call will return. +/// Note: When the resumed coroutine returns, control will switch to its parent +/// (i.e. its original resumer). +/// frame: Frame or FrameT +pub fn xresume(frame: anytype) void { + const f = frame.frame(); + thread_state.switchIn(f); +} + +/// Suspend the current coroutine, yielding control back to its +/// resumer. Returns when the coroutine is resumed. +/// Must be called from within a coroutine (i.e. not the top level). 
+pub fn xsuspend() void { + xsuspendSafe() catch |e| { + log.err("{any}\n", .{e}); + @panic("xsuspend"); + }; +} + +pub fn xsuspendBlock(comptime func: anytype, args: anytype) void { + const Signature = stdx.meta.FnSignature(func, @TypeOf(args)); + const Callback = struct { + func: *const Signature.FuncT, + args: Signature.ArgsT, + fn cb(ud: ?*anyopaque) void { + const self: *@This() = @ptrCast(@alignCast(ud)); + @call(.auto, self.func, self.args); + } + }; + var cb = Callback{ .func = func, .args = args }; + thread_state.suspend_block = .{ .func = Callback.cb, .data = @ptrCast(&cb) }; + xsuspend(); +} + +pub fn xsuspendSafe() Error!void { + if (thread_state.current_coro == null) { + return Error.SuspendFromMain; + } + const coro = thread_state.current_coro.?; + try StackOverflow.check(coro); + thread_state.switchOut(coro.resumer); +} + +const Coro = struct { + /// Coroutine status + const Status = enum { + Start, + Suspended, + Active, + Done, + }; + const Signature = VoidSignature; + + /// Function to run in the coroutine + func: *const fn () void, + /// Coroutine stack + stack: stack.Stack, + /// Architecture-specific implementation + impl: base.Coro, + /// The coroutine that will be yielded to upon suspend + resumer: *Coro = undefined, + /// Current status + status: Status = .Start, + /// Coro id, {thread, coro id, invocation id} + id: CoroId.InvocationId, + /// Caller-specified coro-local storage + storage: ?*anyopaque = null, + + fn init(func: *const fn () void, stack_: stack.Stack, storage: ?*anyopaque) !Frame { + return initFromStack(func, stack_, storage); + } + + pub fn deinit(self: Coro) void { + _ = self; // autofix + } + + fn initFromStack(func: *const fn () void, stack_: stack.Stack, storage: ?*anyopaque) !Frame { + // try StackOverflow.setMagicNumber(stack.full); + var stack__ = stack_; + const coro = try stack__.push(Coro); + const base_coro = try base.Coro.init(&runcoro, stack__.remaining()); + coro.* = .{ + .func = func, + .impl = base_coro, + 
.stack = stack__, + .storage = storage, + .id = thread_state.newCoroId(), + }; + return coro; + } + + pub fn frame(self: *Coro) Frame { + return self; + } + + fn runcoro(from: *base.Coro, this: *base.Coro) callconv(.C) noreturn { + const from_coro: *Coro = @fieldParentPtr("impl", from); + const this_coro: *Coro = @fieldParentPtr("impl", this); + log.debug("coro start {any}\n", .{this_coro.id}); + @call(.auto, this_coro.func, .{}); + this_coro.status = .Done; + log.debug("coro done {any}\n", .{this_coro.id}); + thread_state.switchOut(from_coro); + + // Never returns + stdx.debug.panic("Cannot resume an already completed coroutine {any}", .{this_coro.id}); + } + + pub fn getStorage(self: Coro, comptime T: type) *T { + return @ptrCast(@alignCast(self.storage)); + } + + pub fn format(self: Coro, comptime fmt: []const u8, options: std.fmt.FormatOptions, writer: anytype) !void { + _ = fmt; + _ = options; + try writer.print("Coro{{.id = {any}, .status = {s}}}", .{ + self.id, + @tagName(self.status), + }); + } +}; + +const VoidSignature = CoroT.Signature.init((struct { + fn func() void {} +}).func, .{}); + +const CoroT = struct { + fn fromFunc(comptime Func: anytype, comptime ArgsT: ?type) type { + return fromSig(stdx.meta.FnSignature(Func, ArgsT)); + } + + fn fromSig(comptime Sig: stdx.meta.Signature) type { + // Stored in the coro stack + const InnerStorage = struct { + args: Sig.ArgsT, + /// Values that are produced during coroutine execution + retval: Sig.ReturnT = undefined, + }; + + return struct { + const Self = @This(); + pub const Signature = Sig; + + _frame: Frame, + + /// Create a Coro + /// self and stack pointers must remain stable for the lifetime of + /// the coroutine. 
+ fn init(args: Sig.ArgsT, stack_: StackT) !Self { + var coro_stack = stack_; + const inner = try coro_stack.push(InnerStorage); + inner.* = .{ + .args = args, + }; + return .{ ._frame = try Coro.initFromStack(wrapfn, coro_stack, inner) }; + } + + pub fn wrap(_frame: Frame) Self { + return .{ ._frame = _frame }; + } + + pub fn deinit(self: Self) void { + self._frame.deinit(); + } + + pub fn status(self: Self) Coro.Status { + return self._frame.status; + } + + pub fn frame(self: Self) Frame { + return self._frame; + } + + // Coroutine functions. + // + // When considering basic coroutine execution, the coroutine state + // machine is: + // * Start + // * Start->xresume->Active + // * Active->xsuspend->Suspended + // * Active->(fn returns)->Done + // * Suspended->xresume->Active + // + // Note that actions in the Active* states are taken from within the + // coroutine. All other actions act upon the coroutine from the + // outside. + + /// Returns the value the coroutine returned + pub fn xreturned(self: Self) Sig.ReturnT { + const storage = self._frame.getStorage(InnerStorage); + return storage.retval; + } + + fn wrapfn() void { + const storage = thread_state.currentStorage(InnerStorage); + storage.retval = @call( + .always_inline, + Sig.Func.Value, + storage.args, + ); + } + }; + } +}; + +/// Estimates the remaining stack size in the currently running coroutine +pub noinline fn remainingStackSize() usize { + var dummy: usize = 0; + dummy += 1; + const addr = @intFromPtr(&dummy); + + // Check if the stack was already overflowed + const current = xframe(); + StackOverflow.check(current) catch return 0; + + // Check if the stack is currently overflowed + const bottom = @intFromPtr(current.stack.ptr); + if (addr < bottom) { + return 0; + } + + // Debug check that we're actually in the stack + const top = @intFromPtr(current.stack.ptr + current.stack.len); + std.debug.assert(addr < top); // should never have popped beyond the top + + return addr - bottom; +} + +// 
============================================================================ + +/// Thread-local coroutine runtime +threadlocal var thread_state: ThreadState = .{}; + +const ThreadState = struct { + root_coro: Coro = .{ + .func = undefined, + .stack = undefined, + .impl = undefined, + .id = CoroId.InvocationId.root(), + }, + current_coro: ?Frame = null, + next_coro_id: usize = 1, + suspend_block: ?SuspendBlock = null, + + const SuspendBlock = struct { + func: *const fn (?*anyopaque) void, + data: ?*anyopaque, + + fn run(self: SuspendBlock) void { + @call(.auto, self.func, .{self.data}); + } + }; + + /// Called from resume + fn switchIn(self: *ThreadState, target: Frame) void { + log.debug("coro resume {any} from {any}\n", .{ target.id, self.current().id }); + + // Switch to target, setting this coro as the resumer. + self.switchTo(target, true); + + // Suspend within target brings control back here + // If a suspend block has been set, pop and run it. + if (self.suspend_block) |block| { + self.suspend_block = null; + block.run(); + } + } + + /// Called from suspend + fn switchOut(self: *ThreadState, target: Frame) void { + log.debug("coro suspend {any} to {any}\n", .{ self.current().id, target.id }); + self.switchTo(target, false); + } + + fn switchTo(self: *ThreadState, target: Frame, set_resumer: bool) void { + const suspender = self.current(); + if (suspender == target) { + return; + } + if (suspender.status != .Done) { + suspender.status = .Suspended; + } + if (set_resumer) { + target.resumer = suspender; + } + target.status = .Active; + target.id.incr(); + self.current_coro = target; + target.impl.resumeFrom(&suspender.impl); + } + + fn newCoroId(self: *ThreadState) CoroId.InvocationId { + const out = CoroId.InvocationId.init(.{ + .coro = self.next_coro_id, + }); + self.next_coro_id += 1; + return out; + } + + fn current(self: *ThreadState) Frame { + return self.current_coro orelse &self.root_coro; + } + + fn inCoro(self: *ThreadState) bool { + return 
self.current() != &self.root_coro; + } + + /// Returns the storage of the currently running coroutine + fn currentStorage(self: *ThreadState, comptime T: type) *T { + return self.current_coro.?.getStorage(T); + } +}; + +const CoroId = struct { + coro: usize, + + pub const InvocationId = if (builtin.mode == .Debug) DebugInvocationId else DummyInvocationId; + + const DummyInvocationId = struct { + fn init(id: CoroId) @This() { + _ = id; + return .{}; + } + fn root() @This() { + return .{}; + } + fn incr(self: *@This()) void { + _ = self; + } + }; + + const DebugInvocationId = struct { + id: CoroId, + invocation: i64 = -1, + + fn init(id: CoroId) @This() { + return .{ .id = id }; + } + + fn root() @This() { + return .{ .id = .{ .coro = 0 } }; + } + + fn incr(self: *@This()) void { + self.invocation += 1; + } + + pub fn format(self: @This(), comptime fmt: []const u8, options: std.fmt.FormatOptions, writer: anytype) !void { + _ = fmt; + _ = options; + try writer.print("CoroId{{.cid={d}, .i={d}}}", .{ + self.id.coro, + self.invocation, + }); + } + }; +}; + +const StackOverflow = struct { + const magic_number: usize = 0x5E574D6D; + + fn check(coro: Frame) !void { + _ = coro; // autofix + // const stack = coro.stack.ptr; + // const sp = coro.impl.stack_pointer; + // const magic_number_ptr: *usize = @ptrCast(stack); + // if (magic_number_ptr.* != magic_number or // + // @intFromPtr(sp) < @intFromPtr(stack)) + // { + // return Error.StackOverflow; + // } + } + + fn setMagicNumber(stack_: stack.Stack) !void { + _ = stack_; // autofix + // if (stack.len <= @sizeOf(usize)) { + // return Error.StackTooSmall; + // } + // const magic_number_ptr: *usize = @ptrCast(stack.ptr); + // magic_number_ptr.* = magic_number; + } +}; + +var test_idx: usize = 0; +var test_steps = [_]usize{0} ** 8; + +fn testSetIdx(val: usize) void { + test_steps[test_idx] = val; + test_idx += 1; +} + +fn testFn() void { + std.debug.assert(remainingStackSize() > 2048); + testSetIdx(2); + xsuspend(); + 
testSetIdx(4); + xsuspend(); + testSetIdx(6); +} + +test "basic suspend and resume" { + // const allocator = std.testing.allocator; + + // const stack_size: usize = 1024 * 4; + // const stack_ = try stackAlloc(allocator, stack_size); + // defer allocator.free(stack); + + // testSetIdx(0); + // const test_coro = try Coro.init(testFn, stack, false, null); + + // testSetIdx(1); + // try std.testing.expectEqual(test_coro.status, .Start); + // xresume(test_coro); + // testSetIdx(3); + // try std.testing.expectEqual(test_coro.status, .Suspended); + // xresume(test_coro); + // try std.testing.expectEqual(test_coro.status, .Suspended); + // testSetIdx(5); + // xresume(test_coro); + // testSetIdx(7); + + // try std.testing.expectEqual(test_coro.status, .Done); + + // for (0..test_steps.len) |i| { + // try std.testing.expectEqual(i, test_steps[i]); + // } +} + +test "with values" { + // const Test = struct { + // const Storage = struct { + // x: *usize, + // }; + // fn coroInner(x: *usize) void { + // x.* += 1; + // xsuspend(); + // x.* += 3; + // } + // fn coroWrap() void { + // const storage = xframe().getStorage(Storage); + // const x = storage.x; + // coroInner(x); + // } + // }; + // var x: usize = 0; + // var storage = Test.Storage{ .x = &x }; + + // const allocator = std.testing.allocator; + // const stack = try stackAlloc(allocator, null); + // defer allocator.free(stack); + // const coro = try Coro.init(Test.coroWrap, stack, false, @ptrCast(&storage)); + + // try std.testing.expectEqual(storage.x.*, 0); + // xresume(coro); + // try std.testing.expectEqual(storage.x.*, 1); + // xresume(coro); + // try std.testing.expectEqual(storage.x.*, 4); +} + +fn testCoroFnImpl(x: *usize) usize { + x.* += 1; + xsuspend(); + x.* += 3; + xsuspend(); + return x.* + 10; +} + +test "with CoroT" { + // const allocator = std.testing.allocator; + // const stack = try stackAlloc(allocator, null); + // defer allocator.free(stack); + + // var x: usize = 0; + + // const CoroFn = 
CoroT.fromFunc(testCoroFnImpl, .{}); + // var coro = try CoroFn.init(.{&x}, stack, false); + + // try std.testing.expectEqual(x, 0); + // xresume(coro); + // try std.testing.expectEqual(x, 1); + // xresume(coro); + // try std.testing.expectEqual(x, 4); + // xresume(coro); + // try std.testing.expectEqual(x, 4); + // try std.testing.expectEqual(coro.status(), .Done); + // try std.testing.expectEqual(CoroFn.xreturned(coro), 14); + // comptime try std.testing.expectEqual(CoroFn.Signature.ReturnT(), usize); +} + +test "resume self" { + xresume(xframe()); + try std.testing.expectEqual(1, 1); +} diff --git a/async/coro_base.zig b/async/coro_base.zig new file mode 100644 index 0000000..7f77750 --- /dev/null +++ b/async/coro_base.zig @@ -0,0 +1,71 @@ +const std = @import("std"); +const stdx = @import("stdx"); +const builtin = @import("builtin"); +const Error = @import("coro.zig").Error; + +const ArchInfo = struct { + num_registers: usize, + jump_idx: usize, + assembly: []const u8, +}; + +const arch_info: ArchInfo = switch (builtin.cpu.arch) { + .aarch64 => .{ + .num_registers = 20, + .jump_idx = 19, + .assembly = @embedFile("asm/coro_aarch64.s"), + }, + .x86_64 => switch (builtin.os.tag) { + .windows => .{ + .num_registers = 32, + .jump_idx = 30, + .assembly = @embedFile("asm/coro_x86_64_windows.s"), + }, + else => .{ + .num_registers = 8, + .jump_idx = 6, + .assembly = @embedFile("asm/coro_x86_64.s"), + }, + }, + .riscv64 => .{ + .num_registers = 25, + .jump_idx = 24, + .assembly = @embedFile("asm/coro_riscv64.s"), + }, + else => @compileError("Unsupported cpu architecture"), +}; + +pub const stack_alignment = 16; + +extern fn libcoro_stack_swap(current: *Coro, target: *Coro) void; +comptime { + asm (arch_info.assembly); +} + +pub const Coro = packed struct { + stack_pointer: [*]u8, + + const Self = @This(); + const Func = *const fn ( + from: *Coro, + self: *Coro, + ) callconv(.C) noreturn; + + pub fn init(func: Func, stack: []align(stack_alignment) u8) !Self { + 
stdx.debug.assertComptime(@sizeOf(usize) == 8, "usize expected to take 8 bytes", .{}); + stdx.debug.assertComptime(@sizeOf(*Func) == 8, "function pointer expected to take 8 bytes", .{}); + + const register_bytes = arch_info.num_registers * 8; + if (register_bytes > stack.len) { + return Error.StackTooSmall; + } + const register_space = stack[stack.len - register_bytes ..]; + const jump_ptr: *Func = @ptrCast(@alignCast(®ister_space[arch_info.jump_idx * 8])); + jump_ptr.* = func; + return .{ .stack_pointer = register_space.ptr }; + } + + pub inline fn resumeFrom(self: *Self, from: *Self) void { + libcoro_stack_swap(from, self); + } +}; diff --git a/async/executor.zig b/async/executor.zig new file mode 100644 index 0000000..55a6f69 --- /dev/null +++ b/async/executor.zig @@ -0,0 +1,209 @@ +const std = @import("std"); +const stdx = @import("stdx"); +const libcoro = @import("coro.zig"); +const libcoro_options = @import("libcoro_options"); + +const log = std.log.scoped(.@"zml/async"); + +pub const Executor = struct { + const Self = @This(); + + pub const Func = struct { + const FuncFn = *const fn (userdata: ?*anyopaque) void; + + func: FuncFn, + userdata: ?*anyopaque = null, + next: ?*Func = null, + + pub fn init(func: FuncFn, userdata: ?*anyopaque) Func { + return .{ .func = func, .userdata = userdata }; + } + + fn run(self: Func) void { + @call(.auto, self.func, .{self.userdata}); + } + }; + + readyq: stdx.queue.SPSC(Func) = .{}, + + pub fn init() Self { + return .{}; + } + + pub fn runSoon(self: *Self, func: *Func) void { + self.readyq.push(func); + } + + pub fn runAllSoon(self: *Self, funcs: stdx.queue.SPSC(Func)) void { + self.readyq.pushAll(funcs); + } + + pub fn tick(self: *Self) bool { + // Reset readyq so that adds run on next tick. 
+ var now = self.readyq; + self.readyq = .{}; + + log.debug("Executor.tick readyq_len={d}", .{now.len()}); + + var count: usize = 0; + while (now.pop()) |func| : (count += 1) { + func.run(); + } + + log.debug("Executor.tick done", .{}); + + return count > 0; + } +}; + +pub const Condition = struct { + const Waiter = struct { + coro: libcoro.Frame, + notified: bool = false, + + fn init() Waiter { + return .{ .coro = libcoro.xframe() }; + } + + fn func(self: *Waiter) Executor.Func { + return .{ .func = Waiter.cb, .userdata = self }; + } + + fn cb(ud: ?*anyopaque) void { + const self: *Waiter = @ptrCast(@alignCast(ud)); + libcoro.xresume(self.coro); + } + }; + + exec: *Executor, + waiters: stdx.queue.SPSC(Executor.Func) = .{}, + + pub fn init(exec: *Executor) Condition { + return .{ .exec = exec }; + } + + pub fn broadcast(self: *Condition) void { + var waiter_func = self.waiters.head; + while (waiter_func) |wf| : (waiter_func = wf.next) { + const waiter: *Waiter = @ptrCast(@alignCast(wf.userdata)); + waiter.notified = true; + } + self.exec.runAllSoon(self.waiters); + } + + pub fn signal(self: *Condition) void { + if (self.waiters.pop()) |waiter_func| { + const waiter: *Waiter = @ptrCast(@alignCast(waiter_func.userdata)); + waiter.notified = true; + self.exec.runSoon(waiter_func); + } + } + + pub fn wait(self: *Condition) void { + var waiter = Waiter.init(); + var cb = waiter.func(); + self.waiters.push(&cb); + while (waiter.notified == false) { + libcoro.xsuspend(); + } + } +}; + +pub const CoroResume = struct { + const Self = @This(); + + coro: libcoro.Frame, + + pub fn init() Self { + return .{ .coro = libcoro.xframe() }; + } + + pub fn func(self: *Self) Executor.Func { + return .{ .func = Self.cb, .userdata = self }; + } + + fn cb(ud: ?*anyopaque) void { + const self: *Self = @ptrCast(@alignCast(ud)); + libcoro.xresume(self.coro); + } +}; + +pub fn getExec(exec: ?*Executor) *Executor { + if (exec != null) return exec.?; + if (libcoro.getEnv().executor) |x| return 
x; + @panic("No explicit Executor passed and no default Executor available"); +} + +pub fn ArrayQueue(comptime T: type, comptime size: usize) type { + return struct { + const Self = @This(); + + vals: [size]T = undefined, + head: ?usize = null, + tail: ?usize = null, + + fn init() Self { + return .{}; + } + + fn len(self: Self) usize { + switch (self.state()) { + .empty => return 0, + .one => return 1, + .many => { + const head = self.head.?; + const tail = self.tail.?; + if (tail > head) { + return tail - head + 1; + } + return size - head + tail + 1; + }, + } + } + + fn space(self: Self) usize { + return size - self.len(); + } + + fn push(self: *@This(), val: T) !void { + if (self.space() < 1) return error.QueueFull; + switch (self.state()) { + .empty => { + self.head = 0; + self.tail = 0; + self.vals[0] = val; + }, + .one, .many => { + const tail = self.tail.?; + const new_tail = (tail + 1) % size; + self.vals[new_tail] = val; + self.tail = new_tail; + }, + } + } + + fn pop(self: *Self) ?T { + switch (self.state()) { + .empty => return null, + .one => { + const out = self.vals[self.head.?]; + self.head = null; + self.tail = null; + return out; + }, + .many => { + const out = self.vals[self.head.?]; + self.head = (self.head.? + 1) % size; + return out; + }, + } + } + + const State = enum { empty, one, many }; + inline fn state(self: Self) State { + if (self.head == null) return .empty; + if (self.head.? == self.tail.?) return .one; + return .many; + } + }; +} diff --git a/async/queue.zig b/async/queue.zig deleted file mode 100644 index aa3dfe5..0000000 --- a/async/queue.zig +++ /dev/null @@ -1,101 +0,0 @@ -const std = @import("std"); -const assert = std.debug.assert; - -/// An intrusive queue implementation. The type T must have a field -/// "next" of type `?*T`. 
-/// -/// For those unaware, an intrusive variant of a data structure is one in which -/// the data type in the list has the pointer to the next element, rather -/// than a higher level "node" or "container" type. The primary benefit -/// of this (and the reason we implement this) is that it defers all memory -/// management to the caller: the data structure implementation doesn't need -/// to allocate "nodes" to contain each element. Instead, the caller provides -/// the element and how its allocated is up to them. -pub fn Intrusive(comptime T: type) type { - return struct { - const Self = @This(); - - /// Head is the front of the queue and tail is the back of the queue. - head: ?*T = null, - tail: ?*T = null, - - /// Enqueue a new element to the back of the queue. - pub fn push(self: *Self, v: *T) void { - assert(v.next == null); - - if (self.tail) |tail| { - // If we have elements in the queue, then we add a new tail. - tail.next = v; - self.tail = v; - } else { - // No elements in the queue we setup the initial state. - self.head = v; - self.tail = v; - } - } - - /// Dequeue the next element from the queue. - pub fn pop(self: *Self) ?*T { - // The next element is in "head". - const next = self.head orelse return null; - - // If the head and tail are equal this is the last element - // so we also set tail to null so we can now be empty. - if (self.head == self.tail) self.tail = null; - - // Head is whatever is next (if we're the last element, - // this will be null); - self.head = next.next; - - // We set the "next" field to null so that this element - // can be inserted again. - next.next = null; - return next; - } - - /// Returns true if the queue is empty. 
- pub fn empty(self: *const Self) bool { - return self.head == null; - } - }; -} - -test Intrusive { - const testing = std.testing; - - // Types - const Elem = struct { - const Self = @This(); - next: ?*Self = null, - }; - const Queue = Intrusive(Elem); - var q: Queue = .{}; - try testing.expect(q.empty()); - - // Elems - var elems: [10]Elem = .{.{}} ** 10; - - // One - try testing.expect(q.pop() == null); - q.push(&elems[0]); - try testing.expect(!q.empty()); - try testing.expect(q.pop().? == &elems[0]); - try testing.expect(q.pop() == null); - try testing.expect(q.empty()); - - // Two - try testing.expect(q.pop() == null); - q.push(&elems[0]); - q.push(&elems[1]); - try testing.expect(q.pop().? == &elems[0]); - try testing.expect(q.pop().? == &elems[1]); - try testing.expect(q.pop() == null); - - // Interleaved - try testing.expect(q.pop() == null); - q.push(&elems[0]); - try testing.expect(q.pop().? == &elems[0]); - q.push(&elems[1]); - try testing.expect(q.pop().? == &elems[1]); - try testing.expect(q.pop() == null); -} diff --git a/async/queue_mpsc.zig b/async/queue_mpsc.zig deleted file mode 100644 index 68f4e8f..0000000 --- a/async/queue_mpsc.zig +++ /dev/null @@ -1,116 +0,0 @@ -const std = @import("std"); -const assert = std.debug.assert; - -/// An intrusive MPSC (multi-provider, single consumer) queue implementation. -/// The type T must have a field "next" of type `?*T`. -/// -/// This is an implementatin of a Vyukov Queue[1]. -/// TODO(mitchellh): I haven't audited yet if I got all the atomic operations -/// correct. I was short term more focused on getting something that seemed -/// to work; I need to make sure it actually works. -/// -/// For those unaware, an intrusive variant of a data structure is one in which -/// the data type in the list has the pointer to the next element, rather -/// than a higher level "node" or "container" type. 
The primary benefit -/// of this (and the reason we implement this) is that it defers all memory -/// management to the caller: the data structure implementation doesn't need -/// to allocate "nodes" to contain each element. Instead, the caller provides -/// the element and how its allocated is up to them. -/// -/// [1]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue -pub fn Intrusive(comptime T: type) type { - return struct { - const Self = @This(); - - /// Head is the front of the queue and tail is the back of the queue. - head: *T, - tail: *T, - stub: T, - - /// Initialize the queue. This requires a stable pointer to itself. - /// This must be called before the queue is used concurrently. - pub fn init(self: *Self) void { - self.head = &self.stub; - self.tail = &self.stub; - self.stub.next = null; - } - - /// Push an item onto the queue. This can be called by any number - /// of producers. - pub fn push(self: *Self, v: *T) void { - @atomicStore(?*T, &v.next, null, .unordered); - const prev = @atomicRmw(*T, &self.head, .Xchg, v, .acq_rel); - @atomicStore(?*T, &prev.next, v, .release); - } - - /// Pop the first in element from the queue. This must be called - /// by only a single consumer at any given time. 
- pub fn pop(self: *Self) ?*T { - var tail = @atomicLoad(*T, &self.tail, .unordered); - var next_ = @atomicLoad(?*T, &tail.next, .acquire); - if (tail == &self.stub) { - const next = next_ orelse return null; - @atomicStore(*T, &self.tail, next, .unordered); - tail = next; - next_ = @atomicLoad(?*T, &tail.next, .acquire); - } - - if (next_) |next| { - @atomicStore(*T, &self.tail, next, .release); - tail.next = null; - return tail; - } - - const head = @atomicLoad(*T, &self.head, .unordered); - if (tail != head) return null; - self.push(&self.stub); - - next_ = @atomicLoad(?*T, &tail.next, .acquire); - if (next_) |next| { - @atomicStore(*T, &self.tail, next, .unordered); - tail.next = null; - return tail; - } - - return null; - } - }; -} - -test Intrusive { - const testing = std.testing; - - // Types - const Elem = struct { - const Self = @This(); - next: ?*Self = null, - }; - const Queue = Intrusive(Elem); - var q: Queue = undefined; - q.init(); - - // Elems - var elems: [10]Elem = .{.{}} ** 10; - - // One - try testing.expect(q.pop() == null); - q.push(&elems[0]); - try testing.expect(q.pop().? == &elems[0]); - try testing.expect(q.pop() == null); - - // Two - try testing.expect(q.pop() == null); - q.push(&elems[0]); - q.push(&elems[1]); - try testing.expect(q.pop().? == &elems[0]); - try testing.expect(q.pop().? == &elems[1]); - try testing.expect(q.pop() == null); - - // // Interleaved - try testing.expect(q.pop() == null); - q.push(&elems[0]); - try testing.expect(q.pop().? == &elems[0]); - q.push(&elems[1]); - try testing.expect(q.pop().? 
== &elems[1]);
-    try testing.expect(q.pop() == null);
-}
diff --git a/async/stack.zig b/async/stack.zig
new file mode 100644
index 0000000..00b75fe
--- /dev/null
+++ b/async/stack.zig
@@ -0,0 +1,83 @@
+const std = @import("std");
+const coro_base = @import("coro_base.zig");
+
+/// Fixed size of every coroutine stack (1 MiB).
+pub const stack_size = 1 * 1024 * 1024;
+
+/// A downward-growing coroutine stack. `used` is the suffix of `full.data`
+/// that has been handed out; `push` carves aligned objects off the top.
+pub const Stack = struct {
+    pub const Data = struct {
+        data: [stack_size]u8 align(coro_base.stack_alignment) = undefined,
+
+        // NOTE(review): the receiver must be `*Data`, not `*const Data`:
+        // `&self.data` through a const receiver is `*const [stack_size]u8`,
+        // and coercing that to a mutable `[*]u8` discards the const
+        // qualifier, which is a compile error.
+        pub fn ptr(self: *Data) [*]u8 {
+            return &self.data;
+        }
+    };
+
+    full: *Data,
+    used: []u8,
+
+    /// Wrap a backing buffer; initially nothing is used (empty suffix).
+    pub fn init(full: *Data) Stack {
+        return .{
+            .full = full,
+            .used = full.data[full.data.len..],
+        };
+    }
+
+    /// The portion of the stack not yet handed out by `push`.
+    pub fn remaining(self: Stack) []align(coro_base.stack_alignment) u8 {
+        return self.full.data[0 .. self.full.data.len - self.used.len];
+    }
+
+    /// Reserve space for a `T` at the top of the stack, aligned down to
+    /// the stack alignment. Fails with error.StackTooSmall if the
+    /// reservation would reach the bottom of the buffer.
+    pub fn push(self: *Stack, comptime T: type) !*T {
+        const ptr_i = std.mem.alignBackward(
+            usize,
+            @intFromPtr(self.used.ptr - @sizeOf(T)),
+            coro_base.stack_alignment,
+        );
+        if (ptr_i <= @intFromPtr(&self.full.data[0])) {
+            return error.StackTooSmall;
+        }
+        self.used = self.full.data[ptr_i - @intFromPtr(&self.full.data[0]) ..];
+        return @ptrFromInt(ptr_i);
+    }
+};
+
+/// Allocates stack buffers out of a memory pool so freed stacks are reused.
+pub const PooledStackAllocator = struct {
+    const Pool = std.heap.MemoryPoolAligned(Stack.Data, coro_base.stack_alignment);
+
+    pool: Pool,
+
+    pub fn init(allocator: std.mem.Allocator) PooledStackAllocator {
+        return .{ .pool = Pool.init(allocator) };
+    }
+
+    pub fn deinit(self: *PooledStackAllocator) void {
+        self.pool.deinit();
+    }
+
+    pub fn create(self: *PooledStackAllocator) !Stack {
+        return Stack.init(try self.pool.create());
+    }
+
+    pub fn destroy(self: *PooledStackAllocator, stack: *Stack) void {
+        self.pool.destroy(stack.full);
+    }
+};
+
+/// Allocates each stack buffer directly from the given allocator.
+pub const StackAllocator = struct {
+    allocator: std.mem.Allocator,
+
+    pub fn init(allocator: std.mem.Allocator) StackAllocator {
+        return .{ .allocator = allocator };
+    }
+
+    pub fn deinit(self: *StackAllocator) void {
+        _ = self; // Nothing to release; kept for API symmetry with the pooled variant.
+    }
+
+    pub fn create(self: *StackAllocator) !Stack {
+        return Stack.init(try self.allocator.create(Stack.Data));
+    }
+
+    pub fn destroy(self: *StackAllocator, stack: *Stack) void {
+        self.allocator.destroy(stack.full);
+    }
+};
diff --git a/stdx/BUILD.bazel b/stdx/BUILD.bazel
index 56b9683..1e4697a 100644
--- a/stdx/BUILD.bazel
+++ b/stdx/BUILD.bazel
@@ -7,6 +7,7 @@ zig_library(
         "io.zig",
         "math.zig",
         "meta.zig",
+        "queue.zig",
         "signature.zig",
     ],
     main = "stdx.zig",
diff --git a/stdx/meta.zig b/stdx/meta.zig
index 1efe7a7..fd3fd9a 100644
--- a/stdx/meta.zig
+++ b/stdx/meta.zig
@@ -4,6 +4,7 @@ const debug = @import("debug.zig");
 const compileError = debug.compileError;
 
 pub const FnSignature = @import("signature.zig").FnSignature;
+pub const Signature = @import("signature.zig").Signature;
 
 pub fn isStruct(comptime T: type) bool {
     return switch (@typeInfo(T)) {
diff --git a/stdx/queue.zig b/stdx/queue.zig
new file mode 100644
index 0000000..e4722af
--- /dev/null
+++ b/stdx/queue.zig
@@ -0,0 +1,305 @@
+const std = @import("std");
+const assert = std.debug.assert;
+
+/// An intrusive queue implementation. The type T must have a field
+/// "next" of type `?*T`.
+///
+/// For those unaware, an intrusive variant of a data structure is one in which
+/// the data type in the list has the pointer to the next element, rather
+/// than a higher level "node" or "container" type. The primary benefit
+/// of this (and the reason we implement this) is that it defers all memory
+/// management to the caller: the data structure implementation doesn't need
+/// to allocate "nodes" to contain each element. Instead, the caller provides
+/// the element and how it's allocated is up to them.
+pub fn SPSC(comptime T: type) type {
+    return struct {
+        const Self = @This();
+
+        /// Head is the front of the queue and tail is the back of the queue.
+        head: ?*T = null,
+        tail: ?*T = null,
+
+        /// Enqueue a new element to the back of the queue.
+        pub fn push(self: *Self, v: *T) void {
+            assert(v.next == null);
+
+            if (self.tail) |tail| {
+                // If we have elements in the queue, then we add a new tail.
+                tail.next = v;
+                self.tail = v;
+            } else {
+                // No elements in the queue we setup the initial state.
+                self.head = v;
+                self.tail = v;
+            }
+        }
+
+        /// Append all elements of `other` to the back of this queue.
+        /// `other` is consumed by value; the caller must not reuse it.
+        pub fn pushAll(self: *Self, other: Self) void {
+            // BUGFIX: appending an empty `other` used to overwrite
+            // self.tail with null while self.head stayed set, corrupting
+            // the queue. An empty source is a no-op.
+            const other_head = other.head orelse return;
+            if (self.tail) |tail| {
+                tail.next = other_head;
+            } else {
+                self.head = other_head;
+            }
+            self.tail = other.tail;
+        }
+
+        /// Dequeue the next element from the queue.
+        pub fn pop(self: *Self) ?*T {
+            // The next element is in "head".
+            const next = self.head orelse return null;
+
+            // If the head and tail are equal this is the last element
+            // so we also set tail to null so we can now be empty.
+            if (self.head == self.tail) self.tail = null;
+
+            // Head is whatever is next (if we're the last element,
+            // this will be null);
+            self.head = next.next;
+
+            // We set the "next" field to null so that this element
+            // can be inserted again.
+            next.next = null;
+            return next;
+        }
+
+        /// Number of elements in the queue. O(n): walks the list.
+        pub fn len(self: Self) usize {
+            var ret: usize = 0;
+            var current = self.head;
+            while (current) |elem| : (current = elem.next) {
+                ret += 1;
+            }
+            return ret;
+        }
+
+        /// Returns true if the queue is empty.
+        pub fn empty(self: *const Self) bool {
+            return self.head == null;
+        }
+    };
+}
+
+test SPSC {
+    const testing = std.testing;
+
+    // Types
+    const Elem = struct {
+        const Self = @This();
+        next: ?*Self = null,
+    };
+    const Queue = SPSC(Elem);
+    var q: Queue = .{};
+    try testing.expect(q.empty());
+
+    // Elems
+    var elems: [10]Elem = .{.{}} ** 10;
+
+    // One
+    try testing.expect(q.pop() == null);
+    q.push(&elems[0]);
+    try testing.expect(!q.empty());
+    try testing.expect(q.pop().? == &elems[0]);
+    try testing.expect(q.pop() == null);
+    try testing.expect(q.empty());
+
+    // Two
+    try testing.expect(q.pop() == null);
+    q.push(&elems[0]);
+    q.push(&elems[1]);
+    try testing.expect(q.pop().? == &elems[0]);
+    try testing.expect(q.pop().? == &elems[1]);
+    try testing.expect(q.pop() == null);
+
+    // Interleaved
+    try testing.expect(q.pop() == null);
+    q.push(&elems[0]);
+    try testing.expect(q.pop().? == &elems[0]);
+    q.push(&elems[1]);
+    try testing.expect(q.pop().? == &elems[1]);
+    try testing.expect(q.pop() == null);
+}
+
+/// An intrusive MPSC (multi-producer, single-consumer) queue implementation.
+/// The type T must have a field "next" of type `?*T`.
+///
+/// This is an implementation of a Vyukov Queue[1].
+/// TODO(mitchellh): I haven't audited yet if I got all the atomic operations
+/// correct. I was short term more focused on getting something that seemed
+/// to work; I need to make sure it actually works.
+///
+/// For those unaware, an intrusive variant of a data structure is one in which
+/// the data type in the list has the pointer to the next element, rather
+/// than a higher level "node" or "container" type. The primary benefit
+/// of this (and the reason we implement this) is that it defers all memory
+/// management to the caller: the data structure implementation doesn't need
+/// to allocate "nodes" to contain each element. Instead, the caller provides
+/// the element and how it's allocated is up to them.
+///
+/// [1]: https://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue
+pub fn MPSC(comptime T: type) type {
+    return struct {
+        const Self = @This();
+
+        /// Head is the front of the queue and tail is the back of the queue.
+        head: *T,
+        tail: *T,
+        stub: T,
+
+        /// Initialize the queue. This requires a stable pointer to itself.
+        /// This must be called before the queue is used concurrently.
+        pub fn init(self: *Self) void {
+            self.head = &self.stub;
+            self.tail = &self.stub;
+            self.stub.next = null;
+        }
+
+        /// Push an item onto the queue. This can be called by any number
+        /// of producers.
+        pub fn push(self: *Self, v: *T) void {
+            @atomicStore(?*T, &v.next, null, .unordered);
+            const prev = @atomicRmw(*T, &self.head, .Xchg, v, .acq_rel);
+            @atomicStore(?*T, &prev.next, v, .release);
+        }
+
+        /// Pop the first in element from the queue. This must be called
+        /// by only a single consumer at any given time.
+        pub fn pop(self: *Self) ?*T {
+            var tail = @atomicLoad(*T, &self.tail, .unordered);
+            var next_ = @atomicLoad(?*T, &tail.next, .acquire);
+            if (tail == &self.stub) {
+                const next = next_ orelse return null;
+                @atomicStore(*T, &self.tail, next, .unordered);
+                tail = next;
+                next_ = @atomicLoad(?*T, &tail.next, .acquire);
+            }
+
+            if (next_) |next| {
+                @atomicStore(*T, &self.tail, next, .release);
+                tail.next = null;
+                return tail;
+            }
+
+            const head = @atomicLoad(*T, &self.head, .unordered);
+            if (tail != head) return null;
+            self.push(&self.stub);
+
+            next_ = @atomicLoad(?*T, &tail.next, .acquire);
+            if (next_) |next| {
+                @atomicStore(*T, &self.tail, next, .unordered);
+                tail.next = null;
+                return tail;
+            }
+
+            return null;
+        }
+    };
+}
+
+test MPSC {
+    const testing = std.testing;
+
+    // Types
+    const Elem = struct {
+        const Self = @This();
+        next: ?*Self = null,
+    };
+    const Queue = MPSC(Elem);
+    var q: Queue = undefined;
+    q.init();
+
+    // Elems
+    var elems: [10]Elem = .{.{}} ** 10;
+
+    // One
+    try testing.expect(q.pop() == null);
+    q.push(&elems[0]);
+    try testing.expect(q.pop().? == &elems[0]);
+    try testing.expect(q.pop() == null);
+
+    // Two
+    try testing.expect(q.pop() == null);
+    q.push(&elems[0]);
+    q.push(&elems[1]);
+    try testing.expect(q.pop().? == &elems[0]);
+    try testing.expect(q.pop().? == &elems[1]);
+    try testing.expect(q.pop() == null);
+
+    // Interleaved
+    try testing.expect(q.pop() == null);
+    q.push(&elems[0]);
+    try testing.expect(q.pop().? == &elems[0]);
+    q.push(&elems[1]);
+    try testing.expect(q.pop().? == &elems[1]);
+    try testing.expect(q.pop() == null);
+}
+
+/// A fixed-capacity ring-buffer queue of up to `size` values of type T.
+/// `head`/`tail` are null when empty; equal when exactly one element.
+pub fn ArrayQueue(comptime T: type, comptime size: usize) type {
+    return struct {
+        const Self = @This();
+
+        vals: [size]T = undefined,
+        head: ?usize = null,
+        tail: ?usize = null,
+
+        /// Number of elements currently in the queue.
+        pub fn len(self: Self) usize {
+            switch (self.state()) {
+                .empty => return 0,
+                .one => return 1,
+                .many => {
+                    const head = self.head.?;
+                    const tail = self.tail.?;
+                    if (tail > head) {
+                        return tail - head + 1;
+                    }
+                    // Wrapped: elements span [head, size) plus [0, tail].
+                    return size - head + tail + 1;
+                },
+            }
+        }
+
+        /// Remaining capacity.
+        pub fn available(self: Self) usize {
+            return size - self.len();
+        }
+
+        /// Append a value at the back; error.QueueFull when at capacity.
+        pub fn push(self: *Self, val: T) !void {
+            if (self.len() == size) {
+                return error.QueueFull;
+            }
+            switch (self.state()) {
+                .empty => {
+                    self.head = 0;
+                    self.tail = 0;
+                    self.vals[0] = val;
+                },
+                .one, .many => {
+                    const tail = self.tail.?;
+                    const new_tail = (tail + 1) % size;
+                    self.vals[new_tail] = val;
+                    self.tail = new_tail;
+                },
+            }
+        }
+
+        /// Remove and return the front value, or null when empty.
+        pub fn pop(self: *Self) ?T {
+            switch (self.state()) {
+                .empty => return null,
+                .one => {
+                    const out = self.vals[self.head.?];
+                    self.head = null;
+                    self.tail = null;
+                    return out;
+                },
+                .many => {
+                    const out = self.vals[self.head.?];
+                    self.head = (self.head.? + 1) % size;
+                    return out;
+                },
+            }
+        }
+
+        const State = enum { empty, one, many };
+        inline fn state(self: Self) State {
+            if (self.head == null) return .empty;
+            if (self.head.? == self.tail.?) return .one;
+            return .many;
+        }
+    };
+}
diff --git a/stdx/signature.zig b/stdx/signature.zig
index a385f22..8f2c787 100644
--- a/stdx/signature.zig
+++ b/stdx/signature.zig
@@ -48,6 +48,7 @@ pub fn ArgsTuple(comptime funcT: anytype, comptime ArgsT: ?type) type {
 }
 
 pub const Signature = struct {
+    Func: type,
     FuncT: type,
     ArgsT: type,
     ReturnT: type,
@@ -59,6 +60,9 @@ pub fn FnSignature(comptime func: anytype, comptime argsT_: ?type) Signature {
     const argsT = ArgsTuple(@TypeOf(func), argsT_);
     const return_type = @TypeOf(@call(.auto, func, @as(argsT, undefined)));
     return Signature{
+        .Func = struct {
+            pub const Value = func;
+        },
         .FuncT = @TypeOf(func),
         .ArgsT = argsT,
         .ReturnT = return_type,
diff --git a/stdx/stdx.zig b/stdx/stdx.zig
index 820e7aa..4e94ce5 100644
--- a/stdx/stdx.zig
+++ b/stdx/stdx.zig
@@ -2,3 +2,4 @@ pub const debug = @import("debug.zig");
 pub const io = @import("io.zig");
 pub const math = @import("math.zig");
 pub const meta = @import("meta.zig");
+pub const queue = @import("queue.zig");