RxPY icon indicating copy to clipboard operation
RxPY copied to clipboard

Observable interval appears to leak

Open AndrewLipscomb opened this issue 6 years ago • 6 comments

In this test example

from __future__ import print_function

import multiprocessing
import threading
import os
import psutil
from time import sleep

from rx import Observable

# Synchronisable heartbeat
heartbeat = Observable.interval(100).publish()

heartbeat.connect()

heartbeat.subscribe(lambda beat: None )

# Mocks a 50Hz GPS signal
provider = Observable.interval(20) \
                            .map(lambda i: (24, 24) ) \
                            .publish()

provider.connect()

heartbeat.with_latest_from(provider, lambda heartbeat, gps: gps) \
        .subscribe( lambda combined : None ) # lambda combined: print(combined) )

try:
    while True:
        sleep(0.1)
        process = psutil.Process(os.getpid())
        print(str(process.memory_info().rss/1000) + " KB in mem; " + str(threading.active_count()) + " active threads")

except KeyboardInterrupt:
    print("Caught KeyboardInterrupt")

which is running in 2.7.15 on Kubuntu 18.04 with RxPy 1.6.1, I'm seeing the memory of this application slowly tick upwards. The output of psutil (and the system profiler) shows memory ticking upwards, while the thread count stays constant at around 9.

Full disclosure - Python is not my usual language, so I may be a missing a language gotcha.

From having a read of an implementation (in Java) of interval I didn't believe that anything here needs to managed for memory growth - Googling on memory management for infinite sequences isn't giving me much either. Is this normal (and my code is badly written) or is there an issue here?

AndrewLipscomb avatar Sep 27 '18 10:09 AndrewLipscomb

FWIW wrote up a comparison test in my usual language (c++) and couldn't replicate any leaking.

#include <iostream>
#include <chrono>
#include "rxcpp/rx.hpp"

int main(int argc, char** argv) {

    auto threads = rxcpp::observe_on_new_thread();

    auto heartbeat = rxcpp::observable<>::interval( std::chrono::milliseconds(100), threads )
                        .observe_on(threads)
                        .publish();
    heartbeat.connect();

    auto gps = rxcpp::observable<>::interval( std::chrono::milliseconds(20), threads )
                        .observe_on(threads)
                        .publish();
    gps.connect();

    auto values = heartbeat.with_latest_from(gps);

    values
        .subscribe_on(threads)
        .subscribe(
            [](std::tuple<int, int> v) { printf("OnNext: %d, %d\n", std::get<0>(v), std::get<1>(v)); },
            []() { printf("OnCompleted\n"); }
        );

    int i;
    std::cout << "This blocks until Ctrl C";
    std::cin >> i;

    return 0;
}

Edit: fixed code

AndrewLipscomb avatar Sep 27 '18 11:09 AndrewLipscomb

I have the following logging format for std logging lib. logging.basicConfig(level=logging.INFO,format="[%(threadName)s] %(asctime)-15s %(message)s ")

This makes print the execution thread of the logging.info(foo).

I've created interval stream an consumer via the code below.

timer_stream = rx.Observable.interval(1000).subscribe_on(time_pool)
timer_stream.subscribe(
    lambda count: logging.info(count))

As you can see from the output every tick of interval command executes in a new thread id. I think this might be the reason of the issue. I'm using RxPY 1.6.1 version. FYI.

image

Edit: creating a scheduler and specifying it on scheduler argument of the interval solves the problem. New code is like below.

import rx.concurrency as rxc 
time_pool = rxc.ThreadPoolScheduler(1)
timer_stream = rx.Observable.interval(1000,scheduler=time_pool)
timer_stream.subscribe(
    lambda count: logging.info(count))

image

rfum avatar Mar 07 '19 09:03 rfum

Hi,

I've been running a bunch of tests with a simpler sample and a slightly different way of monitoring memory, RxPY3 (HEAD master), python 3.7.3 & linux. I must say that I'm not an expert in memory management, but I believe that the garbage collector can make memory measurements quite irrelevant (?). I have no idea how and when python frees unused memory. Also it's not clear what the rss value really means (linux) and if it's reliable in a python context.

Basically, the script consists in a function that creates one or more interval observables and which is executed on a second python process. This process is monitored from the main python process by pulling and logging memory infos (rss, vms) related to the 'observables' process. IMO, it avoids some potential unwanted interactions and measurement bias.

I've tested with 2 versions of this script:

  • one or more hot interval observables (see below)
  • one or more cold interval observables (i.e. without publish operator)
def fn_interval_hot(dt, ninstances=1):
    observables = []
    disposables = []
    for i in range(ninstances):
        observable = rx.interval(dt).pipe(ops.publish())
        d = observable.subscribe(lambda _: None)
        observables.append(observable)
        disposables.append(d)

    # connect observables
    for o in observables:
        o.connect()

    # maintain thread alive indefinitely
    while True:
        time.sleep(10.0)

Tests have been running with:

  • interval type: hot, cold
  • interval period: 1ms, 50ms
  • duration: 30mn
  • number of simultaneous observables: 1, 5, 8

Figure_1

We can see that in every cases, rss quickly grows until it reaches an almost constant value in an asymptotic fashion. So I would say that's not that bad after all.

@erikkemperman @dbrattli , interval operator has TimeoutScheduler as a default scheduler. It doesn't implement schedule_periodic so the one from SchedulerBase is used instead, calling recursively schedule_relative. Each time, schedule_relative creates a threading.Timer which seems to be a subclass of threading.Thread. In short, as noticed by @rfum, periodic scheduling with TimeoutScheduler results in creating a thread per item. I don't think this is an issue right now, but it may be non-optimal. Maybe something to keep in mind for potentially future scheduler improvements.

jcafhe avatar May 14 '19 17:05 jcafhe

Nice analysis!

Yes, TimeoutScheduler creates a lot of threads, where recycling is certainly possible. I've been meaning to try to make a pool or something, but have not found the time for that so far.

erikkemperman avatar May 14 '19 18:05 erikkemperman

Perhaps we should just retire TimeoutScheduler and instead use ThreadPoolScheduler as default. The TimeoutScheduler was ported from JS and might not make sense to use for Python since it's unbounded.

dbrattli avatar May 14 '19 18:05 dbrattli

Yes, that's one option. I was hoping to actually try something slightly more ambitious, namely to postpone mapping scheduled items to threads until they're almost due. That way we avoid claiming threads from the pool only for them to do nothing for a relatively long time. But actually, that kind of logic might make sense for the ThreadPoolScheduler as well, and in that case I guess they could be folded into a single class.

erikkemperman avatar May 14 '19 19:05 erikkemperman