A Programmer-Friendly I/O Abstraction Over io_uring and kqueue

Consider this story of I/O and performance. We’ll start by blocking I/O, explore io_ureing and kqueue, and bring home an event loop similar to some software that may sound familiar to you.

This is a twist on King’s talk at Software You Can Love Milan ’22.

When you want to read from a file you can do this open() and then call read() As many times as necessary to fill the buffer of bytes from the file. and call in the opposite direction
write() As many times as necessary until everything is written. It is similar to a TCP client with sockets, but instead of
open() you call first socket() And then
connect() On your server. Funny stuff.

However in the real world you can’t always read everything you want from a file descriptor. Nor can you always write everything you want on a file descriptor.

You can switch the file descriptor to non-blocking mode so that the call does not block if the data you requested is not available. But system calls are still expensive, causing context switches and cache misses. In fact, networks and disks have become so fast that these costs can approach the cost of doing I/O. For the period when the file descriptor is unable to be read or written, you do not want to waste time making system calls constantly attempting to read or write.

So you switch to io_uring on Linux or kqueue on FreeBSD/macOS. (I’m skipping the generation of epoll/select users.) These APIs let you submit requests to the kernel to learn about readiness: when a file descriptor is ready to be read or written. You can send readiness requests in batches (also called queues). Completion events, for each submitted request, are available in a separate queue.

Being able to perform batch I/O like this is especially important for TCP servers that want to multiplex reads and writes to multiple connected clients.

However in io_uriing, you can even go a step further. instead of calling read() Or write() In userland after the readiness event, you can request that the kernel do
read() Or write() Same with the buffer you provided. Thus almost all of your I/O is done in the kernel, amortizing the overhead of system calls.

If you haven’t seen io_uring or kqueue before, you’ll probably want an example! Consider this code: a simple, minimal, non-production-ready TCP echo server.

const std = @import("std");
const os = std.os;
const linux = os.linux;
const allocator = std.heap.page_allocator;

const State = enum{ accept, recv, send };
const Socket = struct {
    handle: os.socket_t,
    buffer: [1024]u8,
    state: State,
};

