rpmalloc-zig-port icon indicating copy to clipboard operation
rpmalloc-zig-port copied to clipboard

Cross-thread frees cause segfaults

Open joadnacer opened this issue 1 year ago • 1 comments

The following code takes n - number of threads as command line arg - if omitted defaults to 1

It then instantiates n allocating threads and n freeing threads, and transfers allocations from allocating threads to freeing threads via an mpmc queue, running infinitely in a while true loop

Running this using rpmalloc-zig causes a segfault very fast even for n=1

if you set use_rpmalloc=false, the c_allocator will be used and this is able to run infinitely for any number of threads without segfaulting

const std = @import("std");
const rp = @import("rpmalloc.zig");

const testing = std.testing;
const assert = std.debug.assert;
const Value = std.atomic.Value;

const queue_size = 1024;
const Queue = BoundedMpmcQueue([]u8, queue_size);

const alloc_size = 8;

// no segfault if false
const use_rpmalloc = true;

pub fn main() !void {
    const args = try std.process.argsAlloc(std.heap.page_allocator);
    defer std.process.argsFree(std.heap.page_allocator, args);

    const num_args = args.len - 1;

    if (num_args == 0) return try crossthread(1);

    const num_threads = try std.fmt.parseInt(u32, args[1], 10);

    try crossthread(num_threads);
}

pub fn crossthread(num_threads: u32) !void {
    const rpmalloc = rp.RPMalloc(.{});
    try rpmalloc.init(null, .{});

    const allocator = if (use_rpmalloc) rpmalloc.allocator() else std.heap.c_allocator;

    var queue = Queue.init();

    var workers: []std.Thread = try std.heap.page_allocator.alloc(std.Thread, num_threads * 2);
    defer std.heap.page_allocator.free(workers);

    for (0..num_threads) |i| {
        workers[i] = try std.Thread.spawn(.{}, threadAllocWorker, .{ allocator, &queue });
        workers[i * 2] = try std.Thread.spawn(.{}, threadFreeWorker, .{ allocator, &queue });
    }

    for (0..num_threads) |i| {
        workers[i].join();
    }
}

pub fn threadAllocWorker(allocator: std.mem.Allocator, queue: *Queue) !void {
    if (use_rpmalloc) try rp.RPMalloc(.{}).initThread();

    while (true) {
        const alloc = try allocator.alloc(u8, alloc_size);

        while (!queue.tryWrite(alloc)) {}
    }
}

pub fn threadFreeWorker(allocator: std.mem.Allocator, queue: *Queue) !void {
    while (true) {
        const alloc = queue.tryRead() orelse continue;

        allocator.free(alloc);
    }
}

/// Array based bounded multiple producer multiple consumer queue
/// This is a Zig port of Dmitry Vyukov's https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
pub fn BoundedMpmcQueue(comptime T: type, comptime buffer_size: usize) type {
    assert(buffer_size & (buffer_size - 1) == 0); // must be power of 2

    const buffer_mask = buffer_size - 1;

    const Cell = struct {
        sequence: Value(usize),
        data: T,
    };

    return struct {
        enqueue_pos: Value(usize) align(std.atomic.cache_line),
        dequeue_pos: Value(usize) align(std.atomic.cache_line),
        buffer: [buffer_size]Cell,

        const Self = @This();

        pub fn init() BoundedMpmcQueue(T, buffer_size) {
            var buf: [buffer_size]Cell = undefined;

            @setEvalBranchQuota(queue_size);
            for (&buf, 0..) |*cell, i| {
                cell.sequence = Value(usize).init(i);
            }

            return .{
                .enqueue_pos = Value(usize).init(0),
                .dequeue_pos = Value(usize).init(0),
                .buffer = buf,
            };
        }

        /// Attempts to write to the queue, without overwriting any data
        /// Returns `true` if the data is written, `false` if the queue was full
        pub fn tryWrite(self: *Self, data: T) bool {
            var pos = self.enqueue_pos.load(.Monotonic);

            var cell: *Cell = undefined;

            while (true) {
                cell = &self.buffer[pos & buffer_mask];
                const seq = cell.sequence.load(.Acquire);
                const diff = @as(i128, seq) - @as(i128, pos);

                if (diff == 0 and self.enqueue_pos.cmpxchgWeak(pos, pos + 1, .Monotonic, .Monotonic) == null) {
                    break;
                } else if (diff < 0) {
                    return false;
                } else {
                    pos = self.enqueue_pos.load(.Monotonic);
                }
            }

            cell.data = data;
            cell.sequence.store(pos + 1, .Release);

            return true;
        }

        /// Attempts to read and remove the head element of the queue
        /// Returns `null` if there was no element to read
        pub fn tryRead(self: *Self) ?T {
            var cell: *Cell = undefined;
            var pos = self.dequeue_pos.load(.Monotonic);

            while (true) {
                cell = &self.buffer[pos & buffer_mask];
                const seq = cell.sequence.load(.Acquire);
                const diff = @as(i128, seq) - @as(i128, (pos + 1));

                if (diff == 0 and self.dequeue_pos.cmpxchgWeak(pos, pos + 1, .Monotonic, .Monotonic) == null) {
                    break;
                } else if (diff < 0) {
                    return null;
                } else {
                    pos = self.dequeue_pos.load(.Monotonic);
                }
            }

            const res = cell.data;
            cell.sequence.store(pos + buffer_mask + 1, .Release);

            return res;
        }
    };
}

joadnacer avatar Apr 04 '24 17:04 joadnacer

pub fn threadAllocWorker(allocator: std.mem.Allocator, queue: *Queue) !void {
    if (use_rpmalloc) try rp.RPMalloc(.{}).initThread();

    while (true) {
        const alloc = try allocator.alloc(u8, alloc_size);

        while (!queue.tryWrite(alloc)) {}
    }
}

pub fn threadFreeWorker(allocator: std.mem.Allocator, queue: *Queue) !void {
    while (true) {
        const alloc = queue.tryRead() orelse continue;

        allocator.free(alloc);
    }
}

Have you tried calling initThread() and deinitThread() in both of these worker functions? I'm wondering if it might not be legal to access the allocator without calling initThread first, even if you are freeing.

swan-www avatar Jul 17 '25 22:07 swan-www