tinvest icon indicating copy to clipboard operation
tinvest copied to clipboard

Перестают приходить обновления в Streaming во время торговой сессии

Open PavelMinev opened this issue 3 years ago • 6 comments

Когда вызываю метод своего класса, реализующий прослушивание событий - в какой-то момент (обычно в самый пик активности торгов) наблюдаю прекращение потока данных в Streaming без ошибок или каких-либо сообщений:

import tinvest as ti

# Пример класса Screener для запуска прослушивания событий:
class Screener:
    # default settings
    # ...
    async def run(self):
        if not self.running:
            self.running = True
            async with ti.Streaming(API_KEY) as streaming:
                self.streaming = streaming
                async for event in streaming:
                    payload = event.payload
                    if isinstance(event, ti.CandleStreamingResponse):
                        # ВОТ ЭТОТ ЛОГ ПЕРЕСТАЁТ ВЫВОДИТЬСЯ:
                        screener_logger.info(f"Screener: {payload.figi}: {payload.time} | {payload.o} | {payload.h} "
                                             f"| {payload.l} | {payload.c} | {payload.v}")
                        if payload.figi not in self.signal_figi_set:
                            self.signal_figi_set.add(payload.figi)
                            self.candle_queue.put(payload)
                        m15 = int(payload.time.minute / 15) * 15
                        last_candle_m15_time = offset_aware_datetime(
                            datetime(payload.time.year, payload.time.month, payload.time.day,
                                     payload.time.hour, m15))
                        if not self.last_m15_times.get(payload.figi):
                            self.last_m15_times[payload.figi] = None

                        # Если новая 15-ти минутка - добавляем сигнал в очередь
                        same_m15 = last_candle_m15_time == self.last_m15_times[payload.figi]
                        self.last_m15_times[payload.figi] = last_candle_m15_time
                        was_inserted_new = await update_candle(payload, True)
                        if not same_m15:
                            self.start_threading_calc(payload, was_inserted_new)

    # Отдельный loop для обновления списков фиги для отслеживания
    async def update_figis(self):
        screener_logger.info('Screener: update figis')
        while True:
            if not self.first_init_candles and self.streaming:
                await self.init_figis()
            elif self.streaming:
                # ВОТ ЭТОТ ЛОГ ПРОДОЛЖАЕТ ВЫВОДИТЬСЯ ПО УКАЗАННОМУ ИНТЕРВАЛУ:
                screener_logger.info('Screener: New update loop')
                new_figis, removed_figis = await self.get_figis()
                for figi in removed_figis:
                    await self.unsubscribe(figi)
                for figi in new_figis:
                    await self.get_candles(figi)
            await asyncio.sleep(60 if self.first_init_candles else 5)
    # ...


# Запуск Screener-а
async def main(scr: Screener):
    screener_logger.info('Main started')
    await asyncio.gather(
        on_startup_gino(dp),
        scr.run(),
    )


# Класс для проверки значений свечки в параллельном процессе:
class CandlesWorker(multiprocessing.Process):
    def __init__(self, num_thread, candle_queue: multiprocessing.Queue, from_candle_queue: multiprocessing.Queue):
        super().__init__()
        self.num_thread = num_thread
        self.queue = candle_queue
        self.from_candle_queue = from_candle_queue

    def run(self):
        t_st = time()
        between_callback(cycle_candle_update, self.queue, self.from_candle_queue)
        screener_logger.debug(f'Screener start candle_process #{self.num_thread} took {time() - t_st}')
# ...

# Запуск программы:
if __name__ == '__main__':
    t_start = time()
    queue = multiprocessing.Queue()
    from_task_queue = multiprocessing.Queue()
    candle_queue = multiprocessing.Queue()
    from_candle_queue = multiprocessing.Queue()
    t_end = time()
    screener_logger.debug(f'Screener: creating Queues took {t_end - t_start}')

    calculations_process = Worker(0, queue, from_task_queue)
    candles_process = CandlesWorker(1, candle_queue, from_candle_queue)
    screener_logger.debug(f'Screener: creating Workers took {time() - t_end}')
    calculations_process.start()
    candles_process.start()

    screener = Screener(queue, from_task_queue, candle_queue, from_candle_queue)

    asyncio.run(main(screener))

Даже не знаю, какой вопрос правильно было бы задать:

  1. Как отследить состояние streaming, чтобы можно было, например, переподписаться на текущий список figi? Потому что никаких сообщений об ошибках не отлавливается, когда прекращают приходить данные. При этом, если перезапустить скрипт - то все подписки заново успешно создаются.
  2. Может быть этот пример некорректно реализует саму подписку? Я постарался выделить все расчёты в отдельный процесс, чтобы не нагружать eventloop в текущем процессе с подписками, так как изначально подумал, что проблема может быть в переполнении asyncio.Queue.

PavelMinev avatar Jul 27 '21 09:07 PavelMinev

Вот такой пример лога сегодня: image

PavelMinev avatar Jul 27 '21 11:07 PavelMinev

После этого задержки не начинаются?

was_inserted_new = await update_candle(payload, True)

Во время подписки получаем событие. Надо на стороне приложения знать на что подписан, а на что нет. Может быть просто срабатывает unsubscribe, я не вижу логику, где происходит subscribe

daxartio avatar Jul 28 '21 21:07 daxartio

Иногда возникают: как раз в период очень активных торгов - до 12 минут максимум доходило, но потом в эту же торговую сессию задержка уменьшается, когда активность торгов падает. Период существования задержки не очень критичный.

Метод unsubscribe вызываю, только если удаляю эмитента из портфеля:

for figi in removed_figis:
    await self.unsubscribe(figi)  # здесь отписка при удалении
for figi in new_figis:
    await self.get_candles(figi)  # здесь в цикле сначала по REST скачиваю на нужный период данные, а потом создаётся подписка для каждого нового эмитента в портфеле

Это метод Screener.get_candles:

# Пример класса Screener для запуска прослушивания событий:
class Screener:
    # .......

    async def get_candles(self, figi: str):
        now_date = offset_aware_datetime(datetime.utcnow())
        days_diff = 1
        days_count = 133
        last_candle = await get_candles(figi, 1)
        last_candle = last_candle[0] if len(last_candle) > 0 else None
        last_time = None
        if last_candle:
            last_time = offset_aware_datetime(last_candle.time)
            days_diff = days_count - min(days_count, (now_date - last_time).days)
            if days_diff == 0:
                last_time = None
        # Асинхронный клиент:
        client = ti.AsyncClient(API_KEY, use_sandbox=True)

        has_error = False
        i = 0
        while days_diff <= days_count:
            df = (last_time if last_time and i == 0 else offset_aware_datetime(now_date - timedelta(days=days_count - days_diff + 1)))
            dt = offset_aware_datetime(now_date - timedelta(days=days_count - days_diff))
            # ограничение частоты запросов
            await self.rate_limit.acquire(self.wait_on_rate_limit)
            screener_logger.info(f'Screener: make request for {figi} {df}-{dt}')

            try:
                candle_response = await client.get_market_candles(figi, df, dt, self.candle_resolution)
                if candle_response.status == 'Ok':
                    payload = candle_response.payload
                    if len(payload.candles) > 0:
                        if last_candle and payload.candles[0].time == last_candle.time:
                            await update_candle(payload.candles[0])
                            new_insert_candles = payload.candles[1:]
                            if len(new_insert_candles) > 0:
                                await Candles.bulk_upsert(new_insert_candles)
                        else:
                            await Candles.bulk_upsert(payload.candles)
            except InterfaceError as e:
                screener_logger.error(e)
                await admins_send_message(dp, f'Ошибка запроса для {figi}: {e}')
                has_error = True
                break
            except Exception as e:
                screener_logger.error(e)
                await admins_send_message(dp, f'Ошибка запроса для {figi} c {df} по {dt}: {e}')
                has_error = True
                break
            days_diff = days_diff + 1
            i = i + 1
        # Закрываем клиент
        await client.close()
        if has_error:
            return
        # await admins_send_message(dp, f'Данные для {figi} успешно скачаны')

        # Если всё ок - подписываемся на тикер и добавляем его в список активных подписок
        screener_logger.info(f'Screener: add {figi}')
        self.current_tickers_figi.add(figi)
        await self.streaming.candle.subscribe(figi, self.candle_resolution)
        await self.streaming.instrument_info.subscribe(figi)

PavelMinev avatar Jul 29 '21 08:07 PavelMinev

до 12 минут

Это много. Попробую асинхронно действия производить, сам цикл

async for event in streaming: ...

не должен лочиться

daxartio avatar Jul 29 '21 19:07 daxartio

Сегодня такая задержка на текущее время: image

Может просто не успевает обрабатываться очередь и переполняется? А когда лимита достигает - лочится?

PavelMinev avatar Jul 30 '21 13:07 PavelMinev

If maxsize is less than or equal to zero, the queue size is infinite.

Посмотрю, что можно сделать. В будни можно будет проверить

daxartio avatar Jul 31 '21 12:07 daxartio