pub fn main() !void {
    const entries = 32;
    const flags = 0;
    var ring = try linux.IO_Uring.init(entries, flags);
    defer ring.deinit();

    var server: Socket = undefined;
    server.handle = try os.socket(os.AF.INET, os.SOCK.STREAM, os.IPPROTO.TCP);
    defer os.closeSocket(server.handle);

    const port = 12345;
    var addr = std.net.Address.initIp4(.{127, 0, 0, 1}, port);
    var addr_len: os.socklen_t = addr.getOsSockLen();

    try os.setsockopt(server.handle, os.SOL.SOCKET, os.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try os.bind(server.handle, &addr.any, addr_len);
    const backlog = 128;
    try os.listen(server.handle, backlog);

    server.state = .accept;
    _ = try ring.accept(@ptrToInt(&server), server.handle, &addr.any, &addr_len, 0);

    while (true) {
        _ = try ring.submit_and_wait(1);

        while (ring.cq_ready() > 0) {
            const cqe = try ring.copy_cqe();
            var client = @intToPtr(*Socket, @intCast(usize, cqe.user_data));

            if (cqe.res < 0) std.debug.panic("{}({}): {}", .{
                client.state,
                client.handle,
                @intToEnum(os.E, -cqe.res),
            });

            switch (client.state) {
                .accept => {
                    client = try allocator.create(Socket);
                    client.handle = @intCast(os.socket_t, cqe.res);
                    client.state = .recv;
                    _ = try ring.recv(@ptrToInt(client), client.handle, .{.buffer = &client.buffer}, 0);
                    _ = try ring.accept(@ptrToInt(&server), server.handle, &addr.any, &addr_len, 0);
                },
                .recv => {
                    const read = @intCast(usize, cqe.res);
                    client.state = .send;
                    _ = try ring.send(@ptrToInt(client), client.handle, client.buffer[0..read], 0);
                },
                .send => {
                    os.closeSocket(client.handle);
                    allocator.destroy(client);
                },
            }
        }
    }
}

This is a great, minimalist example. But note that this code connects io_ure behavior directly to business logic (in this case, handling echo data between request and response). This is fine for a small example like this. But in a large application you may want to do I/O across the entire code base, not just in one place. You may not want to continue adding business logic to this single loop.

Instead, you’ll want to be able to schedule I/O and pass a callback (and sometimes along with some application context) when the event completes.

The interface may look like this:

io_dispatch.dispatch({
    // some big struct/union with relevant fields for all event types
}, my_callback);

This is great! Now your business logic can schedule and handle I/O, no matter where it is in the code base.

Under the hood it can decide whether to use io_uring or kqueue depending on which kernel it is running on. Dispatch can also batch these individual calls via io_ure or kqueue to refine system calls. The application no longer needs to know the details.

Additionally, we can use this wrapper to stop thinking only about I/O completion preparation events. That is, if we send a read event, the io_ure implementation will actually ask the kernel to read the data into the buffer. Whereas the kqueue implementation will send a “read” readiness event, perform the read back in userland, and then call our callback.

And finally, now that we’ve got this central dispatcher, we don’t need spaghetti code in the loop switching on every possible submission and completion event.

Every time we call io_uring or kqueue we both submit event requests and poll for completed events. The io_uring and kqueue APIs combine both of these actions into a single system call.

To sync our requests with io_uring or kqueue we will create a
flush Function that submits requests and polls for completion events. (In the next section we will talk about how the user of the central dispatch learns about closing events.)

to make flush More conveniently, we’ll create a nice wrapper around it so that we can submit as many requests as possible (and process as many completion events as possible). We will also enforce a time limit to avoid accidentally being blocked indefinitely. we will call the rapper
run_for_ns,

Ultimately we will put the user in charge of setting up a loop to call it run_for_ns Function, independent of normal program execution.

This is now your traditional event loop.

You might have noticed that in the above API we have passed a callback. The idea is that after the requested I/O is complete, our callback should be invoked. But the question still remains: how to track this callback between submission and completion queue?

Thankfully, the io_ure and kqueue events have user data fields. User data fields are opaque to the kernel. When the submitted event completes, the kernel sends a completed event back to userland containing the user data values ​​from the submitted event.

We can store the callback in the user data field by setting the callback to a pointer to the cast callback. When it comes to completion of a requested event, we cast back the callback pointer from an integer in the user data field. Then, we initiate the callback.

As described above, the structure for io_dispatch.dispatch
All the different types of I/O events and their arguments can be managed to a great extent. We can make our API a bit more expressive by creating wrapper functions for each event type.

So if we want to schedule a read function we can call:

io_dispatch.read(fd, &buf, nBytesToRead, callback);

Or to write, similarly:

io_dispatch.write(fd, buf, nBytesToWrite, callback);

Another thing we need to worry about is that the batch we pass to io_ureing or kqueue has a fixed size (technically, kqueue allows any batch size but using it may lead to unnecessary allocation). So we would create our own queue on top of our I/O abstraction to keep track of requests that we couldn’t immediately submit to io_reing or kqueue.

To keep this API simple we can make an allocation for each entry in the queue. or we can modify it io_dispatch.X A bit is done to accept a structure that can be used in an interleaved linked list to contain all request contexts, including callbacks. The latter is what we do at TigerBeetle.

In other words: every time the code is called io_dispatchWe will try to submit the requested event to io_uring or kqueue immediately. But if there is no space, we store the event in the overflow queue.

The overflow queue eventually needs to be processed, so we update our
flush Function to pull as many events as possible from our overflow queue before submitting the batch to io_uring or kqueue (described in the callback and reference above).

Now we’ve created something similar to libuv, the I/O library that Node.js uses. And if you squint, it’s basically TigerBeetle’s I/O library! (And interestingly, TigerBeetle’s I/O code was adopted into BUN! Open-source for the win!)

Let’s see how the Darwin version of TigerBeetle’s I/O library (with kqueue) differs from the Linux version. As mentioned, complete send In the Darwin implementation the call waits for the file descriptor to be prepared (via kqueue). Once ready, the actual send The call is returned to userland:

pub fn send(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: SendError!usize,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    buffer: []const u8,
) void {
    self.submit(
        context,
        callback,
        completion,
        .send,
        .{
            .socket = socket,
            .buf = buffer.ptr,
            .len = @intCast(u32, buffer_limit(buffer.len)),
        },
        struct {
            fn do_operation(op: anytype) SendError!usize {
                return os.send(op.socket, op.buf[0..op.len], 0);
            }
        },
    );
}

Compare this to the Linux version (with io_ureing) where the kernel handles everything and there are no send system calls in userland:

pub fn send(
    self: *IO,
    comptime Context: type,
    context: Context,
    comptime callback: fn (
        context: Context,
        completion: *Completion,
        result: SendError!usize,
    ) void,
    completion: *Completion,
    socket: os.socket_t,
    buffer: []const u8,
) void {
    completion.* = .{
        .io = self,
        .context = context,
        .callback = struct {
            fn wrapper(ctx: ?*anyopaque, comp: *Completion, res: *const anyopaque) void {
                callback(
                    @intToPtr(Context, @ptrToInt(ctx)),
                    comp,
                    @intToPtr(*const SendError!usize, @ptrToInt(res)).*,
                );
            }
        }.wrapper,
        .operation = .{
            .send = .{
                .socket = socket,
                .buffer = buffer,
            },
        },
    };
    // Fill out a submission immediately if possible, otherwise adds to overflow buffer
    self.enqueue(completion);
}

look at it this way flush On Linux and macOS for event processing. See run_for_ns Users on Linux and macOS will have to make calls to the public API. And finally, see what all this brings into practice, loop calling run_for_ns In src/main.zig.

We’ve come this far and you might be wondering – what about cross-platform support for Windows? The good news is that Windows also has a completion based system similar to io_uring but without batching, called IOCP. And for bonus points, TigerBeetle offers similar I/O abstraction at that! But it is enough to cover only Linux and macOS in this post. ,

In both this blog post and TigerBeetle, we have implemented a single-threaded event loop. It is beneficial to keep I/O code in userspace single-threaded (whether I/O processing in the kernel is single-threaded or not is not our concern). This is the simplest code and is best for workloads that are not embarrassingly parallel. It is also best suited for determinism, which is integral to TigerBeetle’s design as it enables us to perform deterministic simulation tests

But there are other valid architectures for other workloads.

For workloads that are embarrassingly parallel, like multiple web servers, you can instead use multiple threads where each thread has its own queue. Under optimal conditions, this architecture has the highest I/O throughput possible.

But if each thread has its own queue, different threads may become starved if unequal amounts of work are scheduled on one thread. In case of dynamic volume of work, a better architecture would be to have a single queue but multiple worker threads doing the work available on the queue.

Hey, maybe we’ll split it up so you can use it too. It is written in Zig so we can easily expose the C API. Any language (ie every language) with a C foreign function interface should work well with it. Keep an eye on our GitHub. ,

additional resources:



<a href

Leave a Comment