pipe icon indicating copy to clipboard operation
pipe copied to clipboard

pipe_push blocking

Open edwar64896 opened this issue 9 years ago • 10 comments

Is there any way to make pipe_push block when the pipe is full?

edwar64896 avatar Dec 23 '16 07:12 edwar64896

As far as I know, pipe_push does block if full. Do you have an example showing it doesn't?

cgaebel avatar Dec 24 '16 01:12 cgaebel

Hi @cgaebel,

Here's an example where pipe_push doesn't block:

#include <stdio.h>
#include <stdlib.h>
#include "pipe.h"

/*
 * Reproduction program: creates a pipe of char elements with a limit
 * argument of 10, pushes 20 elements one at a time without any consumer,
 * and prints a message if none of those pushes blocked.
 */
int main() {
    pipe_t* pipe = pipe_new(sizeof(char), 10);
    pipe_producer_t* p = pipe_producer_new(pipe);

    // The buffer is never initialized; only a[0]..a[19] are handed to
    // pipe_push below, and their values are not inspected anywhere.
    char a[2000];
    // Twice as many single-element pushes as the limit passed to pipe_new.
    for (int i = 0; i < 20; i++)
        pipe_push(p, a+i, 1);

    // Only reached if every one of the 20 pushes returned.
    printf("Didn't block\n");

    return 0;
}

Furthermore, if you increase the limit in the code above, for example to 100:

    ...
    pipe_t* pipe = pipe_new(sizeof(char), 100);
    ...
    for (int i = 0; i < 200; i++)
    ...

Then this assertion is thrown:

push_block: pipe.c:518: check_invariants: Assertion `in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap)' failed.

theojepsen avatar Oct 11 '17 22:10 theojepsen

Any news? Thanks.

pbtrung avatar Jan 06 '18 00:01 pbtrung

I reiterate that the example above does not block when built against master. When built against the single_mutex and lock_free branches, however, it does block.

1pakch avatar Apr 30 '18 16:04 1pakch

Was this going to be fixed?

RSully avatar May 14 '18 01:05 RSully

I found that the issue is caused by the pipe's minimum capacity, which is 32 (defined by DEFAULT_MINCAP). So even though the test code above sets the limit to 10, it is changed to 32 in pipe_new(). If you change the push count from 20 to a number larger than 32, it will block as expected.

gpenghe avatar Jul 02 '18 05:07 gpenghe

It still triggers an assertion failure even when a larger value is used (for example, 32). I've modified the unit test:

// Modified issue_5 test: creates a pipe of int elements with
// pipe_new(sizeof(int), 32), pushes NUM (64) elements one per call, then
// requests NUM elements in a single pop and compares them with the input.
DEF_TEST(issue_5)
{
//  static const int NUM = 32;
#define NUM 64
  // NUM is twice the value passed as the second argument to pipe_new.
  pipe_t* pipe = pipe_new(sizeof(int), 32);
  pipe_producer_t* p = pipe_producer_new(pipe);
  pipe_consumer_t* c = pipe_consumer_new(pipe);
  // The pipe_t handle is freed before any data is pushed; only the producer
  // and consumer handles are used from this point on.
  pipe_free(pipe);

  int data[NUM];
  for(int i=0; i < NUM; ++i)
  {
    data[i] = i;
    // In the gdb session quoted with this test, the check_invariants
    // assertion fired inside this call when i == 31.
    // NOTE(review): this always passes the start of `data`, so presumably
    // data[0] is pushed on every iteration rather than data[i]; if so, the
    // buf[i] == data[i] check below could not pass. Confirm whether
    // &data[i] was intended.
    pipe_push(p, data, 1);
  }
//  pipe_push(p, data, NUM);
  pipe_producer_free(p);

  int buf[NUM];
  // Request all NUM elements in one call and require that NUM were returned.
  size_t ret = pipe_pop(c, buf, NUM);
  assert(ret == NUM);
  for(int i=0; i < NUM; ++i)
    assert(buf[i] == data[i]);

  pipe_consumer_free(c);
}

Stack trace of the failure:

233	  pipe_t* pipe = pipe_new(sizeof(int), 32);
(gdb) c
Continuing.
pipe_test: pipe.c:519: void check_invariants(pipe_t *): Assertion `in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap)' failed.
issue_5 ->
Program received signal SIGABRT, Aborted.
__GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
51	../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
(gdb) bt
#0  __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
#1  0x00007ffff734942a in __GI_abort () at abort.c:89
#2  0x00007ffff7340e67 in __assert_fail_base (fmt=<optimized out>,
    assertion=assertion@entry=0x403e35 "in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap)",
    file=file@entry=0x403b7f "pipe.c", line=line@entry=519,
    function=function@entry=0x403c9b "void check_invariants(pipe_t *)") at assert.c:92
