160 lines
5.7 KiB
Diff
160 lines
5.7 KiB
Diff
From 0d1c2f8258072148459d3114b9ccaf43c02e0958 Mon Sep 17 00:00:00 2001
|
|
From: Steeve Morin <steeve@zml.ai>
|
|
Date: Tue, 19 Nov 2024 16:14:14 +0100
|
|
Subject: [PATCH 1/2] backend/epoll: implement eventfd wakeup notification
|
|
|
|
Tries to mimic what happens in backend/kqueue.
|
|
|
|
Closes #4
|
|
---
|
|
src/backend/epoll.zig | 42 ++++++++++++++++++++++++++++++++++++++++++
|
|
1 file changed, 42 insertions(+)
|
|
|
|
diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig
|
|
index ae4ec7d..f44d326 100644
|
|
--- a/src/backend/epoll.zig
|
|
+++ b/src/backend/epoll.zig
|
|
@@ -21,6 +21,12 @@ pub const Loop = struct {
|
|
|
|
fd: posix.fd_t,
|
|
|
|
+ /// The eventfd that this epoll queue always has a filter for. Writing
|
|
+ /// an empty message to this eventfd can be used to wake up the loop
|
|
+ /// at any time. Waking up the loop via this eventfd won't trigger any
|
|
+ /// particular completion, it just forces tick to cycle.
|
|
+ eventfd: xev.Async,
|
|
+
|
|
/// The number of active completions. This DOES NOT include completions that
|
|
/// are queued in the submissions queue.
|
|
active: usize = 0,
|
|
@@ -56,8 +62,12 @@ pub const Loop = struct {
|
|
} = .{},
|
|
|
|
pub fn init(options: xev.Options) !Loop {
|
|
+ var eventfd = try xev.Async.init();
|
|
+ errdefer eventfd.deinit();
|
|
+
|
|
var res: Loop = .{
|
|
.fd = try posix.epoll_create1(std.os.linux.EPOLL.CLOEXEC),
|
|
+ .eventfd = eventfd,
|
|
.thread_pool = options.thread_pool,
|
|
.thread_pool_completions = undefined,
|
|
.cached_now = undefined,
|
|
@@ -68,6 +78,7 @@ pub const Loop = struct {
|
|
|
|
pub fn deinit(self: *Loop) void {
|
|
posix.close(self.fd);
|
|
+ self.eventfd.deinit();
|
|
}
|
|
|
|
/// Run the event loop. See RunMode documentation for details on modes.
|
|
@@ -262,9 +273,26 @@ pub const Loop = struct {
|
|
// Initialize
|
|
if (!self.flags.init) {
|
|
self.flags.init = true;
|
|
+
|
|
if (self.thread_pool != null) {
|
|
self.thread_pool_completions.init();
|
|
}
|
|
+
|
|
+ var ev: linux.epoll_event = .{
|
|
+ .events = linux.EPOLL.IN | linux.EPOLL.RDHUP,
|
|
+ .data = .{ .ptr = 0 },
|
|
+ };
|
|
+ posix.epoll_ctl(
|
|
+ self.fd,
|
|
+ linux.EPOLL.CTL_ADD,
|
|
+ self.eventfd.fd,
|
|
+ &ev,
|
|
+ ) catch |err| {
|
|
+ // We reset initialization because we can't do anything
|
|
+ // safely unless we get this mach port registered!
|
|
+ self.flags.init = false;
|
|
+ return err;
|
|
+ };
|
|
}
|
|
|
|
// Submit all the submissions. We copy the submission queue so that
|
|
@@ -369,6 +397,10 @@ pub const Loop = struct {
|
|
|
|
// Process all our events and invoke their completion handlers
|
|
for (events[0..n]) |ev| {
|
|
+ // Zero data values are internal events that we do nothing
|
|
+ // on such as the eventfd wakeup.
|
|
+ if (ev.data.ptr == 0) continue;
|
|
+
|
|
const c: *Completion = @ptrFromInt(@as(usize, @intCast(ev.data.ptr)));
|
|
|
|
// We get the fd and mark this as in progress we can properly
|
|
@@ -415,6 +447,7 @@ pub const Loop = struct {
|
|
const pool = self.thread_pool orelse return error.ThreadPoolRequired;
|
|
|
|
// Setup our completion state so that thread_perform can do stuff
|
|
+ c.task_loop = self;
|
|
c.task_completions = &self.thread_pool_completions;
|
|
c.task = .{ .callback = Loop.thread_perform };
|
|
|
|
@@ -436,6 +469,14 @@ pub const Loop = struct {
|
|
|
|
// Add to our completion queue
|
|
c.task_completions.push(c);
|
|
+
|
|
+ // Wake up our main loop
|
|
+ c.task_loop.wakeup() catch {};
|
|
+ }
|
|
+
|
|
+ /// Sends an empty message to this loop's eventfd so that it wakes up.
|
|
+ fn wakeup(self: *Loop) !void {
|
|
+ try self.eventfd.notify();
|
|
}
|
|
|
|
fn start(self: *Loop, completion: *Completion) void {
|
|
@@ -800,6 +841,7 @@ pub const Completion = struct {
|
|
/// reliable way to get access to the loop and shouldn't be used
|
|
/// except internally.
|
|
task: ThreadPool.Task = undefined,
|
|
+ task_loop: *Loop = undefined,
|
|
task_completions: *Loop.TaskCompletionQueue = undefined,
|
|
task_result: Result = undefined,
|
|
|
|
|
|
From 38d4dbed71a732b0fc30c1181354ad9d53919402 Mon Sep 17 00:00:00 2001
|
|
From: Corentin Godeau <corentin@zml.ai>
|
|
Date: Tue, 14 Jan 2025 14:43:54 +0000
|
|
Subject: [PATCH 2/2] backend/epoll: read the wakeup eventfd to avoid being
|
|
awaken again
|
|
|
|
---
|
|
src/backend/epoll.zig | 11 +++++++----
|
|
1 file changed, 7 insertions(+), 4 deletions(-)
|
|
|
|
diff --git a/src/backend/epoll.zig b/src/backend/epoll.zig
|
|
index f44d326..f84c687 100644
|
|
--- a/src/backend/epoll.zig
|
|
+++ b/src/backend/epoll.zig
|
|
@@ -280,7 +280,7 @@ pub const Loop = struct {
|
|
|
|
var ev: linux.epoll_event = .{
|
|
.events = linux.EPOLL.IN | linux.EPOLL.RDHUP,
|
|
- .data = .{ .ptr = 0 },
|
|
+ .data = .{ .fd = self.eventfd.fd },
|
|
};
|
|
posix.epoll_ctl(
|
|
self.fd,
|
|
@@ -397,9 +397,12 @@ pub const Loop = struct {
|
|
|
|
// Process all our events and invoke their completion handlers
|
|
for (events[0..n]) |ev| {
|
|
- // Zero data values are internal events that we do nothing
|
|
- // on such as the eventfd wakeup.
|
|
- if (ev.data.ptr == 0) continue;
|
|
+ // Handle wakeup eventfd
|
|
+ if (ev.data.fd == self.eventfd.fd) {
|
|
+ var buffer: u64 = undefined;
|
|
+ _ = posix.read(self.eventfd.fd, std.mem.asBytes(&buffer)) catch {};
|
|
+ continue;
|
|
+ }
|
|
|
|
const c: *Completion = @ptrFromInt(@as(usize, @intCast(ev.data.ptr)));
|
|
|