diff --git a/MODULE.bazel b/MODULE.bazel index 29489a0..67a9bc8 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -9,7 +9,7 @@ bazel_dep(name = "aspect_bazel_lib", version = "2.11.0") bazel_dep(name = "aspect_rules_py", version = "1.3.1") bazel_dep(name = "bazel_skylib", version = "1.7.1") bazel_dep(name = "hermetic_cc_toolchain", version = "3.1.1") -bazel_dep(name = "libxev", version = "20250124.0-31eed4e") +bazel_dep(name = "libxev", version = "20250222.0-07bcffa") bazel_dep(name = "llvm-raw", version = "20250217.0-0e779ad") bazel_dep(name = "patchelf", version = "0.18.0") bazel_dep(name = "pcre2", version = "10.43") diff --git a/async/async.zig b/async/async.zig index 79026da..588b9e3 100644 --- a/async/async.zig +++ b/async/async.zig @@ -1,12 +1,14 @@ const std = @import("std"); const stdx = @import("stdx"); -const xev = @import("xev"); +const xev = @import("xev").Dynamic; const coro = @import("coro.zig"); const executor = @import("executor.zig"); const channel_mod = @import("channel.zig"); const aio = @import("asyncio.zig"); const stack = @import("stack.zig"); +const XevThreadPool = @import("xev").ThreadPool; + pub const Condition = struct { inner: executor.Condition, @@ -71,13 +73,13 @@ pub fn callBlocking(comptime func: anytype, args: anytype) stdx.meta.FnSignature const TaskT = struct { const Self = @This(); - _task: xev.ThreadPool.Task = .{ .callback = &Self.run }, + _task: XevThreadPool.Task = .{ .callback = &Self.run }, event: threading.ResetEventSingle = .{}, args: Signature.ArgsT, result: Signature.ReturnT = undefined, - pub fn run(task_: *xev.ThreadPool.Task) void { + pub fn run(task_: *XevThreadPool.Task) void { const task: *Self = @alignCast(@fieldParentPtr("_task", task_)); task.result = @call(.auto, func, task.args); task.event.set(); @@ -87,7 +89,7 @@ pub fn callBlocking(comptime func: anytype, args: anytype) stdx.meta.FnSignature var newtask: TaskT = .{ .args = args, }; - AsyncThread.current.thread_pool.schedule(xev.ThreadPool.Batch.from(&newtask._task)); + AsyncThread.current.thread_pool.schedule(XevThreadPool.Batch.from(&newtask._task)); newtask.event.wait(); return newtask.result; @@ -159,7 +161,7 @@ pub const AsyncThread = struct { executor: *aio.Executor, stack_allocator: *stack.StackAllocator, loop: *xev.Loop, - thread_pool: *xev.ThreadPool, + thread_pool: *XevThreadPool, async_notifier: *xev.Async, waiters_queue: *threading.WaiterQueue, @@ -175,7 +177,7 @@ pub const AsyncThread = struct { } pub fn main(allocator: std.mem.Allocator, comptime mainFunc: fn () anyerror!void) !void { - var thread_pool = xev.ThreadPool.init(.{}); + var thread_pool = XevThreadPool.init(.{}); defer { thread_pool.shutdown(); thread_pool.deinit(); @@ -245,18 +247,22 @@ pub const File = struct { getEndPos, ); + _handle: std.fs.File.Handle, inner: aio.File, fn asFile(self: File) std.fs.File { - return .{ .handle = self.inner.file.fd }; + return .{ .handle = self._handle }; } pub fn handle(self: File) std.fs.File.Handle { - return self.inner.file.fd; + return self._handle; } pub fn init(file_: std.fs.File) !File { - return .{ .inner = aio.File.init(AsyncThread.current.executor, try xev.File.init(file_)) }; + return .{ + ._handle = file_.handle, + .inner = aio.File.init(AsyncThread.current.executor, try xev.File.init(file_)), + }; } pub fn open(path: []const u8, flags: std.fs.File.OpenFlags) !File { diff --git a/async/asyncio.zig b/async/asyncio.zig index ab0fa86..c6d2535 100644 --- a/async/asyncio.zig +++ b/async/asyncio.zig @@ -1,6 +1,6 @@ const std = @import("std"); const stdx = @import("stdx"); -const xev = @import("xev"); +const xev = @import("xev").Dynamic; const libcoro = @import("coro.zig"); const CoroExecutor = @import("executor.zig").Executor; @@ -115,11 +115,10 @@ pub const TCP = struct { exec: *Executor, tcp: xev.TCP, - pub usingnamespace Stream(Self, xev.TCP, .{ - .close = true, - .read = .recv, - .write = .send, - }); + pub usingnamespace Closeable(Self, xev.TCP); + pub usingnamespace Pollable(Self, xev.TCP); + pub usingnamespace Readable(Self, xev.TCP); + pub usingnamespace Writeable(Self, xev.TCP); pub fn init(exec: *Executor, tcp: xev.TCP) Self { return .{ .exec = exec, .tcp = tcp }; @@ -214,14 +213,6 @@ pub const TCP = struct { } }; -fn Stream(comptime T: type, comptime StreamT: type, comptime options: xev.stream.Options) type { - return struct { - pub usingnamespace if (options.close) Closeable(T, StreamT) else struct {}; - pub usingnamespace if (options.read != .none) Readable(T, StreamT) else struct {}; - pub usingnamespace if (options.write != .none) Writeable(T, StreamT) else struct {}; - }; -} - fn Closeable(comptime T: type, comptime StreamT: type) type { return struct { const Self = T; @@ -262,6 +253,46 @@ fn Closeable(comptime T: type, comptime StreamT: type) type { }; } +fn Pollable(comptime T: type, comptime StreamT: type) type { + return struct { + const Self = T; + const PollResult = xev.PollError!void; + pub fn poll(self: Self) !void { + const ResultT = PollResult; + const Data = struct { + result: ResultT = undefined, + frame: ?Frame = null, + + fn callback( + userdata: ?*@This(), + l: *xev.Loop, + c: *xev.Completion, + s: StreamT, + result: ResultT, + ) xev.CallbackAction { + _ = l; + _ = c; + _ = s; + const data = userdata.?; + data.result = result; + if (data.frame != null) libcoro.xresume(data.frame.?); + return .disarm; + } + }; + + var data: Data = .{ .frame = libcoro.xframe() }; + + const loop = self.exec.loop; + var c: xev.Completion = .{}; + self.stream().poll(loop, &c, Data, &data, &Data.callback); + + try waitForCompletion(self.exec, &c); + + return data.result; + } + }; +} + fn Readable(comptime T: type, comptime StreamT: type) type { return struct { const Self = T; @@ -351,12 +382,10 @@ pub const File = struct { exec: *Executor, file: xev.File, - pub usingnamespace Stream(Self, xev.File, .{ - .close = true, - .read = .read, - .write = .write, - .threadpool = true, - }); + pub usingnamespace Closeable(Self, xev.File); + pub usingnamespace Pollable(Self, xev.File); + pub usingnamespace Readable(Self, xev.File); + pub usingnamespace Writeable(Self, xev.File); pub fn init(exec: *Executor, file: xev.File) Self { return .{ .exec = exec, .file = file }; @@ -497,11 +526,10 @@ pub const UDP = struct { exec: *Executor, udp: xev.UDP, - pub usingnamespace Stream(Self, xev.UDP, .{ - .close = true, - .read = .none, - .write = .none, - }); + pub usingnamespace Closeable(Self, xev.TCP); + pub usingnamespace Pollable(Self, xev.TCP); + pub usingnamespace Readable(Self, xev.TCP); + pub usingnamespace Writeable(Self, xev.TCP); pub fn init(exec: *Executor, udp: xev.UDP) Self { return .{ .exec = exec, .udp = udp }; diff --git a/third_party/modules/libxev/20250222.0-07bcffa/MODULE.bazel b/third_party/modules/libxev/20250222.0-07bcffa/MODULE.bazel new file mode 100644 index 0000000..424792e --- /dev/null +++ b/third_party/modules/libxev/20250222.0-07bcffa/MODULE.bazel @@ -0,0 +1,7 @@ +module( + name = "libxev", + version = "20250222.0-07bcffa", + compatibility_level = 1, +) + +bazel_dep(name = "rules_zig", version = "20240904.0-010da15") diff --git a/third_party/modules/libxev/20250222.0-07bcffa/overlay/BUILD.bazel b/third_party/modules/libxev/20250222.0-07bcffa/overlay/BUILD.bazel new file mode 100644 index 0000000..872ef7f --- /dev/null +++ b/third_party/modules/libxev/20250222.0-07bcffa/overlay/BUILD.bazel @@ -0,0 +1,13 @@ +load("@rules_zig//zig:defs.bzl", "zig_library") + +zig_library( + name = "xev", + srcs = glob([ + "src/*.zig", + "src/backend/*.zig", + "src/linux/*.zig", + "src/watcher/*.zig", + ]), + main = "src/main.zig", + visibility = ["//visibility:public"], +) diff --git a/third_party/modules/libxev/20250222.0-07bcffa/overlay/MODULE.bazel b/third_party/modules/libxev/20250222.0-07bcffa/overlay/MODULE.bazel new file mode 100644 index 0000000..424792e --- /dev/null +++ b/third_party/modules/libxev/20250222.0-07bcffa/overlay/MODULE.bazel @@ -0,0 +1,7 @@ +module( + name = "libxev", + version = "20250222.0-07bcffa", + compatibility_level = 1, +) + +bazel_dep(name = "rules_zig", version = "20240904.0-010da15") diff --git a/third_party/modules/libxev/20250222.0-07bcffa/source.json b/third_party/modules/libxev/20250222.0-07bcffa/source.json new file mode 100644 index 0000000..1483ad3 --- /dev/null +++ b/third_party/modules/libxev/20250222.0-07bcffa/source.json @@ -0,0 +1,9 @@ +{ + "strip_prefix": "libxev-07bcffa0f63054152a9883baa42bce5faad297e6", + "url": "https://github.com/mitchellh/libxev/archive/07bcffa0f63054152a9883baa42bce5faad297e6.tar.gz", + "integrity": "sha256-H80IoKcTeJFwpL5i4S9zyMuyWaQCK173ppoAO/CYXjc=", + "overlay": { + "MODULE.bazel": "", + "BUILD.bazel": "" + } +} diff --git a/third_party/modules/libxev/metadata.json b/third_party/modules/libxev/metadata.json index 5e355c1..90c1a0e 100644 --- a/third_party/modules/libxev/metadata.json +++ b/third_party/modules/libxev/metadata.json @@ -18,6 +18,7 @@ "20241208.1-db6a52b", "20241208.2-db6a52b", "20250124.0-31eed4e", + "20250222.0-07bcffa" ], "yanked_versions": {} }