tinvest
tinvest copied to clipboard
Перестают приходить обновления в Streaming во время торговой сессии
Когда вызываю метод своего класса, реализующий прослушивание событий - в какой-то момент (обычно в самый пик активности торгов) наблюдаю прекращение потока данных в Streaming без ошибок или каких-либо сообщений:
import tinvest as ti
# Пример класса Screener для запуска прослушивания событий:
class Screener:
# default settings
# ...
async def run(self):
if not self.running:
self.running = True
async with ti.Streaming(API_KEY) as streaming:
self.streaming = streaming
async for event in streaming:
payload = event.payload
if isinstance(event, ti.CandleStreamingResponse):
# ВОТ ЭТОТ ЛОГ ПЕРЕСТАЁТ ВЫВОДИТЬСЯ:
screener_logger.info(f"Screener: {payload.figi}: {payload.time} | {payload.o} | {payload.h} "
f"| {payload.l} | {payload.c} | {payload.v}")
if payload.figi not in self.signal_figi_set:
self.signal_figi_set.add(payload.figi)
self.candle_queue.put(payload)
m15 = int(payload.time.minute / 15) * 15
last_candle_m15_time = offset_aware_datetime(
datetime(payload.time.year, payload.time.month, payload.time.day,
payload.time.hour, m15))
if not self.last_m15_times.get(payload.figi):
self.last_m15_times[payload.figi] = None
# Если новая 15-ти минутка - добавляем сигнал в очередь
same_m15 = last_candle_m15_time == self.last_m15_times[payload.figi]
self.last_m15_times[payload.figi] = last_candle_m15_time
was_inserted_new = await update_candle(payload, True)
if not same_m15:
self.start_threading_calc(payload, was_inserted_new)
# Отдельный loop для обновления списков фиги для отслеживания
async def update_figis(self):
screener_logger.info('Screener: update figis')
while True:
if not self.first_init_candles and self.streaming:
await self.init_figis()
elif self.streaming:
# ВОТ ЭТОТ ЛОГ ПРОДОЛЖАЕТ ВЫВОДИТЬСЯ ПО УКАЗАННОМУ ИНТЕРВАЛУ:
screener_logger.info('Screener: New update loop')
new_figis, removed_figis = await self.get_figis()
for figi in removed_figis:
await self.unsubscribe(figi)
for figi in new_figis:
await self.get_candles(figi)
await asyncio.sleep(60 if self.first_init_candles else 5)
# ...
# Запуск Screener-а
async def main(scr: Screener):
screener_logger.info('Main started')
await asyncio.gather(
on_startup_gino(dp),
scr.run(),
)
# Класс для проверки значений свечки в параллельном процессе:
class CandlesWorker(multiprocessing.Process):
def __init__(self, num_thread, candle_queue: multiprocessing.Queue, from_candle_queue: multiprocessing.Queue):
super().__init__()
self.num_thread = num_thread
self.queue = candle_queue
self.from_candle_queue = from_candle_queue
def run(self):
t_st = time()
between_callback(cycle_candle_update, self.queue, self.from_candle_queue)
screener_logger.debug(f'Screener start candle_process #{self.num_thread} took {time() - t_st}')
# ...
# Запуск программы:
if __name__ == '__main__':
t_start = time()
queue = multiprocessing.Queue()
from_task_queue = multiprocessing.Queue()
candle_queue = multiprocessing.Queue()
from_candle_queue = multiprocessing.Queue()
t_end = time()
screener_logger.debug(f'Screener: creating Queues took {t_end - t_start}')
calculations_process = Worker(0, queue, from_task_queue)
candles_process = CandlesWorker(1, candle_queue, from_candle_queue)
screener_logger.debug(f'Screener: creating Workers took {time() - t_end}')
calculations_process.start()
candles_process.start()
screener = Screener(queue, from_task_queue, candle_queue, from_candle_queue)
asyncio.run(main(screener))
Даже не знаю, какой вопрос правильно было бы задать:
- Как отследить состояние streaming, чтобы можно было, например, переподписаться на текущий список figi? Потому что никаких сообщений об ошибках не отлавливается, когда прекращают приходить данные. При этом, если перезапустить скрипт - то все подписки заново успешно создаются.
- Может быть этот пример некорректно реализует саму подписку? Я постарался выделить все расчёты в отдельный процесс, чтобы не нагружать eventloop в текущем процессе с подписками, так как изначально подумал, что проблема может быть в переполнении asyncio.Queue.
Вот такой пример лога сегодня:
После этого задержки не начинаются?
was_inserted_new = await update_candle(payload, True)
Во время подписки получаем событие. Надо на стороне приложения знать на что подписан, а на что нет. Может быть просто срабатывает unsubscribe, я не вижу логику, где происходит subscribe
Иногда возникают: как раз в период очень активных торгов - до 12 минут максимум доходило, но потом в эту же торговую сессию задержка уменьшается, когда активность торгов падает. Период существования задержки не очень критичный.
Метод unsubscribe вызываю, только если удаляю эмитента из портфеля:
for figi in removed_figis:
await self.unsubscribe(figi) # здесь отписка при удалении
for figi in new_figis:
await self.get_candles(figi) # здесь в цикле сначала по REST скачиваю на нужный период данные, а потом создаётся подписка для каждого нового эмитента в портфеле
Это метод Screener.get_candles:
# Пример класса Screener для запуска прослушивания событий:
class Screener:
# .......
async def get_candles(self, figi: str):
now_date = offset_aware_datetime(datetime.utcnow())
days_diff = 1
days_count = 133
last_candle = await get_candles(figi, 1)
last_candle = last_candle[0] if len(last_candle) > 0 else None
last_time = None
if last_candle:
last_time = offset_aware_datetime(last_candle.time)
days_diff = days_count - min(days_count, (now_date - last_time).days)
if days_diff == 0:
last_time = None
# Асинхронный клиент:
client = ti.AsyncClient(API_KEY, use_sandbox=True)
has_error = False
i = 0
while days_diff <= days_count:
df = (last_time if last_time and i == 0 else offset_aware_datetime(now_date - timedelta(days=days_count - days_diff + 1)))
dt = offset_aware_datetime(now_date - timedelta(days=days_count - days_diff))
# ограничение частоты запросов
await self.rate_limit.acquire(self.wait_on_rate_limit)
screener_logger.info(f'Screener: make request for {figi} {df}-{dt}')
try:
candle_response = await client.get_market_candles(figi, df, dt, self.candle_resolution)
if candle_response.status == 'Ok':
payload = candle_response.payload
if len(payload.candles) > 0:
if last_candle and payload.candles[0].time == last_candle.time:
await update_candle(payload.candles[0])
new_insert_candles = payload.candles[1:]
if len(new_insert_candles) > 0:
await Candles.bulk_upsert(new_insert_candles)
else:
await Candles.bulk_upsert(payload.candles)
except InterfaceError as e:
screener_logger.error(e)
await admins_send_message(dp, f'Ошибка запроса для {figi}: {e}')
has_error = True
break
except Exception as e:
screener_logger.error(e)
await admins_send_message(dp, f'Ошибка запроса для {figi} c {df} по {dt}: {e}')
has_error = True
break
days_diff = days_diff + 1
i = i + 1
# Закрываем клиент
await client.close()
if has_error:
return
# await admins_send_message(dp, f'Данные для {figi} успешно скачаны')
# Если всё ок - подписываемся на тикер и добавляем его в список активных подписок
screener_logger.info(f'Screener: add {figi}')
self.current_tickers_figi.add(figi)
await self.streaming.candle.subscribe(figi, self.candle_resolution)
await self.streaming.instrument_info.subscribe(figi)
до 12 минут
Это много. Попробую асинхронно действия производить, сам цикл
async for event in streaming: ...
не должен лочиться
Сегодня такая задержка на текущее время:
Может просто не успевает обрабатываться очередь и переполняется? А когда лимита достигает - лочится?
If maxsize is less than or equal to zero, the queue size is infinite.
Посмотрю, что можно сделать. В будни можно будет проверить