pipe_push blocking
Any way of blocking pipe_push if the pipe is full?
As far as I know, pipe_push does block if full. Do you have an example showing it doesn't?
Hi @cgaebel,
Here's an example where pipe_push doesn't block:
#include <stdio.h>
#include <stdlib.h>
#include "pipe.h"
int main() {
pipe_t* pipe = pipe_new(sizeof(char), 10);
pipe_producer_t* p = pipe_producer_new(pipe);
char a[2000];
for (int i = 0; i < 20; i++)
pipe_push(p, a+i, 1);
printf("Didn't block\n");
return 0;
}
Furthermore, if you increase the limit in the code above, for example to 100:
...
pipe_t* pipe = pipe_new(sizeof(char), 100);
...
for (int i = 0; i < 200; i++)
...
Then this assertion is thrown:
push_block: pipe.c:518: check_invariants: Assertion `in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap)' failed.
Any news? Thanks.
I reiterate that the example above does not block when built against master. When built against single_mutex and lock_free branches the example above blocks, however.
Was this going to be fixed?
I found the issue is caused by there is a minimum capacity of the pipe which is 32, defined by DEFAULT_MINCAP. So even when the test code above sets the limit to 10, it's changed to 32 in pipe_new(). So if you change the push count from 20 to a number larger than 32, it will block as expected.
It still issues an assertion error even if using larger value (for example 32). I've modified the unit test:
DEF_TEST(issue_5)
{
// static const int NUM = 32;
#define NUM 64
pipe_t* pipe = pipe_new(sizeof(int), 32);
pipe_producer_t* p = pipe_producer_new(pipe);
pipe_consumer_t* c = pipe_consumer_new(pipe);
pipe_free(pipe);
int data[NUM];
for(int i=0; i < NUM; ++i)
{
data[i] = i;
pipe_push(p, data, 1);
}
// pipe_push(p, data, NUM);
pipe_producer_free(p);
int buf[NUM];
size_t ret = pipe_pop(c, buf, NUM);
assert(ret == NUM);
for(int i=0; i < NUM; ++i)
assert(buf[i] == data[i]);
pipe_consumer_free(c);
}
Stack failure:
233 pipe_t* pipe = pipe_new(sizeof(int), 32);
(gdb) c
Continuing.
pipe_test: pipe.c:519: void check_invariants(pipe_t *): Assertion `in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap)' failed.
issue_5 ->
Program received signal SIGABRT, Aborted.
__GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
51 ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0 __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
#1 0x00007ffff734942a in __GI_abort () at abort.c:89
#2 0x00007ffff7340e67 in __assert_fail_base (fmt=<optimized out>,
assertion=assertion@entry=0x403e35 "in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap)",
file=file@entry=0x403b7f "pipe.c", line=line@entry=519,
function=function@entry=0x403c9b "void check_invariants(pipe_t *)") at assert.c:92
#3 0x00007ffff7340f12 in __GI___assert_fail (
assertion=0x403e35 "in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap)", file=0x403b7f "pipe.c", line=519,
function=0x403c9b "void check_invariants(pipe_t *)") at assert.c:101
#4 0x0000000000401d24 in check_invariants (p=0x606420) at pipe.c:519
#5 0x0000000000403289 in resize_buffer (p=0x606420, new_size=256) at pipe.c:762
#6 0x00000000004028b9 in validate_size (p=0x606420, s=..., new_bytes=4) at pipe.c:786
#7 0x0000000000402351 in __pipe_push (p=0x606420, elems=0x7fffffffe1d0, count=4) at pipe.c:883
#8 0x0000000000402b7f in pipe_push (p=0x606420, elems=0x7fffffffe1d0, count=4) at pipe.c:911
#9 0x000000000040139a in test_issue_5 () at pipe_test.c:242
#10 0x00000000004012ea in pipe_run_test_suite () at pipe_test.c:317
#11 0x0000000000400b84 in main () at main.c:5
(gdb) fr 9
#9 0x000000000040139a in test_issue_5 () at pipe_test.c:242
242 pipe_push(p, data, 1);
(gdb) p i
$19 = 31
The following patch:
- fixes the assertion and non-blocking problem if a non-zero value is used when creating pipe with
pipe_new - can reduce the mincap to less than DEFAULT_MINCAP with
pipe_reserve
diff --git a/pipe.c b/pipe.c
index 9af421a..e3c8c26 100644
--- a/pipe.c
+++ b/pipe.c
@@ -514,7 +514,7 @@ static inline void check_invariants(pipe_t* p)
if(s.begin == s.end)
assertume(bytes_in_use(s) == capacity(s));
- assertume(in_bounds(DEFAULT_MINCAP*p->elem_size, p->min_cap, p->max_cap));
+ //assertume(in_bounds(DEFAULT_MINCAP*p->elem_size, p->min_cap, p->max_cap));
assertume(in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap));
}
@@ -743,6 +743,8 @@ static snapshot_t resize_buffer(pipe_t* p, size_t new_size)
elem_size = __pipe_elem_size(p);
assertume(new_size >= bytes_in_use(make_snapshot(p)));
+ assertume(new_size + elem_size > new_size); // overflow
+ new_size += elem_size; // include sentinel
if(unlikely(new_size >= max_cap))
new_size = max_cap;
@@ -750,13 +752,13 @@ static snapshot_t resize_buffer(pipe_t* p, size_t new_size)
if(new_size <= min_cap)
return make_snapshot(p);
- char* new_buf = malloc(new_size + elem_size);
+ char* new_buf = malloc(new_size);
p->end = copy_pipe_into_new_buf(make_snapshot(p), new_buf);
p->begin =
p->buffer = (free(p->buffer), new_buf);
- p->bufend = new_buf + new_size + elem_size;
+ p->bufend = new_buf + new_size;
check_invariants(p);
@@ -782,7 +784,7 @@ static inline snapshot_t validate_size(pipe_t* p,
size_t elems_needed = bytes_needed / elem_size;
if(likely(bytes_needed > cap))
- s = resize_buffer(p, next_pow2(elems_needed+1)*elem_size);
+ s = resize_buffer(p, next_pow2(elems_needed)*elem_size);
}
// Unlock the pipe if requested.
@@ -844,9 +846,11 @@ static inline snapshot_t wait_for_room(pipe_t* p, size_t* max_cap)
size_t consumer_refcount = p->consumer_refcount;
+ size_t elem_size = __pipe_elem_size(p);
+
*max_cap = p->max_cap;
- for(; unlikely(bytes_used == *max_cap) && likely(consumer_refcount > 0);
+ for(; unlikely(bytes_used + elem_size >= *max_cap) && likely(consumer_refcount > 0);
s = make_snapshot(p),
bytes_used = bytes_in_use(s),
consumer_refcount = p->consumer_refcount,
@@ -883,7 +887,7 @@ void __pipe_push(pipe_t* p,
// Finally, we can now begin with pushing as many elements into the
// queue as possible.
p->end = process_push(s, elems,
- pushed = min(count, max_cap - bytes_in_use(s)));
+ pushed = min(count, capacity(s) - bytes_in_use(s)));
} mutex_unlock(&p->end_lock);
assertume(pushed > 0);
diff --git a/pipe_test.c b/pipe_test.c
index 6c8781d..57d160b 100644
--- a/pipe_test.c
+++ b/pipe_test.c
@@ -29,6 +29,10 @@
#include <stdlib.h>
#include <string.h>
+#ifdef __GNUC__
+#pragma GCC diagnostic ignored "-Wunused-result"
+#endif
+
#define UNUSED_PARAMETER(var) (var) = (var)
// All this hackery is just to get asserts to work in release build.
@@ -245,6 +249,145 @@ DEF_TEST(issue_5)
pipe_consumer_free(c);
}
+// set max cap (not infinite)
+DEF_TEST(issue_6_a)
+{
+ static const int NUM = 32;
+ pipe_t* pipe = pipe_new(sizeof(int), NUM);
+ pipe_producer_t* p = pipe_producer_new(pipe);
+ pipe_consumer_t* c = pipe_consumer_new(pipe);
+ pipe_free(pipe);
+
+ int data[NUM];
+ for(int i=0; i < NUM; ++i)
+ data[i] = i;
+ pipe_push(p, data, NUM);
+ pipe_producer_free(p);
+
+ int buf[NUM];
+ size_t ret = pipe_pop(c, buf, NUM);
+ assert(ret == NUM);
+ for(int i=0; i < NUM; ++i)
+ assert(buf[i] == data[i]);
+
+ pipe_consumer_free(c);
+}
+
+// set smaller min cap
+DEF_TEST(issue_6_b)
+{
+ static const int NUM = 16;
+ pipe_t* pipe = pipe_new(sizeof(int), NUM * 2);
+ pipe_reserve(PIPE_GENERIC(pipe), NUM);
+ pipe_producer_t* p = pipe_producer_new(pipe);
+ pipe_consumer_t* c = pipe_consumer_new(pipe);
+ pipe_free(pipe);
+
+ int data[NUM];
+ for(int i=0; i < NUM; ++i)
+ data[i] = i;
+ pipe_push(p, data, NUM);
+ pipe_producer_free(p);
+
+ int buf[NUM];
+ size_t ret = pipe_pop(c, buf, NUM);
+ assert(ret == NUM);
+ for(int i=0; i < NUM; ++i)
+ assert(buf[i] == data[i]);
+
+ pipe_consumer_free(c);
+}
+
+#ifdef _WIN32 // use the native win32 API on Windows
+
+#include <windows.h>
+
+#define thread_create(f, p) CloseHandle( \
+ CreateThread(NULL, \
+ 0, \
+ (LPTHREAD_START_ROUTINE)(f), \
+ (p), \
+ 0, \
+ NULL))
+
+#define thread_sleep(s) Sleep((s) * 1000)
+
+#else // fall back on pthreads
+
+#include <pthread.h>
+#include <unistd.h>
+
+static inline void thread_create(void *(*f) (void*), void* p)
+{
+ pthread_t t;
+ pthread_create(&t, NULL, f, p);
+}
+
+#define thread_sleep(s) sleep(s)
+
+#endif
+
+typedef struct __issue_6_t {
+ pipe_consumer_t* c;
+ int writing;
+ int read;
+} issue_6_t;
+
+static void* process_pipe_issue_6_c(void* param)
+{
+ static const int NUM = 32;
+ issue_6_t* v = (issue_6_t *)param;
+
+ //printf("Consumer waiting for a bit ...\n");
+ //printf("Consumer starts to read pipe ...\n");
+ thread_sleep(1);
+ assert(v->writing); // producer still writing, blocked from finishing
+ int buf[NUM];
+ size_t ret = pipe_pop(v->c, buf, NUM);
+ assert(ret == NUM);
+ for(int i=0; i < NUM; ++i)
+ assert(buf[i] == i);
+
+ v->read = NUM;
+ pipe_consumer_free(v->c);
+ return NULL;
+}
+
+// producer blocking on push
+// Note: pipe rounds up to power of 2 so initial
+// value of 32 is rounded up to 64 so we block on
+// writing 64 values, not 32
+DEF_TEST(issue_6_c)
+{
+ static const int NUM = 32;
+ pipe_t* pipe = pipe_new(sizeof(int), NUM);
+ pipe_producer_t* p = pipe_producer_new(pipe);
+
+ issue_6_t* params = malloc(sizeof(*params));
+ memset(params, 0, sizeof(*params));
+ params->c = pipe_consumer_new(pipe);
+
+ pipe_free(pipe);
+
+ thread_create(&process_pipe_issue_6_c, params);
+
+ int data[NUM];
+ for(int i=0; i < NUM; ++i)
+ data[i] = i;
+ //printf("Producer pushing ok ...\n");
+ params->writing = 1;
+ pipe_push(p, data, NUM);
+ //printf("Producer pushing should be blocked ...\n");
+ pipe_push(p, data, NUM);
+ //printf("Producer unblocked ...\n");
+ params->writing = 0;
+ thread_sleep(1);
+ assert(params->read == NUM);
+
+ free(params);
+ pipe_producer_free(p);
+}
+
/*
// This test is only legal if DEFAULT_MINCAP is less than or equal to 8.
//
@@ -307,6 +450,9 @@ void pipe_run_test_suite(void)
RUN_TEST(parallel_multiplier);
RUN_TEST(issue_4);
RUN_TEST(issue_5);
+ RUN_TEST(issue_6_a);
+ RUN_TEST(issue_6_b);
+ RUN_TEST(issue_6_c);
/*
#ifdef PIPE_DEBUG
RUN_TEST(clobbering);
The following patch:
* fixes the assertion and non-blocking problem if a non-zero value is used when creating pipe with `pipe_new` * can reduce the mincap to less than DEFAULT_MINCAP with `pipe_reserve`
applied in https://github.com/mgood7123/pipe
also see https://github.com/cgaebel/pipe/issues/11
you do not join your threads, which also creates a memory leak
There is a commit on https://github.com/git-blame/pipe.
This returns thread handles so that in pipe_test.c you can join the child threads and clean up resources.