Cross-thread frees cause segfaults
The following code takes n — the number of threads — as a command-line argument; if omitted, it defaults to 1.
It then spawns n allocating threads and n freeing threads, transferring allocations from the allocating threads to the freeing threads via an MPMC queue, running forever in a `while (true)` loop.
Running this using rpmalloc-zig causes a segfault very fast even for n=1
If you set use_rpmalloc=false, the c_allocator is used instead, and the program runs indefinitely for any number of threads without segfaulting.
const std = @import("std");
const rp = @import("rpmalloc.zig");
const testing = std.testing;
const assert = std.debug.assert;
const Value = std.atomic.Value;
const queue_size = 1024;
const Queue = BoundedMpmcQueue([]u8, queue_size);
const alloc_size = 8;
// no segfault if false
const use_rpmalloc = true;
/// Entry point: reads an optional thread-count argument (defaults to 1)
/// and runs the cross-thread alloc/free stress test forever.
pub fn main() !void {
    const args = try std.process.argsAlloc(std.heap.page_allocator);
    defer std.process.argsFree(std.heap.page_allocator, args);
    // args[0] is the program name, so a user-supplied count sits at index 1.
    const num_threads: u32 = if (args.len > 1)
        try std.fmt.parseInt(u32, args[1], 10)
    else
        1;
    try crossthread(num_threads);
}
/// Spawns `num_threads` allocating threads and `num_threads` freeing threads
/// that exchange allocations through a shared MPMC queue, then joins them
/// (they loop forever, so join never returns in practice).
pub fn crossthread(num_threads: u32) !void {
    const rpmalloc = rp.RPMalloc(.{});
    try rpmalloc.init(null, .{});
    const allocator = if (use_rpmalloc) rpmalloc.allocator() else std.heap.c_allocator;
    var queue = Queue.init();
    // One allocator thread plus one freer thread per requested "thread".
    const workers: []std.Thread = try std.heap.page_allocator.alloc(std.Thread, num_threads * 2);
    defer std.heap.page_allocator.free(workers);
    for (0..num_threads) |i| {
        // Allocators occupy workers[0..num_threads], freers the upper half.
        // (The original stored the freer at workers[i * 2], which collides
        // with the allocator slots — for i = 0 it overwrote workers[0],
        // leaking that handle and leaving some slots undefined.)
        workers[i] = try std.Thread.spawn(.{}, threadAllocWorker, .{ allocator, &queue });
        workers[num_threads + i] = try std.Thread.spawn(.{}, threadFreeWorker, .{ allocator, &queue });
    }
    // Join every spawned thread, not just the first num_threads.
    for (workers) |worker| {
        worker.join();
    }
}
/// Allocation worker: registers this thread with rpmalloc (when enabled),
/// then endlessly allocates fixed-size buffers and pushes them onto the
/// shared queue, spinning whenever the queue is full.
pub fn threadAllocWorker(allocator: std.mem.Allocator, queue: *Queue) !void {
    if (use_rpmalloc) {
        try rp.RPMalloc(.{}).initThread();
    }
    while (true) {
        const buf = try allocator.alloc(u8, alloc_size);
        // Busy-wait until a slot opens; the freeing side drains continuously.
        while (true) {
            if (queue.tryWrite(buf)) break;
        }
    }
}
/// Free worker: pops allocations off the shared queue and frees them.
/// rpmalloc requires every thread that touches the allocator — including
/// threads that only free — to register itself with initThread() first;
/// skipping that registration is the likely cause of the reported segfault.
pub fn threadFreeWorker(allocator: std.mem.Allocator, queue: *Queue) !void {
    if (use_rpmalloc) try rp.RPMalloc(.{}).initThread();
    while (true) {
        // Spin via `continue` when the queue is momentarily empty.
        const alloc = queue.tryRead() orelse continue;
        allocator.free(alloc);
    }
}
/// Array based bounded multiple producer multiple consumer queue
/// This is a Zig port of Dmitry Vyukov's https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
pub fn BoundedMpmcQueue(comptime T: type, comptime buffer_size: usize) type {
    assert(buffer_size & (buffer_size - 1) == 0); // must be power of 2
    // Mask used to map an ever-increasing position onto a ring-buffer index.
    const buffer_mask = buffer_size - 1;
    // Each cell pairs its payload with a sequence counter; the counter's
    // distance from the enqueue/dequeue position encodes whether the cell is
    // free to write, ready to read, or still in use by the other side.
    const Cell = struct {
        sequence: Value(usize),
        data: T,
    };
    return struct {
        // Producer and consumer cursors live on separate cache lines to
        // avoid false sharing between the two sides.
        enqueue_pos: Value(usize) align(std.atomic.cache_line),
        dequeue_pos: Value(usize) align(std.atomic.cache_line),
        buffer: [buffer_size]Cell,
        const Self = @This();
        // Builds an empty queue; cell i starts with sequence == i, marking
        // every slot as writable for the first lap around the ring.
        pub fn init() BoundedMpmcQueue(T, buffer_size) {
            var buf: [buffer_size]Cell = undefined;
            // Raise the comptime branch quota so the loop below can unroll
            // when init() is evaluated at comptime for large buffers.
            @setEvalBranchQuota(queue_size);
            for (&buf, 0..) |*cell, i| {
                cell.sequence = Value(usize).init(i);
            }
            return .{
                .enqueue_pos = Value(usize).init(0),
                .dequeue_pos = Value(usize).init(0),
                .buffer = buf,
            };
        }
        /// Attempts to write to the queue, without overwriting any data
        /// Returns `true` if the data is written, `false` if the queue was full
        pub fn tryWrite(self: *Self, data: T) bool {
            var pos = self.enqueue_pos.load(.Monotonic);
            var cell: *Cell = undefined;
            while (true) {
                cell = &self.buffer[pos & buffer_mask];
                // Acquire pairs with the Release store in tryRead, making the
                // consumer's recycling of this cell visible before we reuse it.
                const seq = cell.sequence.load(.Acquire);
                // Widen to i128 so the subtraction cannot wrap; only the sign
                // and zero-ness of the difference matter.
                const diff = @as(i128, seq) - @as(i128, pos);
                if (diff == 0 and self.enqueue_pos.cmpxchgWeak(pos, pos + 1, .Monotonic, .Monotonic) == null) {
                    // seq == pos means the cell is free, and the CAS claimed it.
                    break;
                } else if (diff < 0) {
                    // seq lags pos: the cell is still occupied a full lap
                    // behind us, so the queue is full.
                    return false;
                } else {
                    // Lost the race (or CAS failed) — reload and retry.
                    pos = self.enqueue_pos.load(.Monotonic);
                }
            }
            cell.data = data;
            // Publish the payload: sequence = pos + 1 marks the cell readable.
            cell.sequence.store(pos + 1, .Release);
            return true;
        }
        /// Attempts to read and remove the head element of the queue
        /// Returns `null` if there was no element to read
        pub fn tryRead(self: *Self) ?T {
            var cell: *Cell = undefined;
            var pos = self.dequeue_pos.load(.Monotonic);
            while (true) {
                cell = &self.buffer[pos & buffer_mask];
                // Acquire pairs with the producer's Release store so the
                // payload written in tryWrite is visible after this load.
                const seq = cell.sequence.load(.Acquire);
                // A readable cell has sequence == pos + 1 (set by tryWrite).
                const diff = @as(i128, seq) - @as(i128, (pos + 1));
                if (diff == 0 and self.dequeue_pos.cmpxchgWeak(pos, pos + 1, .Monotonic, .Monotonic) == null) {
                    // Cell holds data and the CAS claimed it for this consumer.
                    break;
                } else if (diff < 0) {
                    // Nothing published here yet — the queue is empty.
                    return null;
                } else {
                    // Lost the race (or CAS failed) — reload and retry.
                    pos = self.dequeue_pos.load(.Monotonic);
                }
            }
            const res = cell.data;
            // Recycle the cell for the next lap: sequence = pos + buffer_size
            // marks it writable again for the producer one lap ahead.
            cell.sequence.store(pos + buffer_mask + 1, .Release);
            return res;
        }
    };
}
// NOTE(review): this is a byte-for-byte duplicate of the threadAllocWorker
// defined earlier in this file — presumably a paste artifact from the issue
// thread. Two top-level declarations with the same name will not compile in
// Zig; confirm which copy is canonical and drop the other.
pub fn threadAllocWorker(allocator: std.mem.Allocator, queue: *Queue) !void {
if (use_rpmalloc) try rp.RPMalloc(.{}).initThread();
while (true) {
const alloc = try allocator.alloc(u8, alloc_size);
while (!queue.tryWrite(alloc)) {}
}
}
// NOTE(review): duplicate of the threadFreeWorker defined earlier in this
// file — presumably a paste artifact from the issue thread. Duplicate
// top-level declarations will not compile; keep only one copy.
pub fn threadFreeWorker(allocator: std.mem.Allocator, queue: *Queue) !void {
while (true) {
const alloc = queue.tryRead() orelse continue;
allocator.free(alloc);
}
}
Have you tried calling initThread() and deinitThread() in both of these worker functions? I wonder whether it is legal to access the allocator without calling initThread() first, even if the thread is only freeing.