
Add batch handler option in index

pyk opened this issue 1 year ago · 1 comment

What feature would you like to see in DipDup?

I want to create a handler that handles a batch of events instead of a single event.

Current setup:

  raw_events_uniswap_v2_pair_sync:
    kind: evm.subsquid.events
    datasource: subsquid_node
    first_level: 10000835
    handlers:
      - callback: on_pair_sync
        contract: uniswap_v2_pair
        name: Sync

I want my handler to have an option like batch: true:

  raw_events_uniswap_v2_pair_sync:
    kind: evm.subsquid.events
    datasource: subsquid_node
    first_level: 10000835
    handlers:
      - callback: on_pair_sync
        contract: uniswap_v2_pair
        batch: true
        name: Sync

With this option, on_pair_sync would process a batch of events instead of single events.

Why do you need this feature, what's the use case?

Most of my index handlers only insert raw events into a PostgreSQL table.

All processing logic is implemented in hooks.

A batch handler would speed up the indexing process.

Is there a workaround currently?

I'm experimenting with a batch sender that looks like this:

import asyncio
import logging

from dipdup.context import HandlerContext
from dipdup.models.evm_subsquid import SubsquidEvent
from indexer.models import EventsUniswapV2PairSync
from indexer.types.uniswap_v2_pair.evm_events.sync import Sync

logger = logging.getLogger("dipdup")

BATCH: list[EventsUniswapV2PairSync] = []


async def batch_insert(event: EventsUniswapV2PairSync) -> None:
    BATCH.append(event)
    batch_count = len(BATCH)

    # Flush to the database once the batch is full
    if batch_count >= 100:
        await EventsUniswapV2PairSync.bulk_create(BATCH, ignore_conflicts=True)
        BATCH.clear()
        logger.info("%s Sync events inserted to the database", batch_count)


async def send_batch_on_shutdown() -> None:
    try:
        while True:
            await asyncio.sleep(60 * 60)
    finally:
        batch_count = len(BATCH)
        logger.info(
            "dipdup shutting down, inserting %s Sync events to the database...",
            batch_count,
        )
        if batch_count > 0:
            await EventsUniswapV2PairSync.bulk_create(
                BATCH, ignore_conflicts=True
            )
            logger.info(
                "dipdup shutting down, inserting %s Sync events to the database... DONE",
                batch_count,
            )
            BATCH.clear()


# Note: this relies on the handler module being imported while the event loop
# is already running; otherwise asyncio.create_task() raises RuntimeError.
asyncio.create_task(send_batch_on_shutdown())  # noqa: RUF006


async def on_pair_sync(
    ctx: HandlerContext,
    event: SubsquidEvent[Sync],
) -> None:
    block_number = event.data.level
    block_timestamp = event.data.timestamp
    transaction_index = event.data.transaction_index
    log_index = event.data.log_index

    sync_id = f"{block_number}-{transaction_index}-{log_index}"
    pair_id = event.data.address.lower()
    reserve0 = str(event.payload.reserve0)
    reserve1 = str(event.payload.reserve1)

    # Use a new name instead of rebinding the `event` parameter
    sync_event = EventsUniswapV2PairSync(
        id=sync_id,
        block_number=block_number,
        block_timestamp=block_timestamp,
        transaction_index=transaction_index,
        log_index=log_index,
        pair_id=pair_id,
        reserve0=reserve0,
        reserve1=reserve1,
    )
    await batch_insert(event=sync_event)
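The same idea can be factored into a reusable buffer instead of module-level globals. The sketch below is self-contained generic Python: the `BatchBuffer` class and its `flush` callback are hypothetical illustrations, not part of DipDup; in the handler above, the flush callback would call `bulk_create`.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Generic, TypeVar

T = TypeVar("T")


class BatchBuffer(Generic[T]):
    """Accumulates items and flushes them with a single call once the
    batch is full. Hypothetical helper, not part of the DipDup API."""

    def __init__(self, flush: Callable[[list[T]], Awaitable[None]], size: int = 100) -> None:
        self._flush = flush
        self._size = size
        self._items: list[T] = []

    async def add(self, item: T) -> None:
        self._items.append(item)
        if len(self._items) >= self._size:
            await self.drain()

    async def drain(self) -> None:
        # Flush whatever is buffered; also call this on shutdown
        if self._items:
            items, self._items = self._items, []
            await self._flush(items)


flushed: list[list[int]] = []


async def flush(items: list[int]) -> None:
    # Stand-in for EventsUniswapV2PairSync.bulk_create(...)
    flushed.append(items)


async def main() -> None:
    buf = BatchBuffer(flush, size=3)
    for i in range(7):
        await buf.add(i)
    await buf.drain()  # flush the remainder


asyncio.run(main())
# flushed == [[0, 1, 2], [3, 4, 5], [6]]
```

Calling `drain()` from a shutdown hook replaces the `send_batch_on_shutdown` background task above.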

pyk avatar Jan 20 '24 23:01 pyk

Hi @pyk!

I like your workaround; it's smart! We've thought about batch processing before, but haven't done any tests or benchmarks, so what follows is just speculation.

This task could likely be solved more efficiently at the framework level, since DipDup opens a new transaction for every level that has matched handlers, regardless of the number of SQL operations inside. Empty transactions may be optimized away by Tortoise or by PostgreSQL, but they are definitely not free.

If you'd like to help with this task and dive into DipDup's guts, the entry point for experiments is the dipdup.transactions.TransactionsManager.in_transaction method. It should be patched to reuse the same transaction until the batch is full (or even flush at time intervals), similar to your handler-level solution.

droserasprout avatar Jan 22 '24 00:01 droserasprout

Implemented with a fixed batch size; follow-up in #1091

droserasprout avatar Sep 06 '24 20:09 droserasprout