ib_async

Cannot connect to IB using 2 threads with 2 different clientIds

Open · damonYuan opened this issue 9 months ago · 7 comments

When I start 2 threads (I want each thread to manage a different underlying, as you can tell from the thread names in the logs), each running the following code,

    # startLoop(), run() and sleep() here are the ib_async.util helpers
    def run(self):
        startLoop()

        async def my_loop():
            while not self.__stop_event.is_set():
                if not self.ib_client.is_connected():
                    await self.ib_client.connect_async() # it just calls `async def connectAsync(...)`
                else:
                    sleep(1)
                    logging.info("sleep 1")
            self._clean_up()

        run(my_loop())

it throws the errors below:

[2025-07-02 23:50:43,201][INFO][runner-TLT] Connecting to IB Gateway...
[2025-07-02 23:50:43,201][INFO][runner-QQQ] Connecting to IB Gateway...
[2025-07-02 23:50:43,201][INFO][runner-TLT] Connecting to 127.0.0.1:4002 with clientId 20...
[2025-07-02 23:50:43,201][INFO][runner-QQQ] Connecting to 127.0.0.1:4002 with clientId 23...
[2025-07-02 23:50:43,202][INFO][runner-TLT] Disconnecting
[2025-07-02 23:50:43,202][ERROR][runner-TLT] API connection failed: RuntimeError("Task <Task pending name='Task-1' coro=<StratRunner.run.<locals>.my_loop() running at /Users/damon/Desktop/workspace/IronCondor/src/ironcondor/service/strat_runner.py:45>> got Future <Future pending cb=[BaseSelectorEventLoop._sock_write_done()()]> attached to a different loop")
[2025-07-02 23:50:43,202][INFO][runner-QQQ] Connected
[2025-07-02 23:50:43,202][ERROR][runner-TLT] Uncaught thread exception
Traceback (most recent call last):
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/threading.py", line 1041, in _bootstrap_inner
    self.run()
    ~~~~~~~~^^
  File "/Users/damon/Desktop/workspace/IronCondor/src/ironcondor/service/strat_runner.py", line 51, in run
    run(my_loop())
    ~~~^^^^^^^^^^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/ib_async/util.py", line 374, in run
    result = loop.run_until_complete(task)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/nest_asyncio.py", line 98, in run_until_complete
    return f.result()
           ~~~~~~~~^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/futures.py", line 199, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/tasks.py", line 306, in __step_run_and_handle_result
    result = coro.throw(exc)
  File "/Users/damon/Desktop/workspace/IronCondor/src/ironcondor/service/strat_runner.py", line 45, in my_loop
    await self.ib_client.connect_async()
  File "/Users/damon/Desktop/workspace/IronCondor/src/ironcondor/util/ib_client.py", line 48, in connect_async
    await self.ib.connectAsync(self.host, self.port, clientId=self.client_id)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/ib_async/ib.py", line 2043, in connectAsync
    await self.client.connectAsync(host, port, clientId, timeout)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/ib_async/client.py", line 217, in connectAsync
    await asyncio.wait_for(self.conn.connectAsync(host, port), timeout)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/tasks.py", line 507, in wait_for
    return await fut
           ^^^^^^^^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/ib_async/connection.py", line 39, in connectAsync
    self.transport, _ = await loop.create_connection(lambda: self, host, port)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/base_events.py", line 1136, in create_connection
    sock = await self._connect_sock(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
        exceptions, addrinfo, laddr_infos)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/base_events.py", line 1039, in _connect_sock
    await self.sock_connect(sock, address)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/selector_events.py", line 641, in sock_connect
    return await fut
           ^^^^^^^^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/futures.py", line 286, in __await__
    yield self  # This tells Task to wait for completion.
    ^^^^^^^^^^
RuntimeError: Task <Task pending name='Task-1' coro=<StratRunner.run.<locals>.my_loop() running at /Users/damon/Desktop/workspace/IronCondor/src/ironcondor/service/strat_runner.py:45>> got Future <Future pending cb=[BaseSelectorEventLoop._sock_write_done()()]> attached to a different loop
[2025-07-02 23:50:47,203][INFO][runner-QQQ] Disconnecting
[2025-07-02 23:50:47,203][ERROR][runner-QQQ] API connection failed: TimeoutError()
[2025-07-02 23:50:47,203][ERROR][runner-QQQ] Uncaught thread exception
Traceback (most recent call last):
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/tasks.py", line 507, in wait_for
    return await fut
           ^^^^^^^^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/futures.py", line 286, in __await__
    yield self  # This tells Task to wait for completion.
    ^^^^^^^^^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/tasks.py", line 375, in __wakeup
    future.result()
    ~~~~~~~~~~~~~^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/futures.py", line 194, in result
    raise self._make_cancelled_error()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/threading.py", line 1041, in _bootstrap_inner
    self.run()
    ~~~~~~~~^^
  File "/Users/damon/Desktop/workspace/IronCondor/src/ironcondor/service/strat_runner.py", line 51, in run
    run(my_loop())
    ~~~^^^^^^^^^^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/ib_async/util.py", line 374, in run
    result = loop.run_until_complete(task)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/nest_asyncio.py", line 98, in run_until_complete
    return f.result()
           ~~~~~~~~^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/futures.py", line 199, in result
    raise self._exception.with_traceback(self._exception_tb)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/tasks.py", line 306, in __step_run_and_handle_result
    result = coro.throw(exc)
  File "/Users/damon/Desktop/workspace/IronCondor/src/ironcondor/service/strat_runner.py", line 45, in my_loop
    await self.ib_client.connect_async()
  File "/Users/damon/Desktop/workspace/IronCondor/src/ironcondor/util/ib_client.py", line 48, in connect_async
    await self.ib.connectAsync(self.host, self.port, clientId=self.client_id)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/ib_async/ib.py", line 2043, in connectAsync
    await self.client.connectAsync(host, port, clientId, timeout)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/ib_async/client.py", line 228, in connectAsync
    await asyncio.wait_for(self.apiStart, timeout)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/tasks.py", line 506, in wait_for
    async with timeouts.timeout(timeout):
               ~~~~~~~~~~~~~~~~^^^^^^^^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/timeouts.py", line 116, in __aexit__
    raise TimeoutError from exc_val
TimeoutError

Process finished with exit code 0

But it works well with only 1 thread.

It worked fine with multiple threads in 1.0.3, but errors happen in 2.0.1. I have also encountered an issue similar to https://github.com/ib-api-reloaded/ib_async/issues/103 when I try to set the event loop explicitly with asyncio.

damonYuan, Jul 02 '25 15:07

Thanks for checking in with some error notes (and thanks for trying the new version, sorry it doesn't appear to work right!).

Do you have a minimal working example including thread setup/connection setup/etc we can run to see the errors happen live and to debug against?

mattsta, Jul 03 '25 02:07

Thanks @mattsta, here you go.

import logging
from threading import Thread

from ib_async import IB
from ib_async import util

FORMAT = '[%(asctime)s][%(levelname)s][%(threadName)s] %(message)s'

logging.basicConfig(
    level=logging.INFO,
    format=FORMAT
)
logging.getLogger('ib_async').setLevel(logging.INFO)
logging.getLogger('asyncio').setLevel(logging.INFO)


def target_func(client_id, ib):
    util.startLoop()

    async def my_loop():
        try:
            while True:
                if not ib.isConnected():
                    await ib.connectAsync('127.0.0.1', 4002, client_id)
                else:
                    util.sleep(1)
                    logging.info("sleep 1 sec")
        except Exception as e:
            logging.exception(e)

    util.run(my_loop())


def main():
    ib1 = IB()
    thread1 = Thread(target=target_func, args=(1, ib1,), name='thread1')
    thread1.start()
    
    ib2 = IB()
    thread2 = Thread(target=target_func, args=(2, ib2,), name='thread2')
    thread2.start()


if __name__ == '__main__':
    main()

The error looks like this:

[2025-07-03 22:21:17,756][INFO][thread1] Connecting to 127.0.0.1:4002 with clientId 1...
Traceback (most recent call last):
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/threading.py", line 1041, in _bootstrap_inner
    self.run()
    ~~~~~~~~^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/threading.py", line 992, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/damon/Desktop/workspace/IronCondor/src/ironcondor/util/example.py", line 31, in target_func
    util.run(my_loop())
    ~~~~~~~~^^^^^^^^^^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/ib_async/util.py", line 374, in run
    result = loop.run_until_complete(task)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/nest_asyncio.py", line 88, in run_until_complete
    f = asyncio.ensure_future(future, loop=self)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/tasks.py", line 730, in ensure_future
    raise ValueError('The future belongs to a different loop than '
                    'the one specified as the loop argument')
ValueError: The future belongs to a different loop than the one specified as the loop argument
Exception in thread thread2:
[2025-07-03 22:21:17,759][INFO][thread1] Connected
Traceback (most recent call last):
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/threading.py", line 1041, in _bootstrap_inner
    self.run()
    ~~~~~~~~^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/threading.py", line 992, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/damon/Desktop/workspace/IronCondor/src/ironcondor/util/example.py", line 31, in target_func
    util.run(my_loop())
    ~~~~~~~~^^^^^^^^^^^
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/ib_async/util.py", line 374, in run
    result = loop.run_until_complete(task)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/ironcondor/EhF6heFF/ironcondor/lib/python3.13/site-packages/nest_asyncio.py", line 88, in run_until_complete
    f = asyncio.ensure_future(future, loop=self)
  File "/Users/damon/Library/Application Support/hatch/env/virtual/.pythons/3.13/python/lib/python3.13/asyncio/tasks.py", line 730, in ensure_future
    raise ValueError('The future belongs to a different loop than '
                    'the one specified as the loop argument')
ValueError: The future belongs to a different loop than the one specified as the loop argument
[2025-07-03 22:21:17,776][INFO][thread1] Logged on to server version 178
[2025-07-03 22:21:17,822][INFO][thread1] Warning 2104, reqId -1: Market data farm connection is OK:usfarm
[2025-07-03 22:21:17,822][INFO][thread1] Warning 2106, reqId -1: HMDS data farm connection is OK:ushmds
[2025-07-03 22:21:17,822][INFO][thread1] Warning 2158, reqId -1: Sec-def data farm connection is OK:secdefil
[2025-07-03 22:21:17,823][INFO][thread1] API connection ready

damonYuan, Jul 03 '25 14:07

Another finding: in util.run(), loop = getLoop() returns the same object in both threads, which I verified with

print(id(loop))

My Python version is 3.13.1.

damonYuan, Jul 03 '25 14:07

import asyncio
import functools


@functools.cache  # should be removed, otherwise the same loop is returned even to different threads
def getLoop():
    """Get asyncio event loop or create one if it doesn't exist."""
    try:
        # https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.get_running_loop
        loop = asyncio.get_running_loop()  # I think we should use get_event_loop() here
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    return loop

get_event_loop() checks the running loop first and, if none is found, checks the event loop policy. If we create a loop manually and register it with asyncio.set_event_loop(), we can get it from the event loop policy, but not via get_running_loop(), because the loop is not running yet when util.run() is called.
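A quick stand-alone check of that distinction, using only the standard library (no ib_async involved): a loop registered with `asyncio.set_event_loop()` but not yet running is invisible to `asyncio.get_running_loop()`, while `asyncio.get_event_loop()` returns it. This assumes CPython 3.12/3.13 semantics; later versions deprecate parts of the policy system, which may add warnings.

```python
import asyncio

# Create a loop and register it as this thread's current loop, but don't run it yet.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# get_running_loop() only sees a loop that is currently executing,
# so it raises RuntimeError here even though a loop has been set.
try:
    asyncio.get_running_loop()
    found_running = True
except RuntimeError:
    found_running = False

# get_event_loop() falls back to the loop registered via set_event_loop().
current = asyncio.get_event_loop()


async def probe() -> asyncio.AbstractEventLoop:
    # Inside a coroutine the loop is running, so get_running_loop() succeeds.
    return asyncio.get_running_loop()


inside = loop.run_until_complete(probe())

print(found_running, current is loop, inside is loop)  # False True True

asyncio.set_event_loop(None)
loop.close()
```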

And @functools.cache makes calls from different threads get the same loop...
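The caching effect is easy to reproduce without ib_async. In this sketch, `cached_new_loop` is a hypothetical stand-in for the cached `getLoop()`; the two threads run one after the other so the first call fills the cache deterministically.

```python
import asyncio
import functools
import threading


@functools.cache
def cached_new_loop() -> asyncio.AbstractEventLoop:
    # Hypothetical stand-in for a cached getLoop(): the first call creates a
    # loop, and every later call, from any thread, gets that same object back.
    return asyncio.new_event_loop()


seen: dict[str, int] = {}


def worker(name: str) -> None:
    seen[name] = id(cached_new_loop())


# Run the threads one after the other so the first call populates the cache.
for name in ("thread1", "thread2"):
    t = threading.Thread(target=worker, args=(name,), name=name)
    t.start()
    t.join()

print(seen["thread1"] == seen["thread2"])  # True: both threads share one loop

cached_new_loop().close()
```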

damonYuan, Jul 03 '25 15:07

Thanks for the example and tracking down what broke for you there!

I'll look into it a bit more, but the short answer is also "threads are a bad design choice for this usage, so probably don't use threads." The entire reason the async system exists in the first place is that it is safer and easier to understand than getting deeply involved with threads and the locking/synchronization issues between everything.

Async is fast and reliable, and it greatly reduces complexity compared to introducing threads. In my own system I watch 20 to 60 live symbols per client, all with 100% event-loop async updates in a single thread. There are no problems or performance issues with data, updates, latency, or speed. Computers are fast, etc.
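To illustrate the single-thread approach, here is a generic asyncio sketch (not ib_async code): one coroutine per symbol, all interleaved on a single event loop with `asyncio.gather()`. `watch_symbol` is hypothetical; with ib_async, each task would await updates from one connected `IB` instance instead of calling `asyncio.sleep()`.

```python
import asyncio


async def watch_symbol(symbol: str, updates: int) -> list[str]:
    # Hypothetical per-symbol task; a real one would await market data
    # from a connected ib_async IB instance instead of sleeping.
    seen = []
    for i in range(updates):
        await asyncio.sleep(0.001)  # stands in for waiting on the next tick
        seen.append(f"{symbol}#{i}")
    return seen


async def main(symbols: list[str]) -> dict[str, list[str]]:
    # One task per symbol, all interleaved on the same event loop and thread.
    results = await asyncio.gather(*(watch_symbol(s, 3) for s in symbols))
    return dict(zip(symbols, results))


if __name__ == "__main__":
    print(asyncio.run(main(["TLT", "QQQ", "SPY"])))
```

Because every task shares one loop, per-symbol state can be read or updated between `await` points without locks.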

Of course, the big design question is: what exactly does an IBKR client do? In my systems, the client ONLY does market data display and trade execution. All other indicators, algos, and signal generation are performed by a separate "signal generator" server (which acts as its own IBKR client, but in "readonly mode", just for data subscriptions). The IBKR trade clients subscribe to the signal generator for updates, which spares them any potentially blocking algo-processing overhead. The IBKR clients just parse an inbound websocket data feed in the same async way they process market, account, and trade updates.

Threads could potentially do typical "thread things" like background I/O work if necessary, but launching multiple primary-network-client threads into a single concurrent system is a good way to increase complexity unnecessarily over time — and when code problems can cause irreversible monetary losses, developing for stability and understandability is more important than ever.

It's preferable to run completely independent clients, each with its own input arguments/parameters, instead of running everything in one process. The IBKR API supports this well: just use a different client ID per process, and you get better stability, partial platform updates by restarting individual components, more isolation and control over individual actions, etc.
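A minimal sketch of the one-process-per-client layout, assuming a hypothetical runner script `strat_runner.py` that accepts `--symbol` and `--client-id` flags (neither the script nor the flags are part of ib_async). Each child process gets its own interpreter, its own event loop, and a unique clientId.

```python
import subprocess
import sys


def build_commands(symbols: list[str], base_client_id: int, script: str) -> list[list[str]]:
    # Hypothetical CLI: each process gets its own symbol and a unique clientId.
    return [
        [sys.executable, script, "--symbol", sym, "--client-id", str(base_client_id + i)]
        for i, sym in enumerate(symbols)
    ]


def launch(
    symbols: list[str], base_client_id: int = 20, script: str = "strat_runner.py"
) -> list[subprocess.Popen]:
    # Each runner is an independent OS process with its own event loop,
    # so one can be restarted without touching the others.
    return [subprocess.Popen(cmd) for cmd in build_commands(symbols, base_client_id, script)]
```

Calling `launch(["TLT", "QQQ"])` would start two runners with clientIds 20 and 21, each of which can be stopped or upgraded independently.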

Then there's the question of "synchronization" and "sharing state", of course, but that brings us back to the same logic: if everything needs to share immediate state as safely as possible, running everything async is preferred. Relying on concurrently shared memory or shared data structures ends up with locks and slowdowns behind the scenes anyway. The other alternative is to build an external "portfolio microservice" or, as mentioned previously, an "algo/indicator/event trigger" service that all the clients subscribe to for receiving/sending centralized updates (which also increases the stability of the trading interface, since no data processing or algo generation runs internally to conflict with trade processing or with receiving portfolio or market event updates).
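The subscribe-to-a-central-service idea can be sketched in-process with `asyncio.Queue` fan-out. This is only a stand-in: `SignalHub` and `trade_client` are hypothetical names, and a real setup like the one described above would deliver the messages over a websocket to separate IBKR client processes.

```python
import asyncio


class SignalHub:
    """In-process stand-in for an external signal/portfolio service (hypothetical)."""

    def __init__(self) -> None:
        self._subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue()
        self._subscribers.append(q)
        return q

    def publish(self, message: dict) -> None:
        # Fan out: every subscriber's inbox receives the same message.
        for q in self._subscribers:
            q.put_nowait(message)


async def trade_client(name: str, inbox: asyncio.Queue, received: list) -> None:
    # A trade client only reacts to signals; no indicator math runs here.
    msg = await inbox.get()
    received.append((name, msg["symbol"], msg["action"]))


async def main() -> list:
    hub = SignalHub()
    received: list = []
    clients = [
        asyncio.create_task(trade_client(n, hub.subscribe(), received))
        for n in ("client-20", "client-21")
    ]
    hub.publish({"symbol": "QQQ", "action": "BUY"})
    await asyncio.gather(*clients)
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```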

mattsta, Jul 04 '25 16:07

@mattsta, thanks for the reply. Let's step back from the design discussion a bit, though, since the right design depends on the business reasons in each case.

In my case I use Python and ib_insync/ib_async in sync mode because it's simple and I just want to test the idea quickly. I use the threading model to isolate the same strategy for different symbols; I don't start new processes because that's too much for research. I also have more symbols to monitor because I am tracking options: 1 underlying can have more than 60 symbols, and subscription mode won't work because IB blocks more than 100 subscriptions on my market data plan.

We need to clarify what getLoop() is meant to do. IMHO, it is intended to get the running loop for the calling thread and, if none is found, create a NEW one and return it.

If my understanding is correct, there are two issues to address: 1. @functools.cache can cache one loop and return it to other threads; 2. asyncio.get_running_loop() won't find a newly created loop that is not RUNNING yet, which is also problematic: such a loop may simply not have been started yet, but it's ready to use, so we don't need to create another one.
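A minimal sketch of that intended behavior, assuming a `threading.local` cache is an acceptable replacement for `@functools.cache`: prefer the running loop, otherwise reuse this thread's existing (possibly not yet running) loop, otherwise create one. `get_loop_per_thread` is a hypothetical name; this is not the actual ib_async code or fix.

```python
import asyncio
import threading

# One slot per thread; attributes set by one thread are invisible to others.
_local = threading.local()


def get_loop_per_thread() -> asyncio.AbstractEventLoop:
    """Return the running loop, else this thread's existing loop, else a new one."""
    # 1. Inside a coroutine: use the loop that is actually running.
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        pass
    # 2. Reuse a loop this thread created earlier, even if it isn't running yet.
    loop = getattr(_local, "loop", None)
    if loop is None or loop.is_closed():
        # 3. Otherwise create a fresh loop owned by this thread.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        _local.loop = loop
    return loop


loops: dict[str, list[asyncio.AbstractEventLoop]] = {}


def worker(name: str) -> None:
    first = get_loop_per_thread()
    second = get_loop_per_thread()  # not running yet: the same loop is reused

    async def probe() -> asyncio.AbstractEventLoop:
        return get_loop_per_thread()  # running now: the get_running_loop() path

    running = first.run_until_complete(probe())
    loops[name] = [first, second, running]  # keep references so ids stay unique
    first.close()


threads = [threading.Thread(target=worker, args=(n,), name=n) for n in ("t1", "t2")]
for th in threads:
    th.start()
for th in threads:
    th.join()

loops_a, loops_b = loops["t1"], loops["t2"]
print(
    all(lp is loops_a[0] for lp in loops_a),
    all(lp is loops_b[0] for lp in loops_b),
    loops_a[0] is not loops_b[0],
)  # True True True
```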

damonYuan, Jul 06 '25 04:07

Same issue here. This is a really big problem for me; it breaks everything in our system.

I get that threads are not a good design here, but we have a system up and running; we cannot just "not use threads". Threads are still a common thing (a web application, for example, usually has multiple threads), and people have reasons to use them.

Could you please make it work with threads at least as well as version 1.0.3 did? If your decision is "don't use threads", then please update the README to highlight this, and we will just stick with 1.0.3 for now.

Thanks.

tsunamilx, Jul 28 '25 12:07