#3  0x00007ffff7340f12 in __GI___assert_fail (
    assertion=0x403e35 "in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap)", file=0x403b7f "pipe.c", line=519,
    function=0x403c9b "void check_invariants(pipe_t *)") at assert.c:101
#4  0x0000000000401d24 in check_invariants (p=0x606420) at pipe.c:519
#5  0x0000000000403289 in resize_buffer (p=0x606420, new_size=256) at pipe.c:762
#6  0x00000000004028b9 in validate_size (p=0x606420, s=..., new_bytes=4) at pipe.c:786
#7  0x0000000000402351 in __pipe_push (p=0x606420, elems=0x7fffffffe1d0, count=4) at pipe.c:883
#8  0x0000000000402b7f in pipe_push (p=0x606420, elems=0x7fffffffe1d0, count=4) at pipe.c:911
#9  0x000000000040139a in test_issue_5 () at pipe_test.c:242
#10 0x00000000004012ea in pipe_run_test_suite () at pipe_test.c:317
#11 0x0000000000400b84 in main () at main.c:5
(gdb) fr 9
#9  0x000000000040139a in test_issue_5 () at pipe_test.c:242
242	    pipe_push(p, data, 1);
(gdb) p i
$19 = 31

git-blame avatar Mar 06 '20 16:03 git-blame

The following patch:

  • fixes the assertion and non-blocking problem if a non-zero value is used when creating pipe with pipe_new
  • can reduce the mincap to less than DEFAULT_MINCAP with pipe_reserve
diff --git a/pipe.c b/pipe.c
index 9af421a..e3c8c26 100644
--- a/pipe.c
+++ b/pipe.c
@@ -514,7 +514,7 @@ static inline void check_invariants(pipe_t* p)
     if(s.begin == s.end)
         assertume(bytes_in_use(s) == capacity(s));
 
-    assertume(in_bounds(DEFAULT_MINCAP*p->elem_size, p->min_cap, p->max_cap));
+    //assertume(in_bounds(DEFAULT_MINCAP*p->elem_size, p->min_cap, p->max_cap));
     assertume(in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap));
 }
 
@@ -743,6 +743,8 @@ static snapshot_t resize_buffer(pipe_t* p, size_t new_size)
                  elem_size = __pipe_elem_size(p);
 
     assertume(new_size >= bytes_in_use(make_snapshot(p)));
+    assertume(new_size + elem_size > new_size); // overflow
+    new_size += elem_size; // include sentinel
 
     if(unlikely(new_size >= max_cap))
         new_size = max_cap;
@@ -750,13 +752,13 @@ static snapshot_t resize_buffer(pipe_t* p, size_t new_size)
     if(new_size <= min_cap)
         return make_snapshot(p);
 
-    char* new_buf = malloc(new_size + elem_size);
+    char* new_buf = malloc(new_size);
     p->end = copy_pipe_into_new_buf(make_snapshot(p), new_buf);
 
     p->begin  =
     p->buffer = (free(p->buffer), new_buf);
 
-    p->bufend = new_buf + new_size + elem_size;
+    p->bufend = new_buf + new_size;
 
     check_invariants(p);
 
@@ -782,7 +784,7 @@ static inline snapshot_t validate_size(pipe_t* p,
             size_t elems_needed = bytes_needed / elem_size;
 
             if(likely(bytes_needed > cap))
-                s = resize_buffer(p, next_pow2(elems_needed+1)*elem_size);
+                s = resize_buffer(p, next_pow2(elems_needed)*elem_size);
         }
 
         // Unlock the pipe if requested.
@@ -844,9 +846,11 @@ static inline snapshot_t wait_for_room(pipe_t* p, size_t* max_cap)
 
     size_t consumer_refcount = p->consumer_refcount;
 
+    size_t elem_size = __pipe_elem_size(p);
+
     *max_cap = p->max_cap;
 
-    for(; unlikely(bytes_used == *max_cap) && likely(consumer_refcount > 0);
+    for(; unlikely(bytes_used + elem_size >= *max_cap) && likely(consumer_refcount > 0);
           s                 = make_snapshot(p),
           bytes_used        = bytes_in_use(s),
           consumer_refcount = p->consumer_refcount,
@@ -883,7 +887,7 @@ void __pipe_push(pipe_t* p,
         // Finally, we can now begin with pushing as many elements into the
         // queue as possible.
         p->end = process_push(s, elems,
-                     pushed = min(count, max_cap - bytes_in_use(s)));
+                     pushed = min(count, capacity(s) - bytes_in_use(s)));
     } mutex_unlock(&p->end_lock);
 
     assertume(pushed > 0);
diff --git a/pipe_test.c b/pipe_test.c
index 6c8781d..57d160b 100644
--- a/pipe_test.c
+++ b/pipe_test.c
@@ -29,6 +29,10 @@
 #include <stdlib.h>
 #include <string.h>
 
+#ifdef __GNUC__
+#pragma GCC diagnostic ignored "-Wunused-result"
+#endif
+
 #define UNUSED_PARAMETER(var) (var) = (var)
 
 // All this hackery is just to get asserts to work in release build.
@@ -245,6 +249,145 @@ DEF_TEST(issue_5)
   pipe_consumer_free(c);
 }
 
+// set max cap (not infinite)
+DEF_TEST(issue_6_a)
+{
+  static const int NUM = 32;
+  pipe_t* pipe = pipe_new(sizeof(int), NUM);
+  pipe_producer_t* p = pipe_producer_new(pipe);
+  pipe_consumer_t* c = pipe_consumer_new(pipe);
+  pipe_free(pipe);
+
+  int data[NUM];
+  for(int i=0; i < NUM; ++i)
+    data[i] = i;
+  pipe_push(p, data, NUM);
+  pipe_producer_free(p);
+
+  int buf[NUM];
+  size_t ret = pipe_pop(c, buf, NUM);
+  assert(ret == NUM);
+  for(int i=0; i < NUM; ++i)
+    assert(buf[i] == data[i]);
+
+  pipe_consumer_free(c);
+}
+
+// set smaller min cap
+DEF_TEST(issue_6_b)
+{
+  static const int NUM = 16;
+  pipe_t* pipe = pipe_new(sizeof(int), NUM * 2);
+  pipe_reserve(PIPE_GENERIC(pipe), NUM);
+  pipe_producer_t* p = pipe_producer_new(pipe);
+  pipe_consumer_t* c = pipe_consumer_new(pipe);
+  pipe_free(pipe);
+
+  int data[NUM];
+  for(int i=0; i < NUM; ++i)
+    data[i] = i;
+  pipe_push(p, data, NUM);
+  pipe_producer_free(p);
+
+  int buf[NUM];
+  size_t ret = pipe_pop(c, buf, NUM);
+  assert(ret == NUM);
+  for(int i=0; i < NUM; ++i)
+    assert(buf[i] == data[i]);
+
+  pipe_consumer_free(c);
+}
+
+#ifdef _WIN32 // use the native win32 API on Windows
+
+#include <windows.h>
+
+#define thread_create(f, p) CloseHandle(            \
+        CreateThread(NULL,                          \
+                     0,                             \
+                     (LPTHREAD_START_ROUTINE)(f),   \
+                     (p),                           \
+                     0,                             \
+                     NULL))
+
+#define thread_sleep(s) Sleep((s) * 1000)
+
+#else // fall back on pthreads
+
+#include <pthread.h>
+#include <unistd.h>
+
+static inline void thread_create(void *(*f) (void*), void* p)
+{
+    pthread_t t;
+    pthread_create(&t, NULL, f, p);
+}
+
+#define thread_sleep(s) sleep(s)
+
+#endif
+
+typedef struct __issue_6_t {
+  pipe_consumer_t* c;
+  int writing;
+  int read;
+} issue_6_t;
+
+static void* process_pipe_issue_6_c(void* param)
+{
+  static const int NUM = 32;
+  issue_6_t* v = (issue_6_t *)param;
+
+  //printf("Consumer waiting for a bit ...\n");
+  //printf("Consumer starts to read pipe ...\n");
+  thread_sleep(1);
+  assert(v->writing); // producer still writing, blocked from finishing
+  int buf[NUM];
+  size_t ret = pipe_pop(v->c, buf, NUM);
+  assert(ret == NUM);
+  for(int i=0; i < NUM; ++i)
+    assert(buf[i] == i);
+
+  v->read = NUM;
+  pipe_consumer_free(v->c);
+  return NULL;
+}
+
+// producer blocking on push
+// Note: pipe rounds up to power of 2 so initial
+// value of 32 is rounded up to 64 so we block on
+// writing 64 values, not 32
+DEF_TEST(issue_6_c)
+{
+  static const int NUM = 32;
+  pipe_t* pipe = pipe_new(sizeof(int), NUM);
+  pipe_producer_t* p = pipe_producer_new(pipe);
+
+  issue_6_t* params = malloc(sizeof(*params));
+  memset(params, 0, sizeof(*params));
+  params->c = pipe_consumer_new(pipe);
+
+  pipe_free(pipe);
+
+  thread_create(&process_pipe_issue_6_c, params);
+
+  int data[NUM];
+  for(int i=0; i < NUM; ++i)
+    data[i] = i;
+  //printf("Producer pushing ok ...\n");
+  params->writing = 1;
+  pipe_push(p, data, NUM);
+  //printf("Producer pushing should be blocked ...\n");
+  pipe_push(p, data, NUM);
+  //printf("Producer unblocked ...\n");
+  params->writing = 0;
+  thread_sleep(1);
+  assert(params->read == NUM);
+
+  free(params);
+  pipe_producer_free(p);
+}
+
 /*
 // This test is only legal if DEFAULT_MINCAP is less than or equal to 8.
 //
@@ -307,6 +450,9 @@ void pipe_run_test_suite(void)
     RUN_TEST(parallel_multiplier);
     RUN_TEST(issue_4);
     RUN_TEST(issue_5);
+    RUN_TEST(issue_6_a);
+    RUN_TEST(issue_6_b);
+    RUN_TEST(issue_6_c);
 /*
 #ifdef PIPE_DEBUG
     RUN_TEST(clobbering);

git-blame avatar Mar 12 '20 11:03 git-blame

The following patch:

* fixes the assertion and non-blocking problem if a non-zero value is used when creating pipe with `pipe_new`

* can reduce the mincap to less than DEFAULT_MINCAP with `pipe_reserve`

This has been applied in https://github.com/mgood7123/pipe

Also see https://github.com/cgaebel/pipe/issues/11

Note that you do not join your threads, which also creates a memory leak.

mgood7123 avatar Aug 30 '20 05:08 mgood7123

There is a commit on https://github.com/git-blame/pipe that returns thread handles, so that in pipe_test.c you can join the child threads and clean up resources.

git-blame avatar Apr 25 '22 13:04 git-blame