diff --git a/MODULE.bazel b/MODULE.bazel index bd10a25..52b88e8 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -84,7 +84,7 @@ use_repo(zls, "zls_aarch64-macos", "zls_x86_64-linux") register_toolchains("//third_party/zls:all") -bazel_dep(name = "libxev", version = "20241208.0-db6a52b") +bazel_dep(name = "libxev", version = "20241208.1-db6a52b") bazel_dep(name = "llvm-raw", version = "20250102.0-f739aa4") llvm = use_extension("@llvm-raw//utils/bazel:extension.bzl", "llvm") diff --git a/async/async.zig b/async/async.zig index 20effdb..4e795ce 100644 --- a/async/async.zig +++ b/async/async.zig @@ -48,8 +48,8 @@ fn FrameExx(comptime func: anytype, comptime argsT: type, comptime returnT: type pub const await_ = awaitt; pub fn awaitt(self: *Self) returnT { defer { - AsyncThread.current.stack_allocator.destroy(&self.inner._frame.stack); self.inner.deinit(); + AsyncThread.current.stack_allocator.destroy(&self.inner._frame.stack); self.* = undefined; } return coro.xawait(self.inner); @@ -145,16 +145,14 @@ pub const threading = struct { .waiting = &waiter, }; if (self.waiter.cmpxchgStrong(&State.unset_state, &new_state, .monotonic, .monotonic) == null) { - while (self.isSet() == false) { - coro.xsuspend(); - } + coro.xsuspend(); } } }; }; pub const AsyncThread = struct { - threadlocal var current: *const AsyncThread = undefined; + threadlocal var current: *AsyncThread = undefined; executor: *aio.Executor, stack_allocator: *stack.StackAllocator, @@ -167,8 +165,8 @@ pub const AsyncThread = struct { self.async_notifier.notify() catch {}; } - fn waker_cb(q: ?*threading.WaiterQueue, _: *xev.Loop, _: *xev.Completion, _: xev.Async.WaitError!void) xev.CallbackAction { - while (q.?.pop()) |waiter| { + fn wakerCallback(self: ?*AsyncThread, _: *xev.Loop, _: *xev.Completion, _: xev.Async.WaitError!void) xev.CallbackAction { + while (self.?.waiters_queue.pop()) |waiter| { coro.xresume(waiter.frame); } return .rearm; @@ -194,13 +192,10 @@ pub const AsyncThread = struct { var 
waiters_queue: threading.WaiterQueue = undefined; waiters_queue.init(); - var c: xev.Completion = undefined; - async_notifier.wait(&loop, &c, threading.WaiterQueue, &waiters_queue, &waker_cb); - var stack_allocator = stack.StackAllocator.init(allocator); defer stack_allocator.deinit(); - AsyncThread.current = &.{ + var asyncThread: AsyncThread = .{ .executor = &executor_, .stack_allocator = &stack_allocator, .loop = &loop, @@ -208,6 +203,10 @@ pub const AsyncThread = struct { .async_notifier = &async_notifier, .waiters_queue = &waiters_queue, }; + AsyncThread.current = &asyncThread; + + var c2: xev.Completion = undefined; + async_notifier.wait(AsyncThread.current.loop, &c2, AsyncThread, AsyncThread.current, &AsyncThread.wakerCallback); // allocate the main coroutine stack, on the current thread's stack! var mainStackData: stack.Stack.Data = undefined; @@ -527,6 +526,10 @@ pub fn logFn( comptime format: []const u8, args: anytype, ) void { + if (coro.inCoro() == false) { + return std.log.defaultLog(message_level, scope, format, args); + } + const level_txt = comptime message_level.asText(); const prefix2 = if (scope == .default) ": " else "(" ++ @tagName(scope) ++ "): "; const stderr = getStdErr().writer(); diff --git a/async/asyncio.zig b/async/asyncio.zig index d83fb6b..ab0fa86 100644 --- a/async/asyncio.zig +++ b/async/asyncio.zig @@ -52,7 +52,7 @@ pub fn run( args: anytype, stack: anytype, ) !stdx.meta.FnSignature(func, @TypeOf(args)).ReturnPayloadT { - stdx.debug.assert(libcoro.inCoro() == false, "Not in a corouine", .{}); + stdx.debug.assert(libcoro.inCoro() == false, "Not in a coroutine", .{}); const frame = try xasync(func, args, stack); defer frame.deinit(); try runCoro(exec, frame); @@ -88,20 +88,20 @@ pub fn sleep(exec: *Executor, ms: u64) !void { return data.result; } -fn waitForCompletionOutsideCoro(exec: *Executor, c: *xev.Completion) !void { +pub fn waitForCompletionOutsideCoro(exec: *Executor, c: *xev.Completion) !void { @setCold(true); while 
(c.state() != .dead) { try exec.tick(); } } -fn waitForCompletionInCoro(c: *xev.Completion) void { +pub fn waitForCompletionInCoro(c: *xev.Completion) void { while (c.state() != .dead) { libcoro.xsuspend(); } } -fn waitForCompletion(exec: *Executor, c: *xev.Completion) !void { +pub fn waitForCompletion(exec: *Executor, c: *xev.Completion) !void { if (libcoro.inCoro()) { waitForCompletionInCoro(c); } else { diff --git a/async/coro.zig b/async/coro.zig index c77405d..93d612e 100644 --- a/async/coro.zig +++ b/async/coro.zig @@ -76,7 +76,7 @@ pub fn xresume(frame: anytype) void { /// Must be called from within a coroutine (i.e. not the top level). pub fn xsuspend() void { xsuspendSafe() catch |e| { - log.err("{any}\n", .{e}); + log.err("{any}", .{e}); @panic("xsuspend"); }; } @@ -160,10 +160,10 @@ const Coro = struct { fn runcoro(from: *base.Coro, this: *base.Coro) callconv(.C) noreturn { const from_coro: *Coro = @fieldParentPtr("impl", from); const this_coro: *Coro = @fieldParentPtr("impl", this); - log.debug("coro start {any}\n", .{this_coro.id}); + log.debug("coro start {any}", .{this_coro.id}); @call(.auto, this_coro.func, .{}); this_coro.status = .Done; - log.debug("coro done {any}\n", .{this_coro.id}); + log.debug("coro done {any}", .{this_coro.id}); thread_state.switchOut(from_coro); // Never returns @@ -258,7 +258,7 @@ const CoroT = struct { fn wrapfn() void { const storage = thread_state.currentStorage(InnerStorage); storage.retval = @call( - .always_inline, + .auto, Sig.Func.Value, storage.args, ); @@ -317,7 +317,7 @@ const ThreadState = struct { /// Called from resume fn switchIn(self: *ThreadState, target: Frame) void { - log.debug("coro resume {any} from {any}\n", .{ target.id, self.current().id }); + log.debug("coro resume {any} from {any}", .{ target.id, self.current().id }); // Switch to target, setting this coro as the resumer. 
self.switchTo(target, true); @@ -332,7 +332,7 @@ const ThreadState = struct { /// Called from suspend fn switchOut(self: *ThreadState, target: Frame) void { - log.debug("coro suspend {any} to {any}\n", .{ self.current().id, target.id }); + log.debug("coro suspend {any} to {any}", .{ self.current().id, target.id }); self.switchTo(target, false); } diff --git a/runtimes/cpu/BUILD.bazel b/runtimes/cpu/BUILD.bazel index 8095f82..07d5858 100644 --- a/runtimes/cpu/BUILD.bazel +++ b/runtimes/cpu/BUILD.bazel @@ -21,7 +21,10 @@ zig_library( deps = [ "//pjrt", ] + select({ - "//runtimes:cpu.enabled": [":libpjrt_cpu"], + "//runtimes:cpu.enabled": [ + ":libpjrt_cpu", + "//async", + ], "//conditions:default": [":empty"], }), ) diff --git a/runtimes/cpu/cpu.zig b/runtimes/cpu/cpu.zig index e8531ae..2270398 100644 --- a/runtimes/cpu/cpu.zig +++ b/runtimes/cpu/cpu.zig @@ -1,6 +1,8 @@ const builtin = @import("builtin"); -const pjrt = @import("pjrt"); + +const asynk = @import("async"); const c = @import("c"); +const pjrt = @import("pjrt"); pub fn isEnabled() bool { return @hasDecl(c, "ZML_RUNTIME_CPU"); @@ -16,5 +18,5 @@ pub fn load() !*const pjrt.Api { .macos, .ios, .watchos => ".dylib", else => ".so", }; - return try pjrt.Api.loadFrom("libpjrt_cpu" ++ ext); + return try asynk.callBlocking(pjrt.Api.loadFrom, .{"libpjrt_cpu" ++ ext}); } diff --git a/runtimes/cuda/cuda.zig b/runtimes/cuda/cuda.zig index 7fb7b1d..4c2bc80 100644 --- a/runtimes/cuda/cuda.zig +++ b/runtimes/cuda/cuda.zig @@ -1,8 +1,8 @@ +const builtin = @import("builtin"); const std = @import("std"); const asynk = @import("async"); const bazel_builtin = @import("bazel_builtin"); -const builtin = @import("builtin"); const c = @import("c"); const pjrt = @import("pjrt"); const runfiles = @import("runfiles"); @@ -49,5 +49,5 @@ pub fn load() !*const pjrt.Api { // See https://github.com/openxla/xla/issues/21428 try setupXlaGpuCudaDirFlag(); - return try pjrt.Api.loadFrom("libpjrt_cuda.so"); + return try 
asynk.callBlocking(pjrt.Api.loadFrom, .{"libpjrt_cuda.so"}); } diff --git a/runtimes/neuron/neuron.zig b/runtimes/neuron/neuron.zig index 4b7e141..9fdbcc4 100644 --- a/runtimes/neuron/neuron.zig +++ b/runtimes/neuron/neuron.zig @@ -1,11 +1,12 @@ const builtin = @import("builtin"); -const asynk = @import("async"); -const pjrt = @import("pjrt"); -const c = @import("c"); const std = @import("std"); -const runfiles = @import("runfiles"); + +const asynk = @import("async"); const bazel_builtin = @import("bazel_builtin"); +const c = @import("c"); const libneuronxla_pyenv = @import("libneuronxla_pyenv"); +const pjrt = @import("pjrt"); +const runfiles = @import("runfiles"); pub fn isEnabled() bool { return @hasDecl(c, "ZML_RUNTIME_NEURON"); @@ -136,5 +137,5 @@ pub fn load() !*const pjrt.Api { setNeuronCCFlags(); try initialize(); - return try pjrt.Api.loadFrom("libpjrt_neuron.so"); + return try asynk.callBlocking(pjrt.Api.loadFrom, .{"libpjrt_neuron.so"}); } diff --git a/runtimes/rocm/rocm.zig b/runtimes/rocm/rocm.zig index 1071078..d0bb66c 100644 --- a/runtimes/rocm/rocm.zig +++ b/runtimes/rocm/rocm.zig @@ -70,5 +70,5 @@ pub fn load() !*const pjrt.Api { try setupRocmEnv(); - return try pjrt.Api.loadFrom("libpjrt_rocm.so"); + return try asynk.callBlocking(pjrt.Api.loadFrom, .{"libpjrt_rocm.so"}); } diff --git a/runtimes/tpu/tpu.zig b/runtimes/tpu/tpu.zig index 1bca0b2..980f421 100644 --- a/runtimes/tpu/tpu.zig +++ b/runtimes/tpu/tpu.zig @@ -1,8 +1,9 @@ const builtin = @import("builtin"); +const std = @import("std"); + const asynk = @import("async"); const pjrt = @import("pjrt"); const c = @import("c"); -const std = @import("std"); pub fn isEnabled() bool { return @hasDecl(c, "ZML_RUNTIME_TPU"); @@ -36,5 +37,5 @@ pub fn load() !*const pjrt.Api { return error.Unavailable; } - return try pjrt.Api.loadFrom("libpjrt_tpu.so"); + return try asynk.callBlocking(pjrt.Api.loadFrom, .{"libpjrt_tpu.so"}); } diff --git a/third_party/modules/libxev/20241208.0-db6a52b/patches/128.patch 
b/third_party/modules/libxev/20241208.0-db6a52b/patches/128.patch index a869595..b6b2ad0 100644 --- a/third_party/modules/libxev/20241208.0-db6a52b/patches/128.patch +++ b/third_party/modules/libxev/20241208.0-db6a52b/patches/128.patch @@ -1,7 +1,100 @@ -From 0d1c2f8258072148459d3114b9ccaf43c02e0958 Mon Sep 17 00:00:00 2001 +From 12fb50bdd4b62c8e111f12eecb171257b6af6038 Mon Sep 17 00:00:00 2001 +From: Mitchell Hashimoto +Date: Mon, 4 Nov 2024 13:30:10 -0800 +Subject: [PATCH 1/8] Revert "Fix potential ThreadPool UAF (#113)" + +This reverts commit dbe22910a43e9e8ec9948d3cbd73d8488a074967, reversing +changes made to 43c7e4b3308f359e5b758db2d824d7c447f4ed3f. +--- + src/ThreadPool.zig | 8 ++++++-- + 1 file changed, 6 insertions(+), 2 deletions(-) + +diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig +index 9e0ff02..1d0123b 100644 +--- a/src/ThreadPool.zig ++++ b/src/ThreadPool.zig +@@ -335,8 +335,12 @@ fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void { + + fn join(self: *ThreadPool) void { + // Wait for the thread pool to be shutdown() then for all threads to enter a joinable state +- self.join_event.wait(); +- const sync: Sync = @bitCast(self.sync.load(.monotonic)); ++ var sync: Sync = @bitCast(self.sync.load(.monotonic)); ++ if (!(sync.state == .shutdown and sync.spawned == 0)) { ++ self.join_event.wait(); ++ sync = @bitCast(self.sync.load(.monotonic)); ++ } ++ + assert(sync.state == .shutdown); + assert(sync.spawned == 0); + + +From f6dea8bb85593cca6d4a54d92d4b05fe15b1eb41 Mon Sep 17 00:00:00 2001 +From: Corentin Godeau +Date: Tue, 10 Sep 2024 11:23:28 +0200 +Subject: [PATCH 2/8] fix: avoid overflow in kqueue backend + +When there is more than 256 events, the `events` slice could overflow +when processing completions. 
+--- + src/backend/kqueue.zig | 3 +++ + 1 file changed, 3 insertions(+) + +diff --git a/src/backend/kqueue.zig b/src/backend/kqueue.zig +index 834f916..456f64f 100644 +--- a/src/backend/kqueue.zig ++++ b/src/backend/kqueue.zig +@@ -437,6 +437,9 @@ pub const Loop = struct { + // Only resubmit if we aren't already active (in the queue) + .rearm => if (!c_active) self.submissions.push(c), + } ++ ++ // If we filled the events slice, we break to avoid overflow. ++ if (changes == events.len) break; + } + + // Determine our next timeout based on the timers + +From 890e2711ff9eb82bbceda2e6229b9c9a8d0b60da Mon Sep 17 00:00:00 2001 +From: David Rubin +Date: Fri, 6 Dec 2024 20:43:52 -0800 +Subject: [PATCH 3/8] remove race from threadpool + +--- + src/ThreadPool.zig | 8 +++----- + 1 file changed, 3 insertions(+), 5 deletions(-) + +diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig +index 1d0123b..aec09d7 100644 +--- a/src/ThreadPool.zig ++++ b/src/ThreadPool.zig +@@ -291,6 +291,7 @@ pub noinline fn shutdown(self: *ThreadPool) void { + // Wake up any threads sleeping on the idle_event. + // TODO: I/O polling notification here. 
+ if (sync.idle > 0) self.idle_event.shutdown(); ++ if (sync.spawned == 0) self.join_event.notify(); + return; + }); + } +@@ -335,11 +336,8 @@ fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void { + + fn join(self: *ThreadPool) void { + // Wait for the thread pool to be shutdown() then for all threads to enter a joinable state +- var sync: Sync = @bitCast(self.sync.load(.monotonic)); +- if (!(sync.state == .shutdown and sync.spawned == 0)) { +- self.join_event.wait(); +- sync = @bitCast(self.sync.load(.monotonic)); +- } ++ self.join_event.wait(); ++ const sync: Sync = @bitCast(self.sync.load(.monotonic)); + + assert(sync.state == .shutdown); + assert(sync.spawned == 0); + +From d82ee736a1548e6564a296315f33902ae09aa298 Mon Sep 17 00:00:00 2001 From: Steeve Morin Date: Tue, 19 Nov 2024 16:14:14 +0100 -Subject: [PATCH 1/2] backend/epoll: implement eventfd wakeup notification +Subject: [PATCH 4/8] backend/epoll: implement eventfd wakeup notification Tries to mimic what happens in backend/kqueue. @@ -118,10 +211,10 @@ index ae4ec7d..f44d326 100644 task_result: Result = undefined, -From 38d4dbed71a732b0fc30c1181354ad9d53919402 Mon Sep 17 00:00:00 2001 +From 320f1ae9ccc95ce814e246b35ca0f9fd2361f43e Mon Sep 17 00:00:00 2001 From: Corentin Godeau Date: Tue, 14 Jan 2025 14:43:54 +0000 -Subject: [PATCH 2/2] backend/epoll: read the wakeup eventfd to avoid being +Subject: [PATCH 5/8] backend/epoll: read the wakeup eventfd to avoid being awaken again --- @@ -157,3 +250,192 @@ index f44d326..f84c687 100644 const c: *Completion = @ptrFromInt(@as(usize, @intCast(ev.data.ptr))); + +From 258a64180c901b5e1f9a2c75f1ac77a758411450 Mon Sep 17 00:00:00 2001 +From: Steeve Morin +Date: Fri, 17 Jan 2025 20:47:42 +0000 +Subject: [PATCH 6/8] epoll: use infinite timeout for epoll_wait + +Since eventfd is now implemented. 
+--- + src/backend/epoll.zig | 4 +--- + 1 file changed, 1 insertion(+), 3 deletions(-) + +diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig +index f84c687..e3eee20 100644 +--- a/src/backend/epoll.zig ++++ b/src/backend/epoll.zig +@@ -375,9 +375,7 @@ pub const Loop = struct { + const timeout: i32 = if (wait_rem == 0) 0 else timeout: { + // If we have a timer, we want to set the timeout to our next + // timer value. If we have no timer, we wait forever. +- // TODO: do not wait 100ms here, use an eventfd for our +- // thread pool to wake us up. +- const t = self.timers.peek() orelse break :timeout 100; ++ const t = self.timers.peek() orelse break :timeout -1; + + // Determine the time in milliseconds. + const ms_now = @as(u64, @intCast(self.cached_now.tv_sec)) * std.time.ms_per_s + + +From 9ff535f9b6fac454d454f2799b93863dc9aba7d4 Mon Sep 17 00:00:00 2001 +From: Steeve Morin +Date: Fri, 17 Jan 2025 20:48:27 +0000 +Subject: [PATCH 7/8] epoll,kqueue: dispatch close in threadpool + +Close might block, so dispatch it inside a threadpool. +--- + src/backend/epoll.zig | 15 +++++++++++++-- + src/backend/kqueue.zig | 6 +++++- + src/watcher/stream.zig | 16 ++++++++++++++++ + 3 files changed, 34 insertions(+), 3 deletions(-) + +diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig +index e3eee20..0f4a2ac 100644 +--- a/src/backend/epoll.zig ++++ b/src/backend/epoll.zig +@@ -700,6 +700,13 @@ pub const Loop = struct { + }, + + .close => |v| res: { ++ if (completion.flags.threadpool) { ++ if (self.thread_schedule(completion)) |_| ++ return ++ else |err| ++ break :res .{ .close = err }; ++ } ++ + posix.close(v.fd); + break :res .{ .close = {} }; + }, +@@ -909,7 +916,6 @@ pub const Completion = struct { + // This should never happen because we always do these synchronously + // or in another location. 
+ .cancel, +- .close, + .noop, + .shutdown, + .timer, +@@ -1015,6 +1021,11 @@ pub const Completion = struct { + err, + }; + }, ++ ++ .close => |*op| res: { ++ posix.close(op.fd); ++ break :res .{ .close = {} }; ++ }, + }; + } + +@@ -1277,7 +1288,7 @@ pub const AcceptError = posix.EpollCtlError || error{ + Unknown, + }; + +-pub const CloseError = posix.EpollCtlError || error{ ++pub const CloseError = posix.EpollCtlError || ThreadPoolError || error{ + Unknown, + }; + +diff --git a/src/backend/kqueue.zig b/src/backend/kqueue.zig +index 456f64f..9914309 100644 +--- a/src/backend/kqueue.zig ++++ b/src/backend/kqueue.zig +@@ -1102,7 +1102,6 @@ pub const Completion = struct { + fn perform(self: *Completion, ev_: ?*const Kevent) Result { + return switch (self.op) { + .cancel, +- .close, + .noop, + .timer, + .shutdown, +@@ -1232,6 +1231,11 @@ pub const Completion = struct { + + break :res .{ .proc = 0 }; + }, ++ ++ .close => |*op| res: { ++ posix.close(op.fd); ++ break :res .{ .close = {} }; ++ }, + }; + } + +diff --git a/src/watcher/stream.zig b/src/watcher/stream.zig +index 7f5df6f..bc95282 100644 +--- a/src/watcher/stream.zig ++++ b/src/watcher/stream.zig +@@ -80,6 +80,22 @@ pub fn Closeable(comptime xev: type, comptime T: type, comptime options: Options + }).callback, + }; + ++ // If we're dup-ing, then we ask the backend to manage the fd. ++ switch (xev.backend) { ++ .io_uring, ++ .wasi_poll, ++ .iocp, ++ => {}, ++ ++ .epoll => { ++ c.flags.threadpool = true; ++ }, ++ ++ .kqueue => { ++ c.flags.threadpool = true; ++ }, ++ } ++ + loop.add(c); + } + }; + +From 003416b0b410e70daab0d55ddc48e8e443f4fd09 Mon Sep 17 00:00:00 2001 +From: Steeve Morin +Date: Fri, 17 Jan 2025 20:59:18 +0000 +Subject: [PATCH 8/8] epoll: don't count immediate actions + +If an immediate action is dispatched, the loop might block +on epoll_wait even though only one action was requested. 
+--- + src/backend/epoll.zig | 15 ++++++++++++++- + 1 file changed, 14 insertions(+), 1 deletion(-) + +diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig +index 0f4a2ac..fb7e59e 100644 +--- a/src/backend/epoll.zig ++++ b/src/backend/epoll.zig +@@ -297,6 +297,7 @@ pub const Loop = struct { + + // Submit all the submissions. We copy the submission queue so that + // any resubmits don't cause an infinite loop. ++ var wait_rem: usize = @intCast(wait); + var queued = self.submissions; + self.submissions = .{}; + while (queued.pop()) |c| { +@@ -304,6 +305,19 @@ pub const Loop = struct { + // This usually means that we switched them to be deleted or + // something. + if (c.flags.state != .adding) continue; ++ ++ // These operations happen synchronously. Ensure they are ++ // decremented from wait_rem. ++ switch (c.op) { ++ .cancel, ++ // should noop be counted? ++ // .noop, ++ .shutdown, ++ .timer, ++ => wait_rem -|= 1, ++ else => {}, ++ } ++ + self.start(c); + } + +@@ -322,7 +336,6 @@ pub const Loop = struct { + + // Wait and process events. We only do this if we have any active. 
var events: [1024]linux.epoll_event = undefined; +- var wait_rem: usize = @intCast(wait); + while (self.active > 0 and (wait == 0 or wait_rem > 0)) { + self.update_now(); + const now_timer: Operation.Timer = .{ .next = self.cached_now }; diff --git a/third_party/modules/libxev/20241208.1-db6a52b/MODULE.bazel b/third_party/modules/libxev/20241208.1-db6a52b/MODULE.bazel new file mode 100644 index 0000000..ea8c55d --- /dev/null +++ b/third_party/modules/libxev/20241208.1-db6a52b/MODULE.bazel @@ -0,0 +1,7 @@ +module( + name = "libxev", + version = "20241208.1-db6a52b", + compatibility_level = 1, +) + +bazel_dep(name = "rules_zig", version = "20240904.0-010da15") diff --git a/third_party/modules/libxev/20241208.1-db6a52b/overlay/BUILD.bazel b/third_party/modules/libxev/20241208.1-db6a52b/overlay/BUILD.bazel new file mode 100644 index 0000000..7c0c4ce --- /dev/null +++ b/third_party/modules/libxev/20241208.1-db6a52b/overlay/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_zig//zig:defs.bzl", "zig_library") + +zig_library( + name = "xev", + srcs = glob([ + "src/*.zig", + "src/backend/*.zig", + "src/linux/*.zig", + "src/watcher/*.zig", + ]), + main = "main2.zig", + visibility = ["//visibility:public"], +) diff --git a/third_party/modules/libxev/20241208.1-db6a52b/overlay/MODULE.bazel b/third_party/modules/libxev/20241208.1-db6a52b/overlay/MODULE.bazel new file mode 100644 index 0000000..ce556ab --- /dev/null +++ b/third_party/modules/libxev/20241208.1-db6a52b/overlay/MODULE.bazel @@ -0,0 +1,7 @@ +module( + name = "libxev", + version = "20241208.1-db6a52b", + compatibility_level = 1, +) + +bazel_dep(name = "rules_zig", version = "20240904.0-010da15") diff --git a/third_party/modules/libxev/20241208.1-db6a52b/overlay/main2.zig b/third_party/modules/libxev/20241208.1-db6a52b/overlay/main2.zig new file mode 100644 index 0000000..4166925 --- /dev/null +++ b/third_party/modules/libxev/20241208.1-db6a52b/overlay/main2.zig @@ -0,0 +1,22 @@ +const builtin = @import("builtin"); +const 
root = @import("root"); + +const main = @import("src/main.zig"); + +pub const ThreadPool = main.ThreadPool; +pub const stream = main.stream; + +pub const Options = struct { + linux_backend: main.Backend = .epoll, +}; + +pub const options: Options = if (@hasDecl(root, "xev_options")) root.xev_options else .{}; + +const default: main.Backend = switch (builtin.os.tag) { + .ios, .macos => .kqueue, + .linux => options.linux_backend, + .wasi => .wasi_poll, + .windows => .iocp, + else => @compileError("Unsupported OS"), +}; +pub usingnamespace default.Api(); diff --git a/third_party/modules/libxev/20241208.1-db6a52b/patches/128.patch b/third_party/modules/libxev/20241208.1-db6a52b/patches/128.patch new file mode 100644 index 0000000..361bc9d --- /dev/null +++ b/third_party/modules/libxev/20241208.1-db6a52b/patches/128.patch @@ -0,0 +1,323 @@ +From 8927108937e23ce2ad547a310dabd976ddcaa39d Mon Sep 17 00:00:00 2001 +From: Steeve Morin +Date: Tue, 19 Nov 2024 16:14:14 +0100 +Subject: [PATCH 1/5] backend/epoll: implement eventfd wakeup notification + +Tries to mimic what happens in backend/kqueue. + +Closes #4 +--- + src/backend/epoll.zig | 42 ++++++++++++++++++++++++++++++++++++++++++ + 1 file changed, 42 insertions(+) + +diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig +index ae4ec7d..f44d326 100644 +--- a/src/backend/epoll.zig ++++ b/src/backend/epoll.zig +@@ -21,6 +21,12 @@ pub const Loop = struct { + + fd: posix.fd_t, + ++ /// The eventfd that this epoll queue always has a filter for. Writing ++ /// an empty message to this eventfd can be used to wake up the loop ++ /// at any time. Waking up the loop via this eventfd won't trigger any ++ /// particular completion, it just forces tick to cycle. ++ eventfd: xev.Async, ++ + /// The number of active completions. This DOES NOT include completions that + /// are queued in the submissions queue. 
+ active: usize = 0, +@@ -56,8 +62,12 @@ pub const Loop = struct { + } = .{}, + + pub fn init(options: xev.Options) !Loop { ++ var eventfd = try xev.Async.init(); ++ errdefer eventfd.deinit(); ++ + var res: Loop = .{ + .fd = try posix.epoll_create1(std.os.linux.EPOLL.CLOEXEC), ++ .eventfd = eventfd, + .thread_pool = options.thread_pool, + .thread_pool_completions = undefined, + .cached_now = undefined, +@@ -68,6 +78,7 @@ pub const Loop = struct { + + pub fn deinit(self: *Loop) void { + posix.close(self.fd); ++ self.eventfd.deinit(); + } + + /// Run the event loop. See RunMode documentation for details on modes. +@@ -262,9 +273,26 @@ pub const Loop = struct { + // Initialize + if (!self.flags.init) { + self.flags.init = true; ++ + if (self.thread_pool != null) { + self.thread_pool_completions.init(); + } ++ ++ var ev: linux.epoll_event = .{ ++ .events = linux.EPOLL.IN | linux.EPOLL.RDHUP, ++ .data = .{ .ptr = 0 }, ++ }; ++ posix.epoll_ctl( ++ self.fd, ++ linux.EPOLL.CTL_ADD, ++ self.eventfd.fd, ++ &ev, ++ ) catch |err| { ++ // We reset initialization because we can't do anything ++ // safely unless we get this mach port registered! ++ self.flags.init = false; ++ return err; ++ }; + } + + // Submit all the submissions. We copy the submission queue so that +@@ -369,6 +397,10 @@ pub const Loop = struct { + + // Process all our events and invoke their completion handlers + for (events[0..n]) |ev| { ++ // Zero data values are internal events that we do nothing ++ // on such as the eventfd wakeup. 
++ if (ev.data.ptr == 0) continue; ++ + const c: *Completion = @ptrFromInt(@as(usize, @intCast(ev.data.ptr))); + + // We get the fd and mark this as in progress we can properly +@@ -415,6 +447,7 @@ pub const Loop = struct { + const pool = self.thread_pool orelse return error.ThreadPoolRequired; + + // Setup our completion state so that thread_perform can do stuff ++ c.task_loop = self; + c.task_completions = &self.thread_pool_completions; + c.task = .{ .callback = Loop.thread_perform }; + +@@ -436,6 +469,14 @@ pub const Loop = struct { + + // Add to our completion queue + c.task_completions.push(c); ++ ++ // Wake up our main loop ++ c.task_loop.wakeup() catch {}; ++ } ++ ++ /// Sends an empty message to this loop's eventfd so that it wakes up. ++ fn wakeup(self: *Loop) !void { ++ try self.eventfd.notify(); + } + + fn start(self: *Loop, completion: *Completion) void { +@@ -800,6 +841,7 @@ pub const Completion = struct { + /// reliable way to get access to the loop and shouldn't be used + /// except internally. 
+ task: ThreadPool.Task = undefined, ++ task_loop: *Loop = undefined, + task_completions: *Loop.TaskCompletionQueue = undefined, + task_result: Result = undefined, + + +From bf6c19eb1c06d1fb1b3cb5a66ea2ce8f743da5ba Mon Sep 17 00:00:00 2001 +From: Corentin Godeau +Date: Tue, 14 Jan 2025 14:43:54 +0000 +Subject: [PATCH 2/5] backend/epoll: read the wakeup eventfd to avoid being + awaken again + +--- + src/backend/epoll.zig | 11 +++++++---- + 1 file changed, 7 insertions(+), 4 deletions(-) + +diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig +index f44d326..f84c687 100644 +--- a/src/backend/epoll.zig ++++ b/src/backend/epoll.zig +@@ -280,7 +280,7 @@ pub const Loop = struct { + + var ev: linux.epoll_event = .{ + .events = linux.EPOLL.IN | linux.EPOLL.RDHUP, +- .data = .{ .ptr = 0 }, ++ .data = .{ .fd = self.eventfd.fd }, + }; + posix.epoll_ctl( + self.fd, +@@ -397,9 +397,12 @@ pub const Loop = struct { + + // Process all our events and invoke their completion handlers + for (events[0..n]) |ev| { +- // Zero data values are internal events that we do nothing +- // on such as the eventfd wakeup. +- if (ev.data.ptr == 0) continue; ++ // Handle wakeup eventfd ++ if (ev.data.fd == self.eventfd.fd) { ++ var buffer: u64 = undefined; ++ _ = posix.read(self.eventfd.fd, std.mem.asBytes(&buffer)) catch {}; ++ continue; ++ } + + const c: *Completion = @ptrFromInt(@as(usize, @intCast(ev.data.ptr))); + + +From c0ae753a9874bf78a48bf25b4362b6cca4de1da1 Mon Sep 17 00:00:00 2001 +From: Steeve Morin +Date: Fri, 17 Jan 2025 20:47:42 +0000 +Subject: [PATCH 3/5] epoll: use infinite timeout for epoll_wait + +Since eventfd is now implemented. 
+--- + src/backend/epoll.zig | 4 +--- + 1 file changed, 1 insertion(+), 3 deletions(-) + +diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig +index f84c687..e3eee20 100644 +--- a/src/backend/epoll.zig ++++ b/src/backend/epoll.zig +@@ -375,9 +375,7 @@ pub const Loop = struct { + const timeout: i32 = if (wait_rem == 0) 0 else timeout: { + // If we have a timer, we want to set the timeout to our next + // timer value. If we have no timer, we wait forever. +- // TODO: do not wait 100ms here, use an eventfd for our +- // thread pool to wake us up. +- const t = self.timers.peek() orelse break :timeout 100; ++ const t = self.timers.peek() orelse break :timeout -1; + + // Determine the time in milliseconds. + const ms_now = @as(u64, @intCast(self.cached_now.tv_sec)) * std.time.ms_per_s + + +From 84e75a10ec1c10ea2a13edfcd8a31e582458fc88 Mon Sep 17 00:00:00 2001 +From: Steeve Morin +Date: Fri, 17 Jan 2025 20:48:27 +0000 +Subject: [PATCH 4/5] epoll: dispatch close in threadpool + +Close might block, so dispatch it inside a threadpool. +--- + src/backend/epoll.zig | 15 +++++++++++++-- + src/watcher/stream.zig | 16 ++++++++++++++++ + 2 files changed, 29 insertions(+), 2 deletions(-) + +diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig +index e3eee20..0f4a2ac 100644 +--- a/src/backend/epoll.zig ++++ b/src/backend/epoll.zig +@@ -700,6 +700,13 @@ pub const Loop = struct { + }, + + .close => |v| res: { ++ if (completion.flags.threadpool) { ++ if (self.thread_schedule(completion)) |_| ++ return ++ else |err| ++ break :res .{ .close = err }; ++ } ++ + posix.close(v.fd); + break :res .{ .close = {} }; + }, +@@ -909,7 +916,6 @@ pub const Completion = struct { + // This should never happen because we always do these synchronously + // or in another location. 
+ .cancel, +- .close, + .noop, + .shutdown, + .timer, +@@ -1015,6 +1021,11 @@ pub const Completion = struct { + err, + }; + }, ++ ++ .close => |*op| res: { ++ posix.close(op.fd); ++ break :res .{ .close = {} }; ++ }, + }; + } + +@@ -1277,7 +1288,7 @@ pub const AcceptError = posix.EpollCtlError || error{ + Unknown, + }; + +-pub const CloseError = posix.EpollCtlError || error{ ++pub const CloseError = posix.EpollCtlError || ThreadPoolError || error{ + Unknown, + }; + +diff --git a/src/watcher/stream.zig b/src/watcher/stream.zig +index 7f5df6f..bc95282 100644 +--- a/src/watcher/stream.zig ++++ b/src/watcher/stream.zig +@@ -80,6 +80,22 @@ pub fn Closeable(comptime xev: type, comptime T: type, comptime options: Options + }).callback, + }; + ++ // If we're dup-ing, then we ask the backend to manage the fd. ++ switch (xev.backend) { ++ .io_uring, ++ .wasi_poll, ++ .iocp, ++ => {}, ++ ++ .epoll => { ++ c.flags.threadpool = true; ++ }, ++ ++ .kqueue => { ++ c.flags.threadpool = true; ++ }, ++ } ++ + loop.add(c); + } + }; + +From 5bc3ad9f23a03ff3af7a4ec33c8bfa4f3cc6bae9 Mon Sep 17 00:00:00 2001 +From: Steeve Morin +Date: Fri, 17 Jan 2025 20:59:18 +0000 +Subject: [PATCH 5/5] epoll: don't count immediate actions + +If an immediate action is dispatched, the loop might block +on epoll_wait even though only one action was requested. +--- + src/backend/epoll.zig | 15 ++++++++++++++- + 1 file changed, 14 insertions(+), 1 deletion(-) + +diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig +index 0f4a2ac..fb7e59e 100644 +--- a/src/backend/epoll.zig ++++ b/src/backend/epoll.zig +@@ -297,6 +297,7 @@ pub const Loop = struct { + + // Submit all the submissions. We copy the submission queue so that + // any resubmits don't cause an infinite loop. 
++ var wait_rem: usize = @intCast(wait); + var queued = self.submissions; + self.submissions = .{}; + while (queued.pop()) |c| { +@@ -304,6 +305,19 @@ pub const Loop = struct { + // This usually means that we switched them to be deleted or + // something. + if (c.flags.state != .adding) continue; ++ ++ // These operations happen synchronously. Ensure they are ++ // decremented from wait_rem. ++ switch (c.op) { ++ .cancel, ++ // should noop be counted? ++ // .noop, ++ .shutdown, ++ .timer, ++ => wait_rem -|= 1, ++ else => {}, ++ } ++ + self.start(c); + } + +@@ -322,7 +336,6 @@ pub const Loop = struct { + + // Wait and process events. We only do this if we have any active. + var events: [1024]linux.epoll_event = undefined; +- var wait_rem: usize = @intCast(wait); + while (self.active > 0 and (wait == 0 or wait_rem > 0)) { + self.update_now(); + const now_timer: Operation.Timer = .{ .next = self.cached_now }; diff --git a/third_party/modules/libxev/20241208.1-db6a52b/source.json b/third_party/modules/libxev/20241208.1-db6a52b/source.json new file mode 100644 index 0000000..f08c951 --- /dev/null +++ b/third_party/modules/libxev/20241208.1-db6a52b/source.json @@ -0,0 +1,14 @@ +{ + "strip_prefix": "libxev-db6a52bafadf00360e675fefa7926e8e6c0e9931", + "url": "https://github.com/zml/libxev/archive/db6a52bafadf00360e675fefa7926e8e6c0e9931.tar.gz", + "integrity": "sha256-4GT5wkfkZnIjNv20yDiWEzHAhbIiwHHJfS7A4u/LoNQ=", + "overlay": { + "MODULE.bazel": "", + "BUILD.bazel": "", + "main2.zig": "" + }, + "patches": { + "128.patch": "" + }, + "patch_strip": 1 +} diff --git a/zml/aio.zig b/zml/aio.zig index 0c49824..62c7e90 100644 --- a/zml/aio.zig +++ b/zml/aio.zig @@ -116,7 +116,9 @@ pub const BufferStore = struct { } pub fn deinit(self: BufferStore) void { - for (self.files) |*file| file.deinit(); + for (self.files) |*file| { + file.deinit(); + } self.arena.deinit(); }