RxPY
Observable interval appears to leak
In this test example
from __future__ import print_function
import threading
import os
import psutil
from time import sleep
from rx import Observable

# Synchronisable heartbeat
heartbeat = Observable.interval(100).publish()
heartbeat.connect()
heartbeat.subscribe(lambda beat: None)

# Mocks a 50 Hz GPS signal
provider = Observable.interval(20) \
    .map(lambda i: (24, 24)) \
    .publish()
provider.connect()

heartbeat.with_latest_from(provider, lambda heartbeat, gps: gps) \
    .subscribe(lambda combined: None)  # or: lambda combined: print(combined)

try:
    while True:
        sleep(0.1)
        process = psutil.Process(os.getpid())
        print(str(process.memory_info().rss / 1000) + " KB in mem; "
              + str(threading.active_count()) + " active threads")
except KeyboardInterrupt:
    print("Caught KeyboardInterrupt")
which is running under Python 2.7.15 on Kubuntu 18.04 with RxPY 1.6.1, I'm seeing the memory of this application slowly tick upwards. The output of psutil (and the system profiler) shows memory ticking upwards, while the thread count stays constant at around 9.
Full disclosure - Python is not my usual language, so I may be missing a language gotcha.
From having a read of an implementation (in Java) of interval, I didn't believe that anything here needs to be managed for memory growth - Googling memory management for infinite sequences isn't giving me much either. Is this normal (and my code is badly written), or is there an issue here?
FWIW, I wrote up a comparison test in my usual language (C++) and couldn't replicate any leaking.
#include <iostream>
#include <chrono>
#include "rxcpp/rx.hpp"

int main(int argc, char** argv) {
    auto threads = rxcpp::observe_on_new_thread();

    auto heartbeat = rxcpp::observable<>::interval(std::chrono::milliseconds(100), threads)
        .observe_on(threads)
        .publish();
    heartbeat.connect();

    auto gps = rxcpp::observable<>::interval(std::chrono::milliseconds(20), threads)
        .observe_on(threads)
        .publish();
    gps.connect();

    auto values = heartbeat.with_latest_from(gps);
    values
        .subscribe_on(threads)
        .subscribe(
            [](std::tuple<int, int> v) { printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v)); },
            []() { printf("OnCompleted\n"); });

    int i;
    std::cout << "This blocks until Ctrl C";
    std::cin >> i;
    return 0;
}
Edit: fixed code
I have the following logging format set up for the standard logging library:

logging.basicConfig(level=logging.INFO, format="[%(threadName)s] %(asctime)-15s %(message)s")

This prints the thread on which each logging.info(foo) call executes.
I've created an interval stream and a consumer via the code below.
import rx
import rx.concurrency as rxc

time_pool = rxc.ThreadPoolScheduler(1)  # assumed definition; not shown in the original snippet
timer_stream = rx.Observable.interval(1000).subscribe_on(time_pool)
timer_stream.subscribe(lambda count: logging.info(count))
As you can see from the output, every tick of the interval executes on a new thread id. I think this might be the cause of the issue. I'm using RxPY 1.6.1. FYI.
Edit: creating a scheduler and specifying it via the scheduler argument of interval solves the problem. The new code is below.
import rx
import rx.concurrency as rxc

time_pool = rxc.ThreadPoolScheduler(1)
timer_stream = rx.Observable.interval(1000, scheduler=time_pool)
timer_stream.subscribe(lambda count: logging.info(count))
Hi,
I've been running a bunch of tests with a simpler sample and a slightly different way of monitoring memory, using RxPY 3 (HEAD of master), Python 3.7.3 and Linux. I must say that I'm not an expert in memory management, but I believe the garbage collector can make memory measurements quite unreliable (?). I have no idea how and when Python frees unused memory. It's also not clear what the rss value really means (on Linux) and whether it's reliable in a Python context.
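For what it's worth, one way to reduce that uncertainty is to force a collection right before sampling (a standalone sketch, not part of the test script):

import gc
import os
import psutil

gc.collect()  # collect pending garbage first, so it doesn't skew the sample
# note: CPython may still not return freed memory to the OS, so rss can stay high
rss = psutil.Process(os.getpid()).memory_info().rss
print("rss after gc: %d bytes" % rss)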
Basically, the script consists of a function that creates one or more interval observables and is executed in a second Python process. That process is monitored from the main Python process by polling and logging memory info (rss, vms) for the 'observables' process. IMO, this avoids some potential unwanted interactions and measurement bias.
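A minimal sketch of that setup (illustrative names, not the exact script):

import multiprocessing
import time

import psutil


def run_observables():
    # placeholder body: the real function creates interval observables
    # (see fn_interval_hot below)
    while True:
        time.sleep(10.0)


if __name__ == "__main__":
    worker = multiprocessing.Process(target=run_observables)
    worker.start()
    # poll the child's memory usage from the parent process
    proc = psutil.Process(worker.pid)
    while True:
        mem = proc.memory_info()
        print("rss=%d vms=%d" % (mem.rss, mem.vms))
        time.sleep(1.0)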
I've tested with two versions of this script:
- one or more hot interval observables (see below)
- one or more cold interval observables (i.e. without the publish operator; a sketch of this variant follows the hot one below)
import time

import rx
from rx import operators as ops


def fn_interval_hot(dt, ninstances=1):
    observables = []
    disposables = []
    for i in range(ninstances):
        observable = rx.interval(dt).pipe(ops.publish())
        d = observable.subscribe(lambda _: None)
        observables.append(observable)
        disposables.append(d)

    # connect observables
    for o in observables:
        o.connect()

    # maintain thread alive indefinitely
    while True:
        time.sleep(10.0)
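The cold variant is identical minus the publish/connect steps (a sketch; fn_interval_cold is just my naming):

def fn_interval_cold(dt, ninstances=1):
    disposables = []
    for i in range(ninstances):
        # no publish(): each subscription drives its own cold interval
        d = rx.interval(dt).subscribe(lambda _: None)
        disposables.append(d)

    # maintain thread alive indefinitely
    while True:
        time.sleep(10.0)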
Tests have been run with:
- interval type: hot, cold
- interval period: 1 ms, 50 ms
- duration: 30 min
- number of simultaneous observables: 1, 5, 8
We can see that in every case, rss quickly grows until it reaches an almost constant value in an asymptotic fashion. So I would say that's not that bad after all.
@erikkemperman @dbrattli, the interval operator has TimeoutScheduler as its default scheduler. TimeoutScheduler doesn't implement schedule_periodic, so the implementation from SchedulerBase is used instead, which calls schedule_relative recursively. Each time, schedule_relative creates a threading.Timer, which is a subclass of threading.Thread.
In short, as noticed by @rfum, periodic scheduling with TimeoutScheduler results in creating a thread per item. I don't think this is an issue right now, but it may be non-optimal. Maybe something to keep in mind for potential future scheduler improvements.
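To illustrate the point (a standalone snippet, not RxPY code):

import threading

# threading.Timer subclasses threading.Thread, so every schedule_relative
# call ends up spawning a short-lived thread like this one
t = threading.Timer(0.1, lambda: None)
print(isinstance(t, threading.Thread))  # True
t.start()
t.join()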
Nice analysis!
Yes, TimeoutScheduler creates a lot of threads, and recycling is certainly possible. I've been meaning to try to make a pool or something, but I haven't found the time for that so far.
Perhaps we should just retire TimeoutScheduler and use ThreadPoolScheduler as the default instead. TimeoutScheduler was ported from JS and might not make sense for Python, since it's unbounded.
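In the meantime, callers can opt in explicitly (RxPY 3 names; a sketch):

import time

import rx
from rx.scheduler import ThreadPoolScheduler

# a single pool thread handles every tick, instead of one Timer thread per tick
pool = ThreadPoolScheduler(max_workers=1)
rx.interval(1.0, scheduler=pool).subscribe(print)

time.sleep(5.0)  # keep the main thread alive long enough to see output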
Yes, that's one option. I was hoping to actually try something slightly more ambitious, namely to postpone mapping scheduled items to threads until they're almost due. That way we avoid claiming threads from the pool only for them to do nothing for a relatively long time. But actually, that kind of logic might make sense for the ThreadPoolScheduler as well, and in that case I guess they could be folded into a single class.
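A rough sketch of that idea, for discussion (illustrative only, not RxPY code; LazyTimerPool and its methods are made-up names): a single timer thread keeps pending items in a heap and hands each one to the pool only once it is due, so pool threads are never claimed just to wait.

import heapq
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class LazyTimerPool:
    """One timer thread plus a worker pool; an item occupies a pool
    thread only when it is actually due."""

    def __init__(self, max_workers=4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._heap = []  # (due_time, seq, fn) entries, ordered by due_time
        self._seq = 0    # tie-breaker so fns are never compared
        self._cond = threading.Condition()
        threading.Thread(target=self._loop, daemon=True).start()

    def schedule_relative(self, delay, fn):
        with self._cond:
            heapq.heappush(self._heap, (time.monotonic() + delay, self._seq, fn))
            self._seq += 1
            self._cond.notify()  # wake the timer thread to re-check deadlines

    def _loop(self):
        while True:
            with self._cond:
                while not self._heap:
                    self._cond.wait()
                due, _, fn = self._heap[0]
                now = time.monotonic()
                if due > now:
                    # sleep until the earliest deadline, or until a new
                    # (possibly earlier) item arrives
                    self._cond.wait(timeout=due - now)
                    continue
                heapq.heappop(self._heap)
            # only now does the item claim a pool thread
            self._pool.submit(fn)


if __name__ == "__main__":
    pool = LazyTimerPool()
    pool.schedule_relative(0.5, lambda: print("tick"))
    time.sleep(1.0)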