workspace: bump libxev to 20250222.0-07bcffa and introduce Dynamic to select between IO Uring and Epoll on Linux based on availability

This commit is contained in:
Tarry Singh 2024-06-18 14:26:44 +00:00
parent 18eb0e5a7b
commit 5bcaf374c8
8 changed files with 106 additions and 35 deletions

View File

@ -9,7 +9,7 @@ bazel_dep(name = "aspect_bazel_lib", version = "2.11.0")
bazel_dep(name = "aspect_rules_py", version = "1.3.1") bazel_dep(name = "aspect_rules_py", version = "1.3.1")
bazel_dep(name = "bazel_skylib", version = "1.7.1") bazel_dep(name = "bazel_skylib", version = "1.7.1")
bazel_dep(name = "hermetic_cc_toolchain", version = "3.1.1") bazel_dep(name = "hermetic_cc_toolchain", version = "3.1.1")
bazel_dep(name = "libxev", version = "20250124.0-31eed4e") bazel_dep(name = "libxev", version = "20250222.0-07bcffa")
bazel_dep(name = "llvm-raw", version = "20250217.0-0e779ad") bazel_dep(name = "llvm-raw", version = "20250217.0-0e779ad")
bazel_dep(name = "patchelf", version = "0.18.0") bazel_dep(name = "patchelf", version = "0.18.0")
bazel_dep(name = "pcre2", version = "10.43") bazel_dep(name = "pcre2", version = "10.43")

View File

