Add batch handler option in index
What feature would you like to see in DipDup?
I want to create a handler that handles a batch of events instead of a single event.
Current setup:
raw_events_uniswap_v2_pair_sync:
  kind: evm.subsquid.events
  datasource: subsquid_node
  first_level: 10000835
  handlers:
    - callback: on_pair_sync
      contract: uniswap_v2_pair
      name: Sync
I want my handler to have an option like batch: true:
raw_events_uniswap_v2_pair_sync:
  kind: evm.subsquid.events
  datasource: subsquid_node
  first_level: 10000835
  handlers:
    - callback: on_pair_sync
      contract: uniswap_v2_pair
      batch: true
      name: Sync
With this option, on_pair_sync would process a batch of events instead of a single event at a time.
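For illustration, a batch-enabled callback could look something like the sketch below. This is not an existing DipDup API; the tuple-of-events signature is just an assumption about how batch: true might surface in the callback, and the model fields are taken from my workaround further down.

from dipdup.context import HandlerContext
from dipdup.models.evm_subsquid import SubsquidEvent

from indexer.models import EventsUniswapV2PairSync
from indexer.types.uniswap_v2_pair.evm_events.sync import Sync


# Hypothetical signature: with `batch: true` the framework would pass all
# matched Sync events in a single call instead of one call per event.
async def on_pair_sync(
    ctx: HandlerContext,
    events: tuple[SubsquidEvent[Sync], ...],
) -> None:
    rows = [
        EventsUniswapV2PairSync(
            id=f"{e.data.level}-{e.data.transaction_index}-{e.data.log_index}",
            block_number=e.data.level,
            block_timestamp=e.data.timestamp,
            transaction_index=e.data.transaction_index,
            log_index=e.data.log_index,
            pair_id=e.data.address.lower(),
            reserve0=str(e.payload.reserve0),
            reserve1=str(e.payload.reserve1),
        )
        for e in events
    ]
    await EventsUniswapV2PairSync.bulk_create(rows, ignore_conflicts=True)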
Why do you need this feature, what's the use case?
Most of my index handlers only insert raw events into the PostgreSQL table.
All processing logic is implemented in hooks.
A batch handler would speed up the indexing process.
Is there a workaround currently?
I'm experimenting with a batch sender that looks like this:
import asyncio
import logging
import signal

from dipdup.context import HandlerContext
from dipdup.models.evm_subsquid import SubsquidEvent

from indexer.models import EventsUniswapV2PairSync
from indexer.types.uniswap_v2_pair.evm_events.sync import Sync

logger = logging.getLogger("dipdup")

BATCH: list[EventsUniswapV2PairSync] = []


async def batch_insert(event: EventsUniswapV2PairSync):
    # Accumulate events and flush them to the database once the batch is full.
    BATCH.append(event)
    batch_count = len(BATCH)
    if batch_count >= 100:
        await EventsUniswapV2PairSync.bulk_create(BATCH, ignore_conflicts=True)
        BATCH.clear()
        logger.info("%s Sync events inserted to the database", batch_count)
    return


async def send_batch_on_shutdown():
    # Sleep forever; when the task is cancelled on shutdown, the `finally`
    # block flushes whatever is left in the batch.
    try:
        while True:
            await asyncio.sleep(60 * 60)
    finally:
        batch_count = len(BATCH)
        logger.info(
            "dipdup shutting down, inserting %s Sync events to the database...",
            batch_count,
        )
        if batch_count > 0:
            await EventsUniswapV2PairSync.bulk_create(
                BATCH, ignore_conflicts=True
            )
        logger.info(
            "dipdup shutting down, inserting %s Sync events to the database... DONE",
            batch_count,
        )
        BATCH.clear()


asyncio.create_task(send_batch_on_shutdown())  # noqa: RUF006


async def on_pair_sync(
    ctx: HandlerContext,
    event: SubsquidEvent[Sync],
) -> None:
    block_number = event.data.level
    block_timestamp = event.data.timestamp
    transaction_index = event.data.transaction_index
    log_index = event.data.log_index
    sync_id = f"{block_number}-{transaction_index}-{log_index}"
    pair_id = event.data.address.lower()
    reserve0 = str(event.payload.reserve0)
    reserve1 = str(event.payload.reserve1)
    event = EventsUniswapV2PairSync(
        id=sync_id,
        block_number=block_number,
        block_timestamp=block_timestamp,
        transaction_index=transaction_index,
        log_index=log_index,
        pair_id=pair_id,
        reserve0=reserve0,
        reserve1=reserve1,
    )
    await batch_insert(event=event)
Hi @pyk!
I like your workaround, it's smart! We had already thought about batch processing, but haven't run any tests or benchmarks yet, so what follows is just speculation.
This task could likely be solved more efficiently at the framework level, since DipDup opens a new transaction for every level with matched handlers, regardless of the number of SQL operations inside. Empty transactions may be optimized somehow by Tortoise or by PostgreSQL, but they are definitely not free.
If you'd like to help with this task and dive into DipDup's guts, an entry point for experiments is the dipdup.transactions.TransactionsManager.in_transaction method. It could be patched to reuse the same transaction until the batch is full (or even flush on a time interval), similar to your handler-level solution.
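As a rough, standalone illustration of the idea (not the actual TransactionsManager code; the helper and function names below are made up, and it only relies on Tortoise's tortoise.transactions.in_transaction context manager), batching levels into one transaction could look like this:

from tortoise.transactions import in_transaction

from indexer.models import EventsUniswapV2PairSync


async def save_level(level_events: list[EventsUniswapV2PairSync]) -> None:
    # Hypothetical helper: whatever SQL the matched handlers of one level produce.
    await EventsUniswapV2PairSync.bulk_create(level_events, ignore_conflicts=True)


# Roughly what happens today: one BEGIN/COMMIT per level with matched handlers,
# even if the level produces a single small INSERT.
async def index_per_level(levels: list[list[EventsUniswapV2PairSync]]) -> None:
    for level_events in levels:
        async with in_transaction():
            await save_level(level_events)


# The proposed optimization: reuse one transaction for a batch of levels,
# amortizing the per-transaction overhead.
async def index_batched(
    levels: list[list[EventsUniswapV2PairSync]],
    batch_size: int = 100,
) -> None:
    for start in range(0, len(levels), batch_size):
        async with in_transaction():
            for level_events in levels[start:start + batch_size]:
                await save_level(level_events)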
Implemented with a fixed batch size; follow-up in #1091.