RxCpp
[Feature request] buffer with predicate which determines when buffer must be flushed
Hello!
I have a simple example with a character stream:
auto source = rxcpp::observable<>::iterate(
std::initializer_list<char>{ '1', '2', '3', '$', '5', '$', '7', '8', '$' }
);
I want to get a stream of std::vector<char>, with a vector emitted each time a '$' arrives.
Now I have the following workaround:
auto source = rxcpp::observable<>::iterate(
std::initializer_list<char>{ '1', '2', '3', '$', '5', '$', '7', '8', '$' }
);
auto vector_stream = source
| rxcpp::operators::flat_map([](char ch) {
return rxcpp::observable<>::create<std::vector<char>>(
[ch](rxcpp::subscriber<std::vector<char>> s) {
static std::vector<char> a;
if (ch == '$') {
s.on_next(a);
a.clear();
}
else {
a.push_back(ch);
}
});
});
vector_stream
.subscribe([](const std::vector<char> &v) {
std::cout << "consume vector: ";
std::copy(v.begin(), v.end(), std::ostream_iterator<char>(std::cout, " "));
std::cout << std::endl;
});
I am a newcomer, so maybe you can point me to a more correct and elegant Rx way to solve my task.
Thanks in advance, Anatoly Shirokov
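For reference, the grouping requested above can be written down without RxCpp. This is only a minimal sketch of the expected result for a finite input; split_on is a hypothetical helper name, not an RxCpp function, and it assumes that the '$' itself is dropped and that items after the last '$' are discarded (as in the workaround above).

```cpp
#include <vector>

// Hypothetical helper, not part of RxCpp: collects items into a buffer and
// emits a copy of the buffer whenever is_flush(item) returns true.
// Assumptions: the flush item is not stored, and items that follow the
// last flush item are discarded.
template <class T, class Pred>
std::vector<std::vector<T>> split_on(const std::vector<T>& input, Pred is_flush) {
    std::vector<std::vector<T>> out;
    std::vector<T> current;
    for (const T& item : input) {
        if (is_flush(item)) {
            out.push_back(current);  // flush the buffered items
            current.clear();
        } else {
            current.push_back(item);
        }
    }
    return out;
}
```

Called with the nine characters from the example and a predicate that tests for '$', this yields three vectors: {'1','2','3'}, {'5'} and {'7','8'}.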
The window_toggle operator can be used to build this. In fact, it is used in Readme.md to implement a variation of this exact problem.
// filter to last string in each line
auto closes = strings |
filter(
[](const string& s){
return s.back() == '\r';
}) |
Rx::map([](const string&){return 0;});
// group strings by line
auto linewindows = strings |
window_toggle(closes | start_with(0), [=](int){return closes;});
Partially applied to this case:
auto source = rxcpp::observable<>::iterate(
std::initializer_list<char>{ '1', '2', '3', '$', '5', '$', '7', '8', '$' }
) |
// share
publish() |
ref_count();
// filter to flushes
auto flushes = source |
filter(
[](char c){
return c == '$';
}) |
Rx::map([](char){return 0;});
// group
auto windows = source |
window_toggle(flushes | start_with(0), [=](int){return flushes;});
auto vectors = windows |
map([](observable<char> w){
return w |
filter(. . .) | // filter out '$'
reduce(. . .); // push_back each char into a vector.
});
Thank you so much, Kirk! You are a kind wizard. But I cannot make it work with an infinite character sequence:
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread())
.map([](long i) {
if (!(i % 10))
return (char)'$';
return (char)('A' + i);
});
std::mutex m;
source.subscribe([&m](char ch) {
std::lock_guard<std::mutex> g(m);
std::cout << "char: " << ch << std::endl;
});
// filter to flushes
auto flushes = source |
rxcpp::operators::filter(
[](char ch) {
return ch == '$';
})
| rxcpp::operators::map([](char ch) {
return 0;
});
auto windows = source |
rxcpp::operators::window_toggle(flushes | rxcpp::operators::start_with(0), [=](int) {
return flushes;
});
auto vectors = windows |
rxcpp::operators::map([](rxcpp::observable<char> w) {
return w |
// filter out '$'
rxcpp::operators::filter([](char ch) {
return ch != '$';
}) |
// reduce to vector
rxcpp::operators::reduce(
std::vector<char>(),
[](std::vector<char> v, char ch) {
v.push_back(ch);
return v;
}) | rxcpp::operators::as_dynamic();
});
vectors
.take(3)
.as_blocking()
.subscribe([&m](rxcpp::observable<std::vector<char>> o) {
o.subscribe([&m](std::vector<char> &v) {
std::lock_guard<std::mutex> g(m);
std::cout << "vector: ";
std::copy(v.begin(), v.end(), std::ostream_iterator<char>(std::cout, " "));
std::cout << std::endl;
});
});
I got the following:
char: B
char: C
char: D
char: E
char: F
char: G
char: H
char: I
char: J
char: $
vector: B C D E F G H I J
vector:
char: L
char: M
char: N
char: O
char: P
char: Q
char: R
char: S
char: T
char: $
Any hints?
I think that the fix is to share the source:
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(50), rxcpp::observe_on_new_thread())
.map([](long i) {
if (!(i % 10))
return (char)'$';
return (char)('A' + i);
}) |
// share
publish() |
ref_count();
This should let you remove the mutex m as well. At the moment, every subscribe to the source starts a new thread and emits all the chars on that thread. This leads to coordination issues in the window_toggle operator, since it was not given a thread-safe scheduler to coordinate the values from different threads. It also means that the '$' from one thread is being used to close the sequence from a different thread.
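The difference between an unshared and a shared source can be illustrated with a toy model in plain C++. This is a sketch only, not how RxCpp implements publish() or ref_count(): cold_source, shared_source and connect() here are hypothetical names, there are no threads, and the real ref_count() connects automatically on the first subscription instead of requiring an explicit call.

```cpp
#include <functional>
#include <string>
#include <utility>
#include <vector>

using observer = std::function<void(char)>;

// Toy "cold" source: every subscribe call starts its own producer run,
// so two subscribers see two independent sequences.
struct cold_source {
    int runs = 0;  // number of independent producer runs started
    void subscribe(const observer& o) {
        ++runs;
        for (char c : std::string("AB$")) o(c);
    }
};

// Toy shared source: observers register first, then a single producer
// run is fanned out to all of them, item by item.
struct shared_source {
    int runs = 0;
    std::vector<observer> observers;
    void subscribe(observer o) { observers.push_back(std::move(o)); }
    void connect() {
        ++runs;
        for (char c : std::string("AB$"))
            for (auto& o : observers) o(c);
    }
};
```

In the shared model, the subscriber that filters for '$' and the subscriber that fills the window see the same '$' at the same position in one sequence. In the cold model, each of them gets its own run, which is the mismatch described above.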
Thank you, Kirk! It turns out that window_toggle is not suitable for this task after all. The following does not work:
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(50))
.map([](long i) {
if (!(i % 10))
return (char)'$';
return (char)('A' + i);
})
.publish()
.ref_count();
// filter to flushes
auto flushes = source |
rxcpp::operators::filter(
[](char ch) {
return ch == '$';
})
| rxcpp::operators::map([](char ch) {
return 0;
});
auto windows = source |
rxcpp::operators::window_toggle(flushes | rxcpp::operators::start_with(0), [=](int) {
return flushes;
});
auto vectors = windows |
rxcpp::operators::map([](rxcpp::observable<char> w) {
return w |
// filter out '$'
rxcpp::operators::filter([](char ch) {
return ch != '$';
}) |
// reduce to vector
rxcpp::operators::reduce(
std::vector<char>(),
[](std::vector<char> v, char ch) {
v.push_back(ch);
return v;
}) | rxcpp::operators::as_dynamic();
});
vectors
.take(3)
.as_blocking()
.subscribe([](rxcpp::observable<std::vector<char>> o) {
o.subscribe([](std::vector<char> &v) {
std::cout << "vector: ";
std::copy(v.begin(), v.end(), std::ostream_iterator<char>(std::cout, " "));
std::cout << std::endl;
});
});
return;
I got only one vector, and then my program hung:
vector: B C D E F G H I J
In my opinion, the best solution in this case would be a buffer with a predicate.
I finally have a fix for an async lifetime bug in window_toggle.
A few improvements to the code as well:
- no need to use as_blocking() when no threads were added
- use merge() instead of nesting a subscribe in on_next
auto source = rxcpp::observable<>::interval(std::chrono::milliseconds(50))
.map([](long i) {
if (!(i % 10))
return (char)'$';
return (char)('A' + i);
})
.publish()
.ref_count();
// filter to flushes
auto flushes = source |
rxcpp::operators::filter(
[](char ch) {
return ch == '$';
}) |
rxcpp::operators::map([](char ) {
return 0;
});
auto windows = source |
rxcpp::operators::window_toggle(flushes | rxcpp::operators::start_with(0), [=](int) {
return flushes;
});
auto vectors = windows |
rxcpp::operators::map([](rxcpp::observable<char> w) {
return w |
// filter out '$'
rxcpp::operators::filter([](char ch) {
return ch != '$';
}) |
// reduce to vector
rxcpp::operators::reduce(
std::vector<char>(),
[](std::vector<char> v, char ch) {
v.push_back(ch);
return v;
}) |
rxcpp::operators::as_dynamic();
}) |
merge();
vectors
.take(3)
.subscribe([](const std::vector<char> &v) {
std::cout << "vector: ";
std::copy(v.begin(), v.end(), std::ostream_iterator<char>(std::cout, " "));
std::cout << std::endl;
});
With the bug fix, this results in:
$ ./delimited
vector: B C D E F G H I J
vector: L M N O P Q R S T
vector: V W X Y Z [ \ ] ^
$
In my opinion, the best solution in this case would be a buffer with a predicate.
Adding buffer_toggle would allow this and remove the extra reduce. There is not a lot of code difference between the window and buffer forms of the operators. I just haven't gotten around to it yet.
Thank you so much, Kirk! This is very good news.
Thank you for pointing out the issues in window_toggle!
With all respect to the 'creative solution', readable and understandable code looks different, so something like a buffer_if with a predicate seems preferable and should still be considered a reasonable feature request. The proposed solution is more of a workaround, IMHO.