From 754656f2f0b5fa3a9047b0c5c7fda3377dc11b9f Mon Sep 17 00:00:00 2001 From: Tarry Singh Date: Thu, 14 Mar 2024 11:43:33 +0000 Subject: [PATCH] Replace real mutex with async Mutex for logFn, add fallback logger support outside coroutines, and fix ResetCondition handling. --- async/async.zig | 65 ++++++++++++++++++++++++++++++---------------- async/executor.zig | 33 +++-------------------- 2 files changed, 46 insertions(+), 52 deletions(-) diff --git a/async/async.zig b/async/async.zig index 2222c48..79026da 100644 --- a/async/async.zig +++ b/async/async.zig @@ -145,7 +145,9 @@ pub const threading = struct { .waiting = &waiter, }; if (self.waiter.cmpxchgStrong(&State.unset_state, &new_state, .monotonic, .monotonic) == null) { - coro.xsuspend(); + while (self.isSet() == false) { + coro.xsuspend(); + } } } }; @@ -503,16 +505,16 @@ pub fn channel(comptime T: type, len: usize, comptime capacity: usize) Channel(T } pub const Mutex = struct { - const VoidChannel = coro.Channel(void, 1); + const VoidChannel = Channel(void, 1); inner: VoidChannel, pub fn init() Mutex { - return .{ .inner = VoidChannel.init(&AsyncThread.current.executor.exec) }; + return .{ .inner = VoidChannel.init() }; } - pub fn lock(self: *Mutex) !void { - try self.inner.send({}); + pub fn lock(self: *Mutex) void { + self.inner.send({}); } pub fn unlock(self: *Mutex) void { @@ -520,26 +522,45 @@ pub const Mutex = struct { } }; -pub fn logFn( +pub const LogFn = fn ( comptime message_level: std.log.Level, - comptime scope: @Type(.EnumLiteral), + comptime scope: @TypeOf(.enum_literal), comptime format: []const u8, args: anytype, -) void { - if (coro.inCoro() == false) { - return std.log.defaultLog(message_level, scope, format, args); - } +) void; - const level_txt = comptime message_level.asText(); - const prefix2 = if (scope == .default) ": " else "(" ++ @tagName(scope) ++ "): "; - const stderr = getStdErr().writer(); - var bw = std.io.bufferedWriter(stderr); - const writer = bw.writer(); +pub fn logFn(comptime fallbackLogFn: LogFn) LogFn { + return struct { + const Self = @This(); - std.debug.lockStdErr(); - defer std.debug.unlockStdErr(); - nosuspend { - writer.print(level_txt ++ prefix2 ++ format ++ "\n", args) catch return; - bw.flush() catch return; - } + var mu: ?Mutex = null; + + pub fn call( + comptime message_level: std.log.Level, + comptime scope: @TypeOf(.enum_literal), + comptime format: []const u8, + args: anytype, + ) void { + if (coro.inCoro() == false) { + return fallbackLogFn(message_level, scope, format, args); + } + + const level_txt = comptime message_level.asText(); + const prefix2 = if (scope == .default) ": " else "(" ++ @tagName(scope) ++ "): "; + const stderr = getStdErr().writer(); + var bw = std.io.bufferedWriter(stderr); + const writer = bw.writer(); + + var mutex = Self.mu orelse blk: { + Self.mu = Mutex.init(); + break :blk Self.mu.?; + }; + mutex.lock(); + defer mutex.unlock(); + nosuspend { + writer.print(level_txt ++ prefix2 ++ format ++ "\n", args) catch return; + bw.flush() catch return; + } + } + }.call; } diff --git a/async/executor.zig b/async/executor.zig index 55a6f69..9739681 100644 --- a/async/executor.zig +++ b/async/executor.zig @@ -57,24 +57,6 @@ pub const Executor = struct { }; pub const Condition = struct { - const Waiter = struct { - coro: libcoro.Frame, - notified: bool = false, - - fn init() Waiter { - return .{ .coro = libcoro.xframe() }; - } - - fn func(self: *Waiter) Executor.Func { - return .{ .func = Waiter.cb, .userdata = self }; - } - - fn cb(ud: ?*anyopaque) void { - const self: *Waiter = @ptrCast(@alignCast(ud)); - libcoro.xresume(self.coro); - } - }; - exec: *Executor, waiters: stdx.queue.SPSC(Executor.Func) = .{}, @@ -83,29 +65,20 @@ pub const Condition = struct { } pub fn broadcast(self: *Condition) void { - var waiter_func = self.waiters.head; - while (waiter_func) |wf| : (waiter_func = wf.next) { - const waiter: *Waiter = @ptrCast(@alignCast(wf.userdata)); - waiter.notified = true; - } self.exec.runAllSoon(self.waiters); } pub fn signal(self: *Condition) void { if (self.waiters.pop()) |waiter_func| { - const waiter: *Waiter = @ptrCast(@alignCast(waiter_func.userdata)); - waiter.notified = true; self.exec.runSoon(waiter_func); } } pub fn wait(self: *Condition) void { - var waiter = Waiter.init(); - var cb = waiter.func(); + var cr = CoroResume.init(); + var cb = cr.func(); self.waiters.push(&cb); - while (waiter.notified == false) { - libcoro.xsuspend(); - } + libcoro.xsuspend(); } };