@ -1,12 +1,14 @@
const std = @import("std"); const std = @import("std");
const stdx = @import("stdx"); const stdx = @import("stdx");
const xev = @import("xev"); const xev = @import("xev").Dynamic;
const coro = @import("coro.zig"); const coro = @import("coro.zig");
const executor = @import("executor.zig"); const executor = @import("executor.zig");
const channel_mod = @import("channel.zig"); const channel_mod = @import("channel.zig");
const aio = @import("asyncio.zig"); const aio = @import("asyncio.zig");
const stack = @import("stack.zig"); const stack = @import("stack.zig");
const XevThreadPool = @import("xev").ThreadPool;
pub const Condition = struct { pub const Condition = struct {
inner: executor.Condition, inner: executor.Condition,
@ -71,13 +73,13 @@ pub fn callBlocking(comptime func: anytype, args: anytype) stdx.meta.FnSignature
const TaskT = struct { const TaskT = struct {
const Self = @This(); const Self = @This();
_task: xev.ThreadPool.Task = .{ .callback = &Self.run }, _task: XevThreadPool.Task = .{ .callback = &Self.run },
event: threading.ResetEventSingle = .{}, event: threading.ResetEventSingle = .{},
args: Signature.ArgsT, args: Signature.ArgsT,
result: Signature.ReturnT = undefined, result: Signature.ReturnT = undefined,
pub fn run(task_: *xev.ThreadPool.Task) void { pub fn run(task_: *XevThreadPool.Task) void {
const task: *Self = @alignCast(@fieldParentPtr("_task", task_)); const task: *Self = @alignCast(@fieldParentPtr("_task", task_));
task.result = @call(.auto, func, task.args); task.result = @call(.auto, func, task.args);
task.event.set(); task.event.set();
@ -87,7 +89,7 @@ pub fn callBlocking(comptime func: anytype, args: anytype) stdx.meta.FnSignature
var newtask: TaskT = .{ var newtask: TaskT = .{
.args = args, .args = args,
}; };
AsyncThread.current.thread_pool.schedule(xev.ThreadPool.Batch.from(&newtask._task)); AsyncThread.current.thread_pool.schedule(XevThreadPool.Batch.from(&newtask._task));
newtask.event.wait(); newtask.event.wait();
return newtask.result; return newtask.result;
@ -159,7 +161,7 @@ pub const AsyncThread = struct {
executor: *aio.Executor, executor: *aio.Executor,
stack_allocator: *stack.StackAllocator, stack_allocator: *stack.StackAllocator,
loop: *xev.Loop, loop: *xev.Loop,
thread_pool: *xev.ThreadPool, thread_pool: *XevThreadPool,
async_notifier: *xev.Async, async_notifier: *xev.Async,
waiters_queue: *threading.WaiterQueue, waiters_queue: *threading.WaiterQueue,
@ -175,7 +177,7 @@ pub const AsyncThread = struct {
} }
pub fn main(allocator: std.mem.Allocator, comptime mainFunc: fn () anyerror!void) !void { pub fn main(allocator: std.mem.Allocator, comptime mainFunc: fn () anyerror!void) !void {
var thread_pool = xev.ThreadPool.init(.{}); var thread_pool = XevThreadPool.init(.{});
defer { defer {
thread_pool.shutdown(); thread_pool.shutdown();
thread_pool.deinit(); thread_pool.deinit();
@ -245,18 +247,22 @@ pub const File = struct {
getEndPos, getEndPos,
); );
_handle: std.fs.File.Handle,
inner: aio.File, inner: aio.File,
fn asFile(self: File) std.fs.File { fn asFile(self: File) std.fs.File {
return .{ .handle = self.inner.file.fd }; return .{ .handle = self._handle };
} }
pub fn handle(self: File) std.fs.File.Handle { pub fn handle(self: File) std.fs.File.Handle {
return self.inner.file.fd; return self._handle;
} }
pub fn init(file_: std.fs.File) !File { pub fn init(file_: std.fs.File) !File {
return .{ .inner = aio.File.init(AsyncThread.current.executor, try xev.File.init(file_)) }; return .{
._handle = file_.handle,
.inner = aio.File.init(AsyncThread.current.executor, try xev.File.init(file_)),
};
} }
pub fn open(path: []const u8, flags: std.fs.File.OpenFlags) !File { pub fn open(path: []const u8, flags: std.fs.File.OpenFlags) !File {

View File

@ -1,6 +1,6 @@
const std = @import("std"); const std = @import("std");
const stdx = @import("stdx"); const stdx = @import("stdx");
const xev = @import("xev"); const xev = @import("xev").Dynamic;
const libcoro = @import("coro.zig"); const libcoro = @import("coro.zig");
const CoroExecutor = @import("executor.zig").Executor; const CoroExecutor = @import("executor.zig").Executor;
@ -115,11 +115,10 @@ pub const TCP = struct {
exec: *Executor, exec: *Executor,
tcp: xev.TCP, tcp: xev.TCP,
pub usingnamespace Stream(Self, xev.TCP, .{ pub usingnamespace Closeable(Self, xev.TCP);
.close = true, pub usingnamespace Pollable(Self, xev.TCP);
.read = .recv, pub usingnamespace Readable(Self, xev.TCP);
.write = .send, pub usingnamespace Writeable(Self, xev.TCP);
});
pub fn init(exec: *Executor, tcp: xev.TCP) Self { pub fn init(exec: *Executor, tcp: xev.TCP) Self {
return .{ .exec = exec, .tcp = tcp }; return .{ .exec = exec, .tcp = tcp };
@ -214,14 +213,6 @@ pub const TCP = struct {
} }
}; };
fn Stream(comptime T: type, comptime StreamT: type, comptime options: xev.stream.Options) type {
return struct {
pub usingnamespace if (options.close) Closeable(T, StreamT) else struct {};
pub usingnamespace if (options.read != .none) Readable(T, StreamT) else struct {};
pub usingnamespace if (options.write != .none) Writeable(T, StreamT) else struct {};
};
}
fn Closeable(comptime T: type, comptime StreamT: type) type { fn Closeable(comptime T: type, comptime StreamT: type) type {
return struct { return struct {
const Self = T; const Self = T;
@ -262,6 +253,46 @@ fn Closeable(comptime T: type, comptime StreamT: type) type {
}; };
} }
fn Pollable(comptime T: type, comptime StreamT: type) type {
return struct {
const Self = T;
const PollResult = xev.PollError!void;
pub fn poll(self: Self) !void {
const ResultT = PollResult;
const Data = struct {
result: ResultT = undefined,
frame: ?Frame = null,
fn callback(
userdata: ?*@This(),
l: *xev.Loop,
c: *xev.Completion,
s: StreamT,
result: ResultT,
) xev.CallbackAction {
_ = l;
_ = c;
_ = s;
const data = userdata.?;
data.result = result;
if (data.frame != null) libcoro.xresume(data.frame.?);
return .disarm;
}
};
var data: Data = .{ .frame = libcoro.xframe() };
const loop = self.exec.loop;
var c: xev.Completion = .{};
self.stream().poll(loop, &c, Data, &data, &Data.callback);
try waitForCompletion(self.exec, &c);
return data.result;
}
};
}
fn Readable(comptime T: type, comptime StreamT: type) type { fn Readable(comptime T: type, comptime StreamT: type) type {
return struct { return struct {
const Self = T; const Self = T;
@ -351,12 +382,10 @@ pub const File = struct {
exec: *Executor, exec: *Executor,
file: xev.File, file: xev.File,
pub usingnamespace Stream(Self, xev.File, .{ pub usingnamespace Closeable(Self, xev.File);
.close = true, pub usingnamespace Pollable(Self, xev.File);
.read = .read, pub usingnamespace Readable(Self, xev.File);
.write = .write, pub usingnamespace Writeable(Self, xev.File);
.threadpool = true,
});
pub fn init(exec: *Executor, file: xev.File) Self { pub fn init(exec: *Executor, file: xev.File) Self {
return .{ .exec = exec, .file = file }; return .{ .exec = exec, .file = file };
@ -497,11 +526,10 @@ pub const UDP = struct {
exec: *Executor, exec: *Executor,
udp: xev.UDP, udp: xev.UDP,
pub usingnamespace Stream(Self, xev.UDP, .{ pub usingnamespace Closeable(Self, xev.TCP);
.close = true, pub usingnamespace Pollable(Self, xev.TCP);
.read = .none, pub usingnamespace Readable(Self, xev.TCP);
.write = .none, pub usingnamespace Writeable(Self, xev.TCP);
});
pub fn init(exec: *Executor, udp: xev.UDP) Self { pub fn init(exec: *Executor, udp: xev.UDP) Self {
return .{ .exec = exec, .udp = udp }; return .{ .exec = exec, .udp = udp };

View File

@ -0,0 +1,7 @@
module(
name = "libxev",
version = "20250222.0-07bcffa",
compatibility_level = 1,
)
bazel_dep(name = "rules_zig", version = "20240904.0-010da15")

View File

@ -0,0 +1,13 @@
load("@rules_zig//zig:defs.bzl", "zig_library")
zig_library(
name = "xev",
srcs = glob([
"src/*.zig",
"src/backend/*.zig",
"src/linux/*.zig",
"src/watcher/*.zig",
]),
main = "src/main.zig",
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,7 @@
module(
name = "libxev",
version = "20250222.0-07bcffa",
compatibility_level = 1,
)
bazel_dep(name = "rules_zig", version = "20240904.0-010da15")

View File

@ -0,0 +1,9 @@
{
"strip_prefix": "libxev-07bcffa0f63054152a9883baa42bce5faad297e6",
"url": "https://github.com/mitchellh/libxev/archive/07bcffa0f63054152a9883baa42bce5faad297e6.tar.gz",
"integrity": "sha256-H80IoKcTeJFwpL5i4S9zyMuyWaQCK173ppoAO/CYXjc=",
"overlay": {
"MODULE.bazel": "",
"BUILD.bazel": ""
}
}

View File

@ -18,6 +18,7 @@
"20241208.1-db6a52b", "20241208.1-db6a52b",
"20241208.2-db6a52b", "20241208.2-db6a52b",
"20250124.0-31eed4e", "20250124.0-31eed4e",
"20250222.0-07bcffa"
], ],
"yanked_versions": {} "yanked_versions": {}
} }