Fix async hangs by reworking the libxev epoll backend (eventfd-based wakeup, infinite epoll_wait timeout, close dispatched on the thread pool) and by loading PJRT plugins through asynk.callBlocking, so blocking loads no longer stall the event loop in the async and runtime modules.
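
A minimal sketch of the new loading pattern (illustration only, not taken from the diff; it assumes asynk.callBlocking(func, args) runs func off the event loop, forwards the argument tuple, and propagates the error union, as the runtime call sites below suggest):

const asynk = @import("async");
const pjrt = @import("pjrt");

// Each runtime's load() now wraps the blocking library load performed by
// pjrt.Api.loadFrom, so pending completions on the calling thread are not
// stalled while the plugin shared object is opened.
pub fn load() !*const pjrt.Api {
    // The CPU runtime additionally picks ".dylib"/".so" per platform.
    return try asynk.callBlocking(pjrt.Api.loadFrom, .{"libpjrt_cuda.so"});
}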

This commit is contained in:
Tarry Singh 2024-01-16 14:13:45 +00:00
parent 434cee3a6c
commit a7b7ae0180
18 changed files with 720 additions and 40 deletions

View File

@@ -84,7 +84,7 @@ use_repo(zls, "zls_aarch64-macos", "zls_x86_64-linux")
register_toolchains("//third_party/zls:all")
bazel_dep(name = "libxev", version = "20241208.0-db6a52b")
bazel_dep(name = "libxev", version = "20241208.1-db6a52b")
bazel_dep(name = "llvm-raw", version = "20250102.0-f739aa4")
llvm = use_extension("@llvm-raw//utils/bazel:extension.bzl", "llvm")

View File

@@ -48,8 +48,8 @@ fn FrameExx(comptime func: anytype, comptime argsT: type, comptime returnT: type
pub const await_ = awaitt;
pub fn awaitt(self: *Self) returnT {
defer {
AsyncThread.current.stack_allocator.destroy(&self.inner._frame.stack);
self.inner.deinit();
AsyncThread.current.stack_allocator.destroy(&self.inner._frame.stack);
self.* = undefined;
}
return coro.xawait(self.inner);
@@ -145,16 +145,14 @@ pub const threading = struct {
.waiting = &waiter,
};
if (self.waiter.cmpxchgStrong(&State.unset_state, &new_state, .monotonic, .monotonic) == null) {
while (self.isSet() == false) {
coro.xsuspend();
}
}
}
};
};
pub const AsyncThread = struct {
threadlocal var current: *const AsyncThread = undefined;
threadlocal var current: *AsyncThread = undefined;
executor: *aio.Executor,
stack_allocator: *stack.StackAllocator,
@@ -167,8 +165,8 @@ pub const AsyncThread = struct {
self.async_notifier.notify() catch {};
}
fn waker_cb(q: ?*threading.WaiterQueue, _: *xev.Loop, _: *xev.Completion, _: xev.Async.WaitError!void) xev.CallbackAction {
while (q.?.pop()) |waiter| {
fn wakerCallback(self: ?*AsyncThread, _: *xev.Loop, _: *xev.Completion, _: xev.Async.WaitError!void) xev.CallbackAction {
while (self.?.waiters_queue.pop()) |waiter| {
coro.xresume(waiter.frame);
}
return .rearm;
@@ -194,13 +192,10 @@ pub const AsyncThread = struct {
var waiters_queue: threading.WaiterQueue = undefined;
waiters_queue.init();
var c: xev.Completion = undefined;
async_notifier.wait(&loop, &c, threading.WaiterQueue, &waiters_queue, &waker_cb);
var stack_allocator = stack.StackAllocator.init(allocator);
defer stack_allocator.deinit();
AsyncThread.current = &.{
var asyncThread: AsyncThread = .{
.executor = &executor_,
.stack_allocator = &stack_allocator,
.loop = &loop,
@@ -208,6 +203,10 @@ pub const AsyncThread = struct {
.async_notifier = &async_notifier,
.waiters_queue = &waiters_queue,
};
AsyncThread.current = &asyncThread;
var c2: xev.Completion = undefined;
async_notifier.wait(AsyncThread.current.loop, &c2, AsyncThread, AsyncThread.current, &AsyncThread.wakerCallback);
// allocate the main coroutine stack, on the current thread's stack!
var mainStackData: stack.Stack.Data = undefined;
@@ -527,6 +526,10 @@ pub fn logFn(
comptime format: []const u8,
args: anytype,
) void {
if (coro.inCoro() == false) {
return std.log.defaultLog(message_level, scope, format, args);
}
const level_txt = comptime message_level.asText();
const prefix2 = if (scope == .default) ": " else "(" ++ @tagName(scope) ++ "): ";
const stderr = getStdErr().writer();
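
For reference, a standalone sketch of the wakeup path AsyncThread relies on above (illustration only, using the plain libxev Loop/Async API; the real wakerCallback rearms its completion and resumes queued waiter frames instead of printing):

const std = @import("std");
const xev = @import("xev");

fn onWake(_: ?*void, _: *xev.Loop, _: *xev.Completion, r: xev.Async.WaitError!void) xev.CallbackAction {
    _ = r catch {};
    std.debug.print("woken by notify()\n", .{});
    // AsyncThread returns .rearm so one registration keeps serving wakeups.
    return .disarm;
}

pub fn main() !void {
    var loop = try xev.Loop.init(.{});
    defer loop.deinit();

    var notifier = try xev.Async.init();
    defer notifier.deinit();

    // Park a completion on the notifier; on Linux xev.Async is backed by an
    // eventfd, which the patched epoll backend can now wait on indefinitely.
    var c: xev.Completion = undefined;
    notifier.wait(&loop, &c, void, null, &onWake);

    // Another thread wakes the loop, just like AsyncThread's notify() call above.
    const t = try std.Thread.spawn(.{}, struct {
        fn run(n: *xev.Async) void {
            std.time.sleep(10 * std.time.ns_per_ms); // demo-only ordering crutch
            n.notify() catch {};
        }
    }.run, .{&notifier});
    defer t.join();

    try loop.run(.until_done);
}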

View File

@@ -52,7 +52,7 @@ pub fn run(
args: anytype,
stack: anytype,
) !stdx.meta.FnSignature(func, @TypeOf(args)).ReturnPayloadT {
stdx.debug.assert(libcoro.inCoro() == false, "Not in a corouine", .{});
stdx.debug.assert(libcoro.inCoro() == false, "Not in a coroutine", .{});
const frame = try xasync(func, args, stack);
defer frame.deinit();
try runCoro(exec, frame);
@@ -88,20 +88,20 @@ pub fn sleep(exec: *Executor, ms: u64) !void {
return data.result;
}
fn waitForCompletionOutsideCoro(exec: *Executor, c: *xev.Completion) !void {
pub fn waitForCompletionOutsideCoro(exec: *Executor, c: *xev.Completion) !void {
@setCold(true);
while (c.state() != .dead) {
try exec.tick();
}
}
fn waitForCompletionInCoro(c: *xev.Completion) void {
pub fn waitForCompletionInCoro(c: *xev.Completion) void {
while (c.state() != .dead) {
libcoro.xsuspend();
}
}
fn waitForCompletion(exec: *Executor, c: *xev.Completion) !void {
pub fn waitForCompletion(exec: *Executor, c: *xev.Completion) !void {
if (libcoro.inCoro()) {
waitForCompletionInCoro(c);
} else {

View File

@@ -76,7 +76,7 @@ pub fn xresume(frame: anytype) void {
/// Must be called from within a coroutine (i.e. not the top level).
pub fn xsuspend() void {
xsuspendSafe() catch |e| {
log.err("{any}\n", .{e});
log.err("{any}", .{e});
@panic("xsuspend");
};
}
@@ -160,10 +160,10 @@ const Coro = struct {
fn runcoro(from: *base.Coro, this: *base.Coro) callconv(.C) noreturn {
const from_coro: *Coro = @fieldParentPtr("impl", from);
const this_coro: *Coro = @fieldParentPtr("impl", this);
log.debug("coro start {any}\n", .{this_coro.id});
log.debug("coro start {any}", .{this_coro.id});
@call(.auto, this_coro.func, .{});
this_coro.status = .Done;
log.debug("coro done {any}\n", .{this_coro.id});
log.debug("coro done {any}", .{this_coro.id});
thread_state.switchOut(from_coro);
// Never returns
@@ -258,7 +258,7 @@ const CoroT = struct {
fn wrapfn() void {
const storage = thread_state.currentStorage(InnerStorage);
storage.retval = @call(
.always_inline,
.auto,
Sig.Func.Value,
storage.args,
);
@@ -317,7 +317,7 @@ const ThreadState = struct {
/// Called from resume
fn switchIn(self: *ThreadState, target: Frame) void {
log.debug("coro resume {any} from {any}\n", .{ target.id, self.current().id });
log.debug("coro resume {any} from {any}", .{ target.id, self.current().id });
// Switch to target, setting this coro as the resumer.
self.switchTo(target, true);
@@ -332,7 +332,7 @@ const ThreadState = struct {
/// Called from suspend
fn switchOut(self: *ThreadState, target: Frame) void {
log.debug("coro suspend {any} to {any}\n", .{ self.current().id, target.id });
log.debug("coro suspend {any} to {any}", .{ self.current().id, target.id });
self.switchTo(target, false);
}

View File

@@ -21,7 +21,10 @@ zig_library(
deps = [
"//pjrt",
] + select({
"//runtimes:cpu.enabled": [":libpjrt_cpu"],
"//runtimes:cpu.enabled": [
":libpjrt_cpu",
"//async",
],
"//conditions:default": [":empty"],
}),
)

View File

@@ -1,6 +1,8 @@
const builtin = @import("builtin");
const pjrt = @import("pjrt");
const asynk = @import("async");
const c = @import("c");
const pjrt = @import("pjrt");
pub fn isEnabled() bool {
return @hasDecl(c, "ZML_RUNTIME_CPU");
@@ -16,5 +18,5 @@ pub fn load() !*const pjrt.Api {
.macos, .ios, .watchos => ".dylib",
else => ".so",
};
return try pjrt.Api.loadFrom("libpjrt_cpu" ++ ext);
return try asynk.callBlocking(pjrt.Api.loadFrom, .{"libpjrt_cpu" ++ ext});
}

View File

@@ -1,8 +1,8 @@
const builtin = @import("builtin");
const std = @import("std");
const asynk = @import("async");
const bazel_builtin = @import("bazel_builtin");
const builtin = @import("builtin");
const c = @import("c");
const pjrt = @import("pjrt");
const runfiles = @import("runfiles");
@@ -49,5 +49,5 @@ pub fn load() !*const pjrt.Api {
// See https://github.com/openxla/xla/issues/21428
try setupXlaGpuCudaDirFlag();
return try pjrt.Api.loadFrom("libpjrt_cuda.so");
return try asynk.callBlocking(pjrt.Api.loadFrom, .{"libpjrt_cuda.so"});
}

View File

@@ -1,11 +1,12 @@
const builtin = @import("builtin");
const asynk = @import("async");
const pjrt = @import("pjrt");
const c = @import("c");
const std = @import("std");
const runfiles = @import("runfiles");
const asynk = @import("async");
const bazel_builtin = @import("bazel_builtin");
const c = @import("c");
const libneuronxla_pyenv = @import("libneuronxla_pyenv");
const pjrt = @import("pjrt");
const runfiles = @import("runfiles");
pub fn isEnabled() bool {
return @hasDecl(c, "ZML_RUNTIME_NEURON");
@@ -136,5 +137,5 @@ pub fn load() !*const pjrt.Api {
setNeuronCCFlags();
try initialize();
return try pjrt.Api.loadFrom("libpjrt_neuron.so");
return try asynk.callBlocking(pjrt.Api.loadFrom, .{"libpjrt_neuron.so"});
}

View File

@@ -70,5 +70,5 @@ pub fn load() !*const pjrt.Api {
try setupRocmEnv();
return try pjrt.Api.loadFrom("libpjrt_rocm.so");
return try asynk.callBlocking(pjrt.Api.loadFrom, .{"libpjrt_rocm.so"});
}

View File

@@ -1,8 +1,9 @@
const builtin = @import("builtin");
const std = @import("std");
const asynk = @import("async");
const pjrt = @import("pjrt");
const c = @import("c");
const std = @import("std");
pub fn isEnabled() bool {
return @hasDecl(c, "ZML_RUNTIME_TPU");
@@ -36,5 +37,5 @@ pub fn load() !*const pjrt.Api {
return error.Unavailable;
}
return try pjrt.Api.loadFrom("libpjrt_tpu.so");
return try asynk.callBlocking(pjrt.Api.loadFrom, .{"libpjrt_tpu.so"});
}

View File

@@ -1,7 +1,100 @@
From 0d1c2f8258072148459d3114b9ccaf43c02e0958 Mon Sep 17 00:00:00 2001
From 12fb50bdd4b62c8e111f12eecb171257b6af6038 Mon Sep 17 00:00:00 2001
From: Mitchell Hashimoto <m@mitchellh.com>
Date: Mon, 4 Nov 2024 13:30:10 -0800
Subject: [PATCH 1/8] Revert "Fix potential ThreadPool UAF (#113)"
This reverts commit dbe22910a43e9e8ec9948d3cbd73d8488a074967, reversing
changes made to 43c7e4b3308f359e5b758db2d824d7c447f4ed3f.
---
src/ThreadPool.zig | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
index 9e0ff02..1d0123b 100644
--- a/src/ThreadPool.zig
+++ b/src/ThreadPool.zig
@@ -335,8 +335,12 @@ fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void {
fn join(self: *ThreadPool) void {
// Wait for the thread pool to be shutdown() then for all threads to enter a joinable state
- self.join_event.wait();
- const sync: Sync = @bitCast(self.sync.load(.monotonic));
+ var sync: Sync = @bitCast(self.sync.load(.monotonic));
+ if (!(sync.state == .shutdown and sync.spawned == 0)) {
+ self.join_event.wait();
+ sync = @bitCast(self.sync.load(.monotonic));
+ }
+
assert(sync.state == .shutdown);
assert(sync.spawned == 0);
From f6dea8bb85593cca6d4a54d92d4b05fe15b1eb41 Mon Sep 17 00:00:00 2001
From: Corentin Godeau <corentin.godeau@zml.ai>
Date: Tue, 10 Sep 2024 11:23:28 +0200
Subject: [PATCH 2/8] fix: avoid overflow in kqueue backend
When there are more than 256 events, the `events` slice could overflow
when processing completions.
---
src/backend/kqueue.zig | 3 +++
1 file changed, 3 insertions(+)
diff --git a/src/backend/kqueue.zig b/src/backend/kqueue.zig
index 834f916..456f64f 100644
--- a/src/backend/kqueue.zig
+++ b/src/backend/kqueue.zig
@@ -437,6 +437,9 @@ pub const Loop = struct {
// Only resubmit if we aren't already active (in the queue)
.rearm => if (!c_active) self.submissions.push(c),
}
+
+ // If we filled the events slice, we break to avoid overflow.
+ if (changes == events.len) break;
}
// Determine our next timeout based on the timers
From 890e2711ff9eb82bbceda2e6229b9c9a8d0b60da Mon Sep 17 00:00:00 2001
From: David Rubin <daviru007@icloud.com>
Date: Fri, 6 Dec 2024 20:43:52 -0800
Subject: [PATCH 3/8] remove race from threadpool
---
src/ThreadPool.zig | 8 +++-----
1 file changed, 3 insertions(+), 5 deletions(-)
diff --git a/src/ThreadPool.zig b/src/ThreadPool.zig
index 1d0123b..aec09d7 100644
--- a/src/ThreadPool.zig
+++ b/src/ThreadPool.zig
@@ -291,6 +291,7 @@ pub noinline fn shutdown(self: *ThreadPool) void {
// Wake up any threads sleeping on the idle_event.
// TODO: I/O polling notification here.
if (sync.idle > 0) self.idle_event.shutdown();
+ if (sync.spawned == 0) self.join_event.notify();
return;
});
}
@@ -335,11 +336,8 @@ fn unregister(noalias self: *ThreadPool, noalias maybe_thread: ?*Thread) void {
fn join(self: *ThreadPool) void {
// Wait for the thread pool to be shutdown() then for all threads to enter a joinable state
- var sync: Sync = @bitCast(self.sync.load(.monotonic));
- if (!(sync.state == .shutdown and sync.spawned == 0)) {
- self.join_event.wait();
- sync = @bitCast(self.sync.load(.monotonic));
- }
+ self.join_event.wait();
+ const sync: Sync = @bitCast(self.sync.load(.monotonic));
assert(sync.state == .shutdown);
assert(sync.spawned == 0);
From d82ee736a1548e6564a296315f33902ae09aa298 Mon Sep 17 00:00:00 2001
From: Steeve Morin <steeve@zml.ai>
Date: Tue, 19 Nov 2024 16:14:14 +0100
Subject: [PATCH 1/2] backend/epoll: implement eventfd wakeup notification
Subject: [PATCH 4/8] backend/epoll: implement eventfd wakeup notification
Tries to mimic what happens in backend/kqueue.
@@ -118,10 +211,10 @@ index ae4ec7d..f44d326 100644
task_result: Result = undefined,
From 38d4dbed71a732b0fc30c1181354ad9d53919402 Mon Sep 17 00:00:00 2001
From 320f1ae9ccc95ce814e246b35ca0f9fd2361f43e Mon Sep 17 00:00:00 2001
From: Corentin Godeau <corentin@zml.ai>
Date: Tue, 14 Jan 2025 14:43:54 +0000
Subject: [PATCH 2/2] backend/epoll: read the wakeup eventfd to avoid being
Subject: [PATCH 5/8] backend/epoll: read the wakeup eventfd to avoid being
awaken again
---
@@ -157,3 +250,192 @@ index f44d326..f84c687 100644
const c: *Completion = @ptrFromInt(@as(usize, @intCast(ev.data.ptr)));
From 258a64180c901b5e1f9a2c75f1ac77a758411450 Mon Sep 17 00:00:00 2001
From: Steeve Morin <steeve@zml.ai>
Date: Fri, 17 Jan 2025 20:47:42 +0000
Subject: [PATCH 6/8] epoll: use infinite timeout for epoll_wait
Since eventfd is now implemented.
---
src/backend/epoll.zig | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig
index f84c687..e3eee20 100644
--- a/src/backend/epoll.zig
+++ b/src/backend/epoll.zig
@@ -375,9 +375,7 @@ pub const Loop = struct {
const timeout: i32 = if (wait_rem == 0) 0 else timeout: {
// If we have a timer, we want to set the timeout to our next
// timer value. If we have no timer, we wait forever.
- // TODO: do not wait 100ms here, use an eventfd for our
- // thread pool to wake us up.
- const t = self.timers.peek() orelse break :timeout 100;
+ const t = self.timers.peek() orelse break :timeout -1;
// Determine the time in milliseconds.
const ms_now = @as(u64, @intCast(self.cached_now.tv_sec)) * std.time.ms_per_s +
From 9ff535f9b6fac454d454f2799b93863dc9aba7d4 Mon Sep 17 00:00:00 2001
From: Steeve Morin <steeve@zml.ai>
Date: Fri, 17 Jan 2025 20:48:27 +0000
Subject: [PATCH 7/8] epoll,kqueue: dispatch close in threadpool
Close might block, so dispatch it inside a threadpool.
---
src/backend/epoll.zig | 15 +++++++++++++--
src/backend/kqueue.zig | 6 +++++-
src/watcher/stream.zig | 16 ++++++++++++++++
3 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig
index e3eee20..0f4a2ac 100644
--- a/src/backend/epoll.zig
+++ b/src/backend/epoll.zig
@@ -700,6 +700,13 @@ pub const Loop = struct {
},
.close => |v| res: {
+ if (completion.flags.threadpool) {
+ if (self.thread_schedule(completion)) |_|
+ return
+ else |err|
+ break :res .{ .close = err };
+ }
+
posix.close(v.fd);
break :res .{ .close = {} };
},
@@ -909,7 +916,6 @@ pub const Completion = struct {
// This should never happen because we always do these synchronously
// or in another location.
.cancel,
- .close,
.noop,
.shutdown,
.timer,
@@ -1015,6 +1021,11 @@ pub const Completion = struct {
err,
};
},
+
+ .close => |*op| res: {
+ posix.close(op.fd);
+ break :res .{ .close = {} };
+ },
};
}
@@ -1277,7 +1288,7 @@ pub const AcceptError = posix.EpollCtlError || error{
Unknown,
};
-pub const CloseError = posix.EpollCtlError || error{
+pub const CloseError = posix.EpollCtlError || ThreadPoolError || error{
Unknown,
};
diff --git a/src/backend/kqueue.zig b/src/backend/kqueue.zig
index 456f64f..9914309 100644
--- a/src/backend/kqueue.zig
+++ b/src/backend/kqueue.zig
@@ -1102,7 +1102,6 @@ pub const Completion = struct {
fn perform(self: *Completion, ev_: ?*const Kevent) Result {
return switch (self.op) {
.cancel,
- .close,
.noop,
.timer,
.shutdown,
@@ -1232,6 +1231,11 @@ pub const Completion = struct {
break :res .{ .proc = 0 };
},
+
+ .close => |*op| res: {
+ posix.close(op.fd);
+ break :res .{ .close = {} };
+ },
};
}
diff --git a/src/watcher/stream.zig b/src/watcher/stream.zig
index 7f5df6f..bc95282 100644
--- a/src/watcher/stream.zig
+++ b/src/watcher/stream.zig
@@ -80,6 +80,22 @@ pub fn Closeable(comptime xev: type, comptime T: type, comptime options: Options
}).callback,
};
+ // If we're dup-ing, then we ask the backend to manage the fd.
+ switch (xev.backend) {
+ .io_uring,
+ .wasi_poll,
+ .iocp,
+ => {},
+
+ .epoll => {
+ c.flags.threadpool = true;
+ },
+
+ .kqueue => {
+ c.flags.threadpool = true;
+ },
+ }
+
loop.add(c);
}
};
From 003416b0b410e70daab0d55ddc48e8e443f4fd09 Mon Sep 17 00:00:00 2001
From: Steeve Morin <steeve@zml.ai>
Date: Fri, 17 Jan 2025 20:59:18 +0000
Subject: [PATCH 8/8] epoll: don't count immediate actions
If an immediate action is dispatched, the loop might block
on epoll_wait even though only one action was requested.
---
src/backend/epoll.zig | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig
index 0f4a2ac..fb7e59e 100644
--- a/src/backend/epoll.zig
+++ b/src/backend/epoll.zig
@@ -297,6 +297,7 @@ pub const Loop = struct {
// Submit all the submissions. We copy the submission queue so that
// any resubmits don't cause an infinite loop.
+ var wait_rem: usize = @intCast(wait);
var queued = self.submissions;
self.submissions = .{};
while (queued.pop()) |c| {
@@ -304,6 +305,19 @@ pub const Loop = struct {
// This usually means that we switched them to be deleted or
// something.
if (c.flags.state != .adding) continue;
+
+ // These operations happen synchronously. Ensure they are
+ // decremented from wait_rem.
+ switch (c.op) {
+ .cancel,
+ // should noop be counted?
+ // .noop,
+ .shutdown,
+ .timer,
+ => wait_rem -|= 1,
+ else => {},
+ }
+
self.start(c);
}
@@ -322,7 +336,6 @@ pub const Loop = struct {
// Wait and process events. We only do this if we have any active.
var events: [1024]linux.epoll_event = undefined;
- var wait_rem: usize = @intCast(wait);
while (self.active > 0 and (wait == 0 or wait_rem > 0)) {
self.update_now();
const now_timer: Operation.Timer = .{ .next = self.cached_now };

View File

@@ -0,0 +1,7 @@
module(
name = "libxev",
version = "20241208.1-db6a52b",
compatibility_level = 1,
)
bazel_dep(name = "rules_zig", version = "20240904.0-010da15")

View File

@@ -0,0 +1,13 @@
load("@rules_zig//zig:defs.bzl", "zig_library")
zig_library(
name = "xev",
srcs = glob([
"src/*.zig",
"src/backend/*.zig",
"src/linux/*.zig",
"src/watcher/*.zig",
]),
main = "main2.zig",
visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,7 @@
module(
name = "libxev",
version = "20241208.0-db6a52b",
compatibility_level = 1,
)
bazel_dep(name = "rules_zig", version = "20240904.0-010da15")

View File

@@ -0,0 +1,22 @@
const builtin = @import("builtin");
const root = @import("root");
const main = @import("src/main.zig");
pub const ThreadPool = main.ThreadPool;
pub const stream = main.stream;
pub const Options = struct {
linux_backend: main.Backend = .epoll,
};
pub const options: Options = if (@hasDecl(root, "xev_options")) root.xev_options else .{};
const default: main.Backend = switch (builtin.os.tag) {
.ios, .macos => .kqueue,
.linux => options.linux_backend,
.wasi => .wasi_poll,
.windows => .iocp,
else => @compileError("Unsupported OS"),
};
pub usingnamespace default.Api();

View File

@@ -0,0 +1,323 @@
From 8927108937e23ce2ad547a310dabd976ddcaa39d Mon Sep 17 00:00:00 2001
From: Steeve Morin <steeve@zml.ai>
Date: Tue, 19 Nov 2024 16:14:14 +0100
Subject: [PATCH 1/5] backend/epoll: implement eventfd wakeup notification
Tries to mimic what happens in backend/kqueue.
Closes #4
---
src/backend/epoll.zig | 42 ++++++++++++++++++++++++++++++++++++++++++
1 file changed, 42 insertions(+)
diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig
index ae4ec7d..f44d326 100644
--- a/src/backend/epoll.zig
+++ b/src/backend/epoll.zig
@@ -21,6 +21,12 @@ pub const Loop = struct {
fd: posix.fd_t,
+ /// The eventfd that this epoll queue always has a filter for. Writing
+ /// an empty message to this eventfd can be used to wake up the loop
+ /// at any time. Waking up the loop via this eventfd won't trigger any
+ /// particular completion, it just forces tick to cycle.
+ eventfd: xev.Async,
+
/// The number of active completions. This DOES NOT include completions that
/// are queued in the submissions queue.
active: usize = 0,
@@ -56,8 +62,12 @@ pub const Loop = struct {
} = .{},
pub fn init(options: xev.Options) !Loop {
+ var eventfd = try xev.Async.init();
+ errdefer eventfd.deinit();
+
var res: Loop = .{
.fd = try posix.epoll_create1(std.os.linux.EPOLL.CLOEXEC),
+ .eventfd = eventfd,
.thread_pool = options.thread_pool,
.thread_pool_completions = undefined,
.cached_now = undefined,
@@ -68,6 +78,7 @@ pub const Loop = struct {
pub fn deinit(self: *Loop) void {
posix.close(self.fd);
+ self.eventfd.deinit();
}
/// Run the event loop. See RunMode documentation for details on modes.
@@ -262,9 +273,26 @@ pub const Loop = struct {
// Initialize
if (!self.flags.init) {
self.flags.init = true;
+
if (self.thread_pool != null) {
self.thread_pool_completions.init();
}
+
+ var ev: linux.epoll_event = .{
+ .events = linux.EPOLL.IN | linux.EPOLL.RDHUP,
+ .data = .{ .ptr = 0 },
+ };
+ posix.epoll_ctl(
+ self.fd,
+ linux.EPOLL.CTL_ADD,
+ self.eventfd.fd,
+ &ev,
+ ) catch |err| {
+ // We reset initialization because we can't do anything
+ // safely unless we get this mach port registered!
+ self.flags.init = false;
+ return err;
+ };
}
// Submit all the submissions. We copy the submission queue so that
@@ -369,6 +397,10 @@ pub const Loop = struct {
// Process all our events and invoke their completion handlers
for (events[0..n]) |ev| {
+ // Zero data values are internal events that we do nothing
+ // on such as the eventfd wakeup.
+ if (ev.data.ptr == 0) continue;
+
const c: *Completion = @ptrFromInt(@as(usize, @intCast(ev.data.ptr)));
// We get the fd and mark this as in progress we can properly
@@ -415,6 +447,7 @@ pub const Loop = struct {
const pool = self.thread_pool orelse return error.ThreadPoolRequired;
// Setup our completion state so that thread_perform can do stuff
+ c.task_loop = self;
c.task_completions = &self.thread_pool_completions;
c.task = .{ .callback = Loop.thread_perform };
@@ -436,6 +469,14 @@ pub const Loop = struct {
// Add to our completion queue
c.task_completions.push(c);
+
+ // Wake up our main loop
+ c.task_loop.wakeup() catch {};
+ }
+
+ /// Sends an empty message to this loop's eventfd so that it wakes up.
+ fn wakeup(self: *Loop) !void {
+ try self.eventfd.notify();
}
fn start(self: *Loop, completion: *Completion) void {
@@ -800,6 +841,7 @@ pub const Completion = struct {
/// reliable way to get access to the loop and shouldn't be used
/// except internally.
task: ThreadPool.Task = undefined,
+ task_loop: *Loop = undefined,
task_completions: *Loop.TaskCompletionQueue = undefined,
task_result: Result = undefined,
From bf6c19eb1c06d1fb1b3cb5a66ea2ce8f743da5ba Mon Sep 17 00:00:00 2001
From: Corentin Godeau <corentin@zml.ai>
Date: Tue, 14 Jan 2025 14:43:54 +0000
Subject: [PATCH 2/5] backend/epoll: read the wakeup eventfd to avoid being
awaken again
---
src/backend/epoll.zig | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig
index f44d326..f84c687 100644
--- a/src/backend/epoll.zig
+++ b/src/backend/epoll.zig
@@ -280,7 +280,7 @@ pub const Loop = struct {
var ev: linux.epoll_event = .{
.events = linux.EPOLL.IN | linux.EPOLL.RDHUP,
- .data = .{ .ptr = 0 },
+ .data = .{ .fd = self.eventfd.fd },
};
posix.epoll_ctl(
self.fd,
@@ -397,9 +397,12 @@ pub const Loop = struct {
// Process all our events and invoke their completion handlers
for (events[0..n]) |ev| {
- // Zero data values are internal events that we do nothing
- // on such as the eventfd wakeup.
- if (ev.data.ptr == 0) continue;
+ // Handle wakeup eventfd
+ if (ev.data.fd == self.eventfd.fd) {
+ var buffer: u64 = undefined;
+ _ = posix.read(self.eventfd.fd, std.mem.asBytes(&buffer)) catch {};
+ continue;
+ }
const c: *Completion = @ptrFromInt(@as(usize, @intCast(ev.data.ptr)));
From c0ae753a9874bf78a48bf25b4362b6cca4de1da1 Mon Sep 17 00:00:00 2001
From: Steeve Morin <steeve@zml.ai>
Date: Fri, 17 Jan 2025 20:47:42 +0000
Subject: [PATCH 3/5] epoll: use infinite timeout for epoll_wait
Since eventfd is now implemented.
---
src/backend/epoll.zig | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig
index f84c687..e3eee20 100644
--- a/src/backend/epoll.zig
+++ b/src/backend/epoll.zig
@@ -375,9 +375,7 @@ pub const Loop = struct {
const timeout: i32 = if (wait_rem == 0) 0 else timeout: {
// If we have a timer, we want to set the timeout to our next
// timer value. If we have no timer, we wait forever.
- // TODO: do not wait 100ms here, use an eventfd for our
- // thread pool to wake us up.
- const t = self.timers.peek() orelse break :timeout 100;
+ const t = self.timers.peek() orelse break :timeout -1;
// Determine the time in milliseconds.
const ms_now = @as(u64, @intCast(self.cached_now.tv_sec)) * std.time.ms_per_s +
From 84e75a10ec1c10ea2a13edfcd8a31e582458fc88 Mon Sep 17 00:00:00 2001
From: Steeve Morin <steeve@zml.ai>
Date: Fri, 17 Jan 2025 20:48:27 +0000
Subject: [PATCH 4/5] epoll: dispatch close in threadpool
Close might block, so dispatch it inside a threadpool.
---
src/backend/epoll.zig | 15 +++++++++++++--
src/watcher/stream.zig | 16 ++++++++++++++++
2 files changed, 29 insertions(+), 2 deletions(-)
diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig
index e3eee20..0f4a2ac 100644
--- a/src/backend/epoll.zig
+++ b/src/backend/epoll.zig
@@ -700,6 +700,13 @@ pub const Loop = struct {
},
.close => |v| res: {
+ if (completion.flags.threadpool) {
+ if (self.thread_schedule(completion)) |_|
+ return
+ else |err|
+ break :res .{ .close = err };
+ }
+
posix.close(v.fd);
break :res .{ .close = {} };
},
@@ -909,7 +916,6 @@ pub const Completion = struct {
// This should never happen because we always do these synchronously
// or in another location.
.cancel,
- .close,
.noop,
.shutdown,
.timer,
@@ -1015,6 +1021,11 @@ pub const Completion = struct {
err,
};
},
+
+ .close => |*op| res: {
+ posix.close(op.fd);
+ break :res .{ .close = {} };
+ },
};
}
@@ -1277,7 +1288,7 @@ pub const AcceptError = posix.EpollCtlError || error{
Unknown,
};
-pub const CloseError = posix.EpollCtlError || error{
+pub const CloseError = posix.EpollCtlError || ThreadPoolError || error{
Unknown,
};
diff --git a/src/watcher/stream.zig b/src/watcher/stream.zig
index 7f5df6f..bc95282 100644
--- a/src/watcher/stream.zig
+++ b/src/watcher/stream.zig
@@ -80,6 +80,22 @@ pub fn Closeable(comptime xev: type, comptime T: type, comptime options: Options
}).callback,
};
+ // If we're dup-ing, then we ask the backend to manage the fd.
+ switch (xev.backend) {
+ .io_uring,
+ .wasi_poll,
+ .iocp,
+ => {},
+
+ .epoll => {
+ c.flags.threadpool = true;
+ },
+
+ .kqueue => {
+ c.flags.threadpool = true;
+ },
+ }
+
loop.add(c);
}
};
From 5bc3ad9f23a03ff3af7a4ec33c8bfa4f3cc6bae9 Mon Sep 17 00:00:00 2001
From: Steeve Morin <steeve@zml.ai>
Date: Fri, 17 Jan 2025 20:59:18 +0000
Subject: [PATCH 5/5] epoll: don't count immediate actions
If an immediate action is dispatched, the loop might block
on epoll_wait even though only one action was requested.
---
src/backend/epoll.zig | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig
index 0f4a2ac..fb7e59e 100644
--- a/src/backend/epoll.zig
+++ b/src/backend/epoll.zig
@@ -297,6 +297,7 @@ pub const Loop = struct {
// Submit all the submissions. We copy the submission queue so that
// any resubmits don't cause an infinite loop.
+ var wait_rem: usize = @intCast(wait);
var queued = self.submissions;
self.submissions = .{};
while (queued.pop()) |c| {
@@ -304,6 +305,19 @@ pub const Loop = struct {
// This usually means that we switched them to be deleted or
// something.
if (c.flags.state != .adding) continue;
+
+ // These operations happen synchronously. Ensure they are
+ // decremented from wait_rem.
+ switch (c.op) {
+ .cancel,
+ // should noop be counted?
+ // .noop,
+ .shutdown,
+ .timer,
+ => wait_rem -|= 1,
+ else => {},
+ }
+
self.start(c);
}
@@ -322,7 +336,6 @@ pub const Loop = struct {
// Wait and process events. We only do this if we have any active.
var events: [1024]linux.epoll_event = undefined;
- var wait_rem: usize = @intCast(wait);
while (self.active > 0 and (wait == 0 or wait_rem > 0)) {
self.update_now();
const now_timer: Operation.Timer = .{ .next = self.cached_now };

View File

@@ -0,0 +1,14 @@
{
"strip_prefix": "libxev-db6a52bafadf00360e675fefa7926e8e6c0e9931",
"url": "https://github.com/zml/libxev/archive/db6a52bafadf00360e675fefa7926e8e6c0e9931.tar.gz",
"integrity": "sha256-4GT5wkfkZnIjNv20yDiWEzHAhbIiwHHJfS7A4u/LoNQ=",
"overlay": {
"MODULE.bazel": "",
"BUILD.bazel": "",
"main2.zig": ""
},
"patches": {
"128.patch": ""
},
"patch_strip": 1
}

View File

@@ -116,7 +116,9 @@ pub const BufferStore = struct {
}
pub fn deinit(self: BufferStore) void {
for (self.files) |*file| file.deinit();
for (self.files) |*file| {
file.deinit();
}
self.arena.deinit();
}