Replace real mutex with async Mutex for logFn, add fallback logger support outside coroutines, and fix ResetCondition handling.

This commit is contained in:
Tarry Singh 2024-03-14 11:43:33 +00:00
parent 980f1b17fb
commit 754656f2f0
2 changed files with 46 additions and 52 deletions

View File

@ -145,9 +145,11 @@ pub const threading = struct {
.waiting = &waiter, .waiting = &waiter,
}; };
if (self.waiter.cmpxchgStrong(&State.unset_state, &new_state, .monotonic, .monotonic) == null) { if (self.waiter.cmpxchgStrong(&State.unset_state, &new_state, .monotonic, .monotonic) == null) {
while (self.isSet() == false) {
coro.xsuspend(); coro.xsuspend();
} }
} }
}
}; };
}; };
@ -503,16 +505,16 @@ pub fn channel(comptime T: type, len: usize, comptime capacity: usize) Channel(T
} }
pub const Mutex = struct { pub const Mutex = struct {
const VoidChannel = coro.Channel(void, 1); const VoidChannel = Channel(void, 1);
inner: VoidChannel, inner: VoidChannel,
pub fn init() Mutex { pub fn init() Mutex {
return .{ .inner = VoidChannel.init(&AsyncThread.current.executor.exec) }; return .{ .inner = VoidChannel.init() };
} }
pub fn lock(self: *Mutex) !void { pub fn lock(self: *Mutex) void {
try self.inner.send({}); self.inner.send({});
} }
pub fn unlock(self: *Mutex) void { pub fn unlock(self: *Mutex) void {
@ -520,14 +522,27 @@ pub const Mutex = struct {
} }
}; };
pub fn logFn( pub const LogFn = fn (
comptime message_level: std.log.Level, comptime message_level: std.log.Level,
comptime scope: @Type(.EnumLiteral), comptime scope: @TypeOf(.enum_literal),
comptime format: []const u8, comptime format: []const u8,
args: anytype, args: anytype,
) void { ) void;
pub fn logFn(comptime fallbackLogFn: LogFn) LogFn {
return struct {
const Self = @This();
var mu: ?Mutex = null;
pub fn call(
comptime message_level: std.log.Level,
comptime scope: @TypeOf(.enum_literal),
comptime format: []const u8,
args: anytype,
) void {
if (coro.inCoro() == false) { if (coro.inCoro() == false) {
return std.log.defaultLog(message_level, scope, format, args); return fallbackLogFn(message_level, scope, format, args);
} }
const level_txt = comptime message_level.asText(); const level_txt = comptime message_level.asText();
@ -536,10 +551,16 @@ pub fn logFn(
var bw = std.io.bufferedWriter(stderr); var bw = std.io.bufferedWriter(stderr);
const writer = bw.writer(); const writer = bw.writer();
std.debug.lockStdErr(); var mutex = Self.mu orelse blk: {
defer std.debug.unlockStdErr(); Self.mu = Mutex.init();
break :blk Self.mu.?;
};
mutex.lock();
defer mutex.unlock();
nosuspend { nosuspend {
writer.print(level_txt ++ prefix2 ++ format ++ "\n", args) catch return; writer.print(level_txt ++ prefix2 ++ format ++ "\n", args) catch return;
bw.flush() catch return; bw.flush() catch return;
} }
}
}.call;
} }

View File

@ -57,24 +57,6 @@ pub const Executor = struct {
}; };
pub const Condition = struct { pub const Condition = struct {
const Waiter = struct {
coro: libcoro.Frame,
notified: bool = false,
fn init() Waiter {
return .{ .coro = libcoro.xframe() };
}
fn func(self: *Waiter) Executor.Func {
return .{ .func = Waiter.cb, .userdata = self };
}
fn cb(ud: ?*anyopaque) void {
const self: *Waiter = @ptrCast(@alignCast(ud));
libcoro.xresume(self.coro);
}
};
exec: *Executor, exec: *Executor,
waiters: stdx.queue.SPSC(Executor.Func) = .{}, waiters: stdx.queue.SPSC(Executor.Func) = .{},
@ -83,30 +65,21 @@ pub const Condition = struct {
} }
pub fn broadcast(self: *Condition) void { pub fn broadcast(self: *Condition) void {
var waiter_func = self.waiters.head;
while (waiter_func) |wf| : (waiter_func = wf.next) {
const waiter: *Waiter = @ptrCast(@alignCast(wf.userdata));
waiter.notified = true;
}
self.exec.runAllSoon(self.waiters); self.exec.runAllSoon(self.waiters);
} }
pub fn signal(self: *Condition) void { pub fn signal(self: *Condition) void {
if (self.waiters.pop()) |waiter_func| { if (self.waiters.pop()) |waiter_func| {
const waiter: *Waiter = @ptrCast(@alignCast(waiter_func.userdata));
waiter.notified = true;
self.exec.runSoon(waiter_func); self.exec.runSoon(waiter_func);
} }
} }
pub fn wait(self: *Condition) void { pub fn wait(self: *Condition) void {
var waiter = Waiter.init(); var cr = CoroResume.init();
var cb = waiter.func(); var cb = cr.func();
self.waiters.push(&cb); self.waiters.push(&cb);
while (waiter.notified == false) {
libcoro.xsuspend(); libcoro.xsuspend();
} }
}
}; };
pub const CoroResume = struct { pub const CoroResume = struct {