arroyo
arroyo copied to clipboard
Really Confusing Error
I created this strategy based on the examples:
import json
import logging
from typing import Callable, Mapping, TypeVar
from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies import CommitOffsets, RunTask
from arroyo.processing.strategies.abstract import (
ProcessingStrategy,
ProcessingStrategyFactory,
)
from arroyo.types import Commit, Message, Partition
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Generic type variable; NOTE(review): not referenced anywhere in the visible snippet.
T = TypeVar('T')
class DBPersistStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
    """
    Strategy factory for persisting consumed messages to a database.

    The factory does not persist anything itself: the callable supplied at
    construction time is expected to transform and save each message.
    """

    def __init__(self, processing_function: Callable[[Message[KafkaPayload]], None]):
        # Kept on a private (name-mangled) attribute; it is only read when a
        # strategy is built in create_with_partitions.
        self.__processing_function = processing_function

    def create_with_partitions(
        self,
        commit: Commit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:
        """
        Build the strategy chain: a ``RunTask`` step wrapping the processing
        function, with ``CommitOffsets(commit)`` as its next step.

        ``partitions`` is not used by this implementation.
        """
        commit_step = CommitOffsets(commit)
        return RunTask(
            function=self.__processing_function,
            next_step=commit_step,
        )
I am trying to run it in tests and getting this error:
except InvalidMessage as e:
self._handle_invalid_message(e)
else:
# Resume if we are currently in a paused state
if self.__is_paused:
self.__metrics_buffer.incr_counter("arroyo.consumer.resume", 1)
self.__consumer.resume([*self.__consumer.tell().keys()])
self.__is_paused = False
# Clear backpressure timestamp if it is set
self._clear_backpressure()
self.__message = None
else:
if self.__message is not None:
> raise InvalidStateError(
"received message without active processing strategy"
)
E arroyo.processing.processor.InvalidStateError: received message without active processing strategy
/usr/local/lib/python3.10/site-packages/arroyo/processing/processor.py:457: InvalidStateError
My question is: what is an active processing strategy?
Thanks. We really need some sane, basic examples; the two currently there are not enough.