
[Feature request] buffer with predicate which determines when buffer must be flushed

Open anatoly-spb opened this issue 7 years ago • 8 comments

Hello!

I have a simple example with a character stream:

  auto source = rxcpp::observable<>::iterate(
    std::initializer_list<char>{ '1', '2', '3', '$', '5', '$', '7', '8', '$' }
  );

I want to get a std::vector for each subsequence terminated by '$'. I know there is a buffer operator ( http://reactivex.io/documentation/operators/buffer.html ) with count and time overloads. But what about a buffer with a predicate that determines when the buffer must be flushed?

Now I have the following workaround:

  auto source = rxcpp::observable<>::iterate(
    std::initializer_list<char>{ '1', '2', '3', '$', '5', '$', '7', '8', '$' }
  );

  auto vector_stream = source
    | rxcpp::operators::flat_map([](char ch) {
    return rxcpp::observable<>::create<std::vector<char>>(
      [ch](rxcpp::subscriber<std::vector<char>> s) {
      static std::vector<char> a;
      if (ch == '$') {
        s.on_next(a);
        a.clear();
      }
      else {
        a.push_back(ch);
      }
    });
  });

  vector_stream
    .subscribe([](const std::vector<char> &v) {
    std::cout << "consume vector: ";
    std::copy(v.begin(), v.end(), std::ostream_iterator<char>(std::cout, " "));
    std::cout << std::endl;
  });

I am a newcomer, so maybe you can point me to a more correct and elegant Rx way to solve my task.

Thanks in advance, Anatoly Shirokov

anatoly-spb avatar Mar 31 '17 21:03 anatoly-spb

The window_toggle operator can be used to build this. In fact this is used in Readme.md to implement a variation of this exact problem.

    // filter to last string in each line
    auto closes = strings |
        filter(
            [](const string& s){
                return s.back() == '\r';
            }) |
        Rx::map([](const string&){return 0;});

    // group strings by line
    auto linewindows = strings |
        window_toggle(closes | start_with(0), [=](int){return closes;});

Partially applied to this case:

  auto source = rxcpp::observable<>::iterate(
    std::initializer_list<char>{ '1', '2', '3', '$', '5', '$', '7', '8', '$' }
  ) |
  // share
  publish() |
  ref_count();

    // filter to flushes
    auto flushes = source |
        filter(
            [](char c){
                return c == '$';
            }) |
        Rx::map([](char){return 0;});

    // group
    auto windows = source |
        window_toggle(flushes | start_with(0), [=](int){return flushes;});

  auto vectors = windows |
    map([](observable<char> w){
      return w | 
        filter(. . .) | // filter out '$'
        reduce(. . .); // push_back each char into a vector.
    });

kirkshoop avatar Apr 03 '17 14:04 kirkshoop

Thank you so much, Kirk! You are a kind wizard. But I cannot make it work with an infinite character sequence:

  auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread())
    .map([](long i) {
    if (!(i % 10))
      return (char)'$';
    return (char)('A' + i);
  });
  std::mutex m;
  source.subscribe([&m](char ch) {
    std::lock_guard<std::mutex> g(m);
    std::cout << "char: " << ch << std::endl;
  });

  // filter to flushes
  auto flushes = source |
    rxcpp::operators::filter(
      [](char ch) {
    return ch == '$';
  })
    | rxcpp::operators::map([](char ch) {
    return 0;
  });

  auto windows = source |
    rxcpp::operators::window_toggle(flushes | rxcpp::operators::start_with(0), [=](int) {
    return flushes;
  });

  auto vectors = windows |
    rxcpp::operators::map([](rxcpp::observable<char> w) {
    return w |
      // filter out '$'
      rxcpp::operators::filter([](char ch) {
      return ch != '$';
    }) | 
      // reduce to vector
      rxcpp::operators::reduce(
        std::vector<char>(),
        [](std::vector<char> v, char ch) {
      v.push_back(ch);
      return v;
    }) | rxcpp::operators::as_dynamic();
  });

  vectors
    .take(3)
    .as_blocking()
    .subscribe([&m](rxcpp::observable<std::vector<char>> o) {
    o.subscribe([&m](std::vector<char> &v) {
      std::lock_guard<std::mutex> g(m);
      std::cout << "vector: ";
      std::copy(v.begin(), v.end(), std::ostream_iterator<char>(std::cout, " "));
      std::cout << std::endl;
    });
  });

I got the following:

char: B
char: C
char: D
char: E
char: F
char: G
char: H
char: I
char: J
char: $
vector: B C D E F G H I J
vector:
char: L
char: M
char: N
char: O
char: P
char: Q
char: R
char: S
char: T
char: $

Any hints?

anatoly-spb avatar Apr 03 '17 18:04 anatoly-spb

I think that the fix is to share the source:

  auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread())
    .map([](long i) {
    if (!(i % 10))
      return (char)'$';
    return (char)('A' + i);
  }) |
  // share
  publish() |
  ref_count();

This should let you remove the mutex m as well. At the moment, every subscribe to the source starts a new thread and emits all the chars on that thread. This leads to coordination issues in the window_toggle operator, since it was not given a thread-safe scheduler to coordinate the values from different threads. It also means that the '$' from one thread is being used to close the sequence from a different thread.
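The difference between an unshared and a shared source can be modelled without RxCpp. The sketch below is a toy, single-threaded illustration only: `cold_source_sketch`, `shared_source_sketch`, and the explicit `connect()` call are hypothetical names invented here, not RxCpp types (with `publish() | ref_count()` the connection is made for you when the first subscriber arrives).

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

using observer = std::function<void(char)>;

// Toy "cold" source (hypothetical, not RxCpp): every subscribe() call runs the
// producer again from the start, so each observer gets its own pass.
struct cold_source_sketch {
    int runs = 0;  // how many times the producer has been started
    void subscribe(const observer& on_next) {
        ++runs;
        for (char ch : {'A', 'B', '$'}) {
            on_next(ch);
        }
    }
};

// Toy "shared" source (hypothetical, not RxCpp): observers register first,
// then connect() runs the producer once and forwards each value to all of them.
struct shared_source_sketch {
    int runs = 0;
    std::vector<observer> observers;
    void subscribe(observer on_next) { observers.push_back(std::move(on_next)); }
    void connect() {
        assert(runs == 0 && "connect() is meant to be called once in this sketch");
        ++runs;
        for (char ch : {'A', 'B', '$'}) {
            for (const auto& o : observers) {
                o(ch);
            }
        }
    }
};
```

With two subscribers, the cold model starts the producer twice, so a '$' seen by one subscriber belongs to a different run than the chars seen by the other. The shared model starts it once, so both subscribers see the same '$' at the same position in the same run.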

kirkshoop avatar Apr 03 '17 19:04 kirkshoop

Thank you, Kirk! As a result, it seems window_toggle is not suitable for this task. The following does not work:

  auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(50))
    .map([](long i) {
    if (!(i % 10))
      return (char)'$';
    return (char)('A' + i);
  })
    .publish()
    .ref_count();

  // filter to flushes
  auto flushes = source |
    rxcpp::operators::filter(
      [](char ch) {
    return ch == '$';
  })
    | rxcpp::operators::map([](char ch) {
    return 0;
  });

  auto windows = source |
    rxcpp::operators::window_toggle(flushes | rxcpp::operators::start_with(0), [=](int) {
    return flushes;
  });

  auto vectors = windows |
    rxcpp::operators::map([](rxcpp::observable<char> w) {
    return w |
      // filter out '$'
      rxcpp::operators::filter([](char ch) {
      return ch != '$';
    }) | 
      // reduce to vector
      rxcpp::operators::reduce(
        std::vector<char>(),
        [](std::vector<char> v, char ch) {
      v.push_back(ch);
      return v;
    }) | rxcpp::operators::as_dynamic();
  });

  vectors
    .take(3)
    .as_blocking()
    .subscribe([](rxcpp::observable<std::vector<char>> o) {
    o.subscribe([](std::vector<char> &v) {
      std::cout << "vector: ";
      std::copy(v.begin(), v.end(), std::ostream_iterator<char>(std::cout, " "));
      std::cout << std::endl;
    });
  });
  return;

I got only one vector and then my program hangs:

vector: B C D E F G H I J

For me, the best solution in this case would be a buffer with a predicate.

anatoly-spb avatar Apr 03 '17 20:04 anatoly-spb

I finally have a fix for an async lifetime bug in window_toggle.

A few improvements to the code as well.

  • no need to use as_blocking() when no threads were added
  • use merge() instead of nesting a subscribe in on_next

  auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(50))
    .map([](long i) {
    if (!(i % 10))
      return (char)'$';
    return (char)('A' + i);
  })
    .publish()
    .ref_count();

  // filter to flushes
  auto flushes = source |
    rxcpp::operators::filter(
      [](char ch) {
    return ch == '$';
  }) |
    rxcpp::operators::map([](char ) {
    return 0;
  });

  auto windows = source |
    rxcpp::operators::window_toggle(flushes | rxcpp::operators::start_with(0), [=](int) {
    return flushes;
  });

  auto vectors = windows |
    rxcpp::operators::map([](rxcpp::observable<char> w) {
    return w |
      // filter out '$'
      rxcpp::operators::filter([](char ch) {
      return ch != '$';
    }) |
      // reduce to vector
      rxcpp::operators::reduce(
        std::vector<char>(),
        [](std::vector<char> v, char ch) {
      v.push_back(ch);
      return v;
    }) |
      rxcpp::operators::as_dynamic();
  }) |
  merge();

  vectors
    .take(3)
    .subscribe([](const std::vector<char> &v) {
      std::cout << "vector: ";
      std::copy(v.begin(), v.end(), std::ostream_iterator<char>(std::cout, " "));
      std::cout << std::endl;
    });

With the bug fix, this results in:

$ ./delimited 
vector: B C D E F G H I J 
vector: L M N O P Q R S T 
vector: V W X Y Z [ \ ] ^ 
$ 

> For me, the best solution in this case would be a buffer with a predicate.

Adding buffer_toggle would allow this and remove the extra reduce. There is not a lot of code difference between the window and buffer forms of the operators. I just haven't gotten around to it yet.

kirkshoop avatar Apr 07 '17 02:04 kirkshoop

Thank you so much, Kirk! This is very good news.

anatoly-spb avatar Apr 07 '17 04:04 anatoly-spb

Thank you for pointing out the issues in window_toggle!

kirkshoop avatar Apr 07 '17 06:04 kirkshoop

With all respect to the 'creative solution', readable and understandable code looks different, so something like a buffer_if with a predicate seems preferable and should still be considered a reasonable feature request. The proposed solution is more of a workaround imho.
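To make the request concrete, here is a minimal sketch of the semantics such an operator might have, written in plain C++ without RxCpp. The names `buffer_if_sketch` and `split_on` are hypothetical and are not part of RxCpp. Dropping the delimiter and emitting an unterminated tail on completion are assumptions made here, not decisions taken in this thread.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical helper, not part of RxCpp: collects pushed values and hands the
// collected vector to `emit` whenever `should_flush(value)` returns true.
// The value that triggers the flush is dropped, like the '$' delimiter.
template <typename T>
class buffer_if_sketch {
public:
    using predicate = std::function<bool(const T&)>;
    using sink = std::function<void(std::vector<T>)>;

    buffer_if_sketch(predicate should_flush, sink emit)
        : should_flush_(std::move(should_flush)), emit_(std::move(emit)) {
        assert(should_flush_ && emit_);  // both callbacks must be set
    }

    // Call once per incoming value (the role of on_next).
    void on_next(const T& value) {
        if (should_flush_(value)) {
            flush();
        } else {
            pending_.push_back(value);
        }
    }

    // Call when the input ends (the role of on_completed). Assumption: an
    // unterminated tail is emitted rather than discarded.
    void on_completed() {
        if (!pending_.empty()) {
            flush();
        }
    }

private:
    void flush() {
        std::vector<T> out;
        out.swap(pending_);  // leaves pending_ empty for the next run
        emit_(std::move(out));
    }

    predicate should_flush_;
    sink emit_;
    std::vector<T> pending_;
};

// Convenience wrapper for the example in this thread: splits `input` on `delimiter`.
inline std::vector<std::vector<char>> split_on(const std::vector<char>& input,
                                               char delimiter) {
    std::vector<std::vector<char>> result;
    buffer_if_sketch<char> buffer(
        [delimiter](const char& ch) { return ch == delimiter; },
        [&result](std::vector<char> v) { result.push_back(std::move(v)); });
    for (char ch : input) {
        buffer.on_next(ch);
    }
    buffer.on_completed();
    return result;
}
```

Called as `split_on({'1', '2', '3', '$', '5', '$', '7', '8', '$'}, '$')`, this yields the three vectors {1 2 3}, {5} and {7 8}. Because values are pushed one at a time through on_next, the same class would also work on an unbounded input such as the interval-based source discussed above.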

tnovotny avatar Feb 27 '18 18:02 tnovotny