stan.py icon indicating copy to clipboard operation
stan.py copied to clipboard

Lost data for subscription

Open hashbash opened this issue 3 years ago • 0 comments

Hello! I'm new to nats/stan and sometimes I lost part of messages from NATS. Approximative 10 minutes of data every day (messages in 10-minute interval). I'm using this python code as systemd service.

Any idea why it happens?

import traceback
import signal
import asyncio
import logging
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime

from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN


NATS_SERVERS = [
    "nats://positionaq1.dp.some_server.ru:4222",
    "nats://positionaq2.dp.some_server.ru:4222",
]

NATS_SUBJECT = 'positions.changes_by_events'
NATS_CLUSTER_ID = 'gb_analytics'
NATS_CLIENT_ID = 'olap-orr-nats-listener-service'
NATS_CONNECT_TIMEOUT = 20
NATS_MAX_RECONNECT_ATTEMPTS = 5


nats_data_path = '/opt/nats_data/dp/nats.data'
nats_logs_path = '/var/log/nats/dp/nats.log'


def setup_logger(name, log_path, log_format, when, interval=1, level=logging.INFO):
    handler = TimedRotatingFileHandler(log_path, when=when, interval=interval)
    formatter = logging.Formatter(log_format)
    handler.setFormatter(formatter)
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.addHandler(handler)

    return logger


base_logger = setup_logger(name='Nats logger', log_path=nats_logs_path, when='midnight', interval=356,
                           log_format='%(asctime)s %(name)s %(levelname)s %(message)s')
data_writer = setup_logger(name='Data writer', log_path=nats_data_path, when='m', interval=5,
                           log_format='%(message)s')


async def run(loop):
    nc = NATS()
    sc = STAN()

    async def closed_cb():
        print("Connection to NATS is closed.")
        await asyncio.sleep(0.1, loop=loop)
        loop.stop()

    async def reconnected_cb():
        print(f"Reconnected to NATS at {nc.connected_url.netloc}...")

    try:
        await nc.connect(servers=NATS_SERVERS, io_loop=loop, connect_timeout=NATS_CONNECT_TIMEOUT,
                         closed_cb=closed_cb, reconnected_cb=reconnected_cb,
                         max_reconnect_attempts=NATS_MAX_RECONNECT_ATTEMPTS)
        await sc.connect(NATS_CLUSTER_ID, NATS_CLIENT_ID, nats=nc, connect_timeout=NATS_CONNECT_TIMEOUT)
    except Exception:
        print(traceback.format_exc())
        loop.stop()

    print(f"Connected to NATS at {nc.connected_url.netloc}")

    def signal_handler():
        if nc.is_closed:
            return
        print("Disconnecting...")
        loop.create_task(nc.close())

    for sig in ('SIGINT', 'SIGTERM'):
        loop.add_signal_handler(getattr(signal, sig), signal_handler)

    async def cb(msg):
        print(msg)  # available message meta info from journalctl
        base_logger.info('Message received. sequence: %s. time: %s. data_len: %d'
                         % (msg.seq, datetime.now().isoformat(), len(msg.data)))
        data_writer.info("""{"sequence": "%s", "time": "%s", "data": "%s"}"""
                         % (msg.seq, datetime.now().isoformat(), msg.data))

    await sc.subscribe(NATS_SUBJECT, cb=cb)


def exception_handler(loop, context):
    loop.default_exception_handler(context)
    exc = context.get('exception')
    if isinstance(exc, (asyncio.TimeoutError, asyncio.CancelledError)):
        print(context)
        loop.stop()


if __name__ == '__main__':
    base_logger.info('Run nats subscriber...')
    base_logger.info('Nats params: NATS_SUBJECT=%s, NATS_CLUSTER_ID=%s, NATS_CLIENT_ID=%s, NATS_CONNECT_TIMEOUT=%d, '
                     'NATS_SERVERS=%s' % (NATS_SUBJECT, NATS_CLUSTER_ID, NATS_CLIENT_ID,
                                          NATS_CONNECT_TIMEOUT, NATS_SERVERS))
    main_loop = asyncio.get_event_loop()
    main_loop.set_exception_handler(exception_handler)
    main_loop.run_until_complete(run(main_loop))
    try:
        main_loop.run_forever()
    finally:
        main_loop.close()

and service file:

[Unit]
Description=Nats dp subscriber daemon
After=network.target

[Service]
Type=simple
WorkingDirectory=/root/orr-nats-subscriber/dp
ExecStart=/usr/bin/python3 /root/orr-nats-subscriber/dp/nats_subscriber.py
#ExecReload=/bin/kill -s SIGHUP $MAINPID
ExecStop=/bin/kill -s SIGTERM $MAINPID
#Restart=on-failure
Restart=always
RestartSec=5s

[Install]

hashbash avatar Jul 16 '20 09:07 hashbash