183 lines
4.7 KiB
Zig
183 lines
4.7 KiB
Zig
const std = @import("std");
|
|
const stdx = @import("stdx");
|
|
const libcoro = @import("coro.zig");
|
|
const libcoro_options = @import("libcoro_options");
|
|
|
|
const log = std.log.scoped(.@"zml/async");
|
|
|
|
pub const Executor = struct {
|
|
const Self = @This();
|
|
|
|
pub const Func = struct {
|
|
const FuncFn = *const fn (userdata: ?*anyopaque) void;
|
|
|
|
func: FuncFn,
|
|
userdata: ?*anyopaque = null,
|
|
next: ?*Func = null,
|
|
|
|
pub fn init(func: FuncFn, userdata: ?*anyopaque) Func {
|
|
return .{ .func = func, .userdata = userdata };
|
|
}
|
|
|
|
fn run(self: Func) void {
|
|
@call(.auto, self.func, .{self.userdata});
|
|
}
|
|
};
|
|
|
|
readyq: stdx.queue.SPSC(Func) = .{},
|
|
|
|
pub fn init() Self {
|
|
return .{};
|
|
}
|
|
|
|
pub fn runSoon(self: *Self, func: *Func) void {
|
|
self.readyq.push(func);
|
|
}
|
|
|
|
pub fn runAllSoon(self: *Self, funcs: stdx.queue.SPSC(Func)) void {
|
|
self.readyq.pushAll(funcs);
|
|
}
|
|
|
|
pub fn tick(self: *Self) bool {
|
|
// Reset readyq so that adds run on next tick.
|
|
var now = self.readyq;
|
|
self.readyq = .{};
|
|
|
|
log.debug("Executor.tick readyq_len={d}", .{now.len()});
|
|
|
|
var count: usize = 0;
|
|
while (now.pop()) |func| : (count += 1) {
|
|
func.run();
|
|
}
|
|
|
|
log.debug("Executor.tick done", .{});
|
|
|
|
return count > 0;
|
|
}
|
|
};
|
|
|
|
pub const Condition = struct {
|
|
exec: *Executor,
|
|
waiters: stdx.queue.SPSC(Executor.Func) = .{},
|
|
|
|
pub fn init(exec: *Executor) Condition {
|
|
return .{ .exec = exec };
|
|
}
|
|
|
|
pub fn broadcast(self: *Condition) void {
|
|
self.exec.runAllSoon(self.waiters);
|
|
}
|
|
|
|
pub fn signal(self: *Condition) void {
|
|
if (self.waiters.pop()) |waiter_func| {
|
|
self.exec.runSoon(waiter_func);
|
|
}
|
|
}
|
|
|
|
pub fn wait(self: *Condition) void {
|
|
var cr = CoroResume.init();
|
|
var cb = cr.func();
|
|
self.waiters.push(&cb);
|
|
libcoro.xsuspend();
|
|
}
|
|
};
|
|
|
|
pub const CoroResume = struct {
|
|
const Self = @This();
|
|
|
|
coro: libcoro.Frame,
|
|
|
|
pub fn init() Self {
|
|
return .{ .coro = libcoro.xframe() };
|
|
}
|
|
|
|
pub fn func(self: *Self) Executor.Func {
|
|
return .{ .func = Self.cb, .userdata = self };
|
|
}
|
|
|
|
fn cb(ud: ?*anyopaque) void {
|
|
const self: *Self = @ptrCast(@alignCast(ud));
|
|
libcoro.xresume(self.coro);
|
|
}
|
|
};
|
|
|
|
pub fn getExec(exec: ?*Executor) *Executor {
|
|
if (exec != null) return exec.?;
|
|
if (libcoro.getEnv().executor) |x| return x;
|
|
@panic("No explicit Executor passed and no default Executor available");
|
|
}
|
|
|
|
pub fn ArrayQueue(comptime T: type, comptime size: usize) type {
|
|
return struct {
|
|
const Self = @This();
|
|
|
|
vals: [size]T = undefined,
|
|
head: ?usize = null,
|
|
tail: ?usize = null,
|
|
|
|
fn init() Self {
|
|
return .{};
|
|
}
|
|
|
|
fn len(self: Self) usize {
|
|
switch (self.state()) {
|
|
.empty => return 0,
|
|
.one => return 1,
|
|
.many => {
|
|
const head = self.head.?;
|
|
const tail = self.tail.?;
|
|
if (tail > head) {
|
|
return tail - head + 1;
|
|
}
|
|
return size - head + tail + 1;
|
|
},
|
|
}
|
|
}
|
|
|
|
fn space(self: Self) usize {
|
|
return size - self.len();
|
|
}
|
|
|
|
fn push(self: *@This(), val: T) !void {
|
|
if (self.space() < 1) return error.QueueFull;
|
|
switch (self.state()) {
|
|
.empty => {
|
|
self.head = 0;
|
|
self.tail = 0;
|
|
self.vals[0] = val;
|
|
},
|
|
.one, .many => {
|
|
const tail = self.tail.?;
|
|
const new_tail = (tail + 1) % size;
|
|
self.vals[new_tail] = val;
|
|
self.tail = new_tail;
|
|
},
|
|
}
|
|
}
|
|
|
|
fn pop(self: *Self) ?T {
|
|
switch (self.state()) {
|
|
.empty => return null,
|
|
.one => {
|
|
const out = self.vals[self.head.?];
|
|
self.head = null;
|
|
self.tail = null;
|
|
return out;
|
|
},
|
|
.many => {
|
|
const out = self.vals[self.head.?];
|
|
self.head = (self.head.? + 1) % size;
|
|
return out;
|
|
},
|
|
}
|
|
}
|
|
|
|
const State = enum { empty, one, many };
|
|
inline fn state(self: Self) State {
|
|
if (self.head == null) return .empty;
|
|
if (self.head.? == self.tail.?) return .one;
|
|
return .many;
|
|
}
|
|
};
|
|
}
|