
Really Confusing Error

Open chokosabe opened this issue 8 months ago • 1 comments

I created this strategy based on the examples.

import json
import logging
from typing import Callable, Mapping, TypeVar

from arroyo.backends.kafka.consumer import KafkaPayload
from arroyo.processing.strategies import CommitOffsets, RunTask
from arroyo.processing.strategies.abstract import (
    ProcessingStrategy,
    ProcessingStrategyFactory,
)
from arroyo.types import Commit, Message, Partition

logger = logging.getLogger(__name__)

T = TypeVar('T')

class DBPersistStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
    """
    A factory that builds a processing strategy for persisting messages to a database.
    
    The processing function should handle the logic for transforming and saving the messages.
    """

    def __init__(self, processing_function: Callable[[Message[KafkaPayload]], None]):
        self.__processing_function = processing_function

    def create_with_partitions(
        self,
        commit: Commit,
        partitions: Mapping[Partition, int],
    ) -> ProcessingStrategy[KafkaPayload]:

        return RunTask(
            function=self.__processing_function,
            next_step=CommitOffsets(commit),
        )

I'm trying to run it in tests and I'm getting this error:

                except InvalidMessage as e:
                    self._handle_invalid_message(e)
    
                else:
                    # Resume if we are currently in a paused state
                    if self.__is_paused:
                        self.__metrics_buffer.incr_counter("arroyo.consumer.resume", 1)
                        self.__consumer.resume([*self.__consumer.tell().keys()])
                        self.__is_paused = False
    
                    # Clear backpressure timestamp if it is set
                    self._clear_backpressure()
    
                    self.__message = None
        else:
            if self.__message is not None:
>               raise InvalidStateError(
                    "received message without active processing strategy"
                )
E               arroyo.processing.processor.InvalidStateError: received message without active processing strategy

/usr/local/lib/python3.10/site-packages/arroyo/processing/processor.py:457: InvalidStateError

My question is: what is an active processing strategy?

Thanks. We really need some sane, basic examples. The two that are currently there are not enough.
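For context, one possible reading of the traceback, offered as an assumption rather than a confirmed diagnosis: the processor appears to hold a strategy only after partitions have been assigned to the consumer, so a message that arrives before any assignment (for example, from a mocked consumer in a test that never triggers the assignment callback) hits the `else` branch shown above. The sketch below is a toy model in plain Python, not arroyo's implementation; `ToyProcessor`, its method names, and the stand-in `InvalidStateError` are all hypothetical and exist only to illustrate that state transition.

```python
from typing import Callable, List, Optional


class InvalidStateError(RuntimeError):
    """Stand-in for the error in the traceback (not arroyo's own class)."""


class ToyProcessor:
    """Toy model: a strategy exists only between assignment and revocation."""

    def __init__(self, build_strategy: Callable[[], Callable[[str], None]]) -> None:
        self._build_strategy = build_strategy
        # No strategy until partitions are assigned.
        self._strategy: Optional[Callable[[str], None]] = None

    def on_partitions_assigned(self) -> None:
        # In this model, this is the moment the strategy becomes "active".
        self._strategy = self._build_strategy()

    def on_partitions_revoked(self) -> None:
        # The strategy is dropped again when partitions go away.
        self._strategy = None

    def run_once(self, message: Optional[str]) -> None:
        if self._strategy is not None:
            if message is not None:
                self._strategy(message)
        elif message is not None:
            # Mirrors the branch that raised in the traceback.
            raise InvalidStateError(
                "received message without active processing strategy"
            )


seen: List[str] = []
processor = ToyProcessor(lambda: seen.append)

# A message delivered before any assignment raises the error.
error_text = ""
try:
    processor.run_once("m1")
except InvalidStateError as exc:
    error_text = str(exc)

# After the assignment callback fires, the same call succeeds.
processor.on_partitions_assigned()
processor.run_once("m2")
```

If this reading is right, a test would need to let the consumer go through partition assignment (or invoke the equivalent callback) before feeding it messages; whether that matches the test setup here is not something the post shows.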

chokosabe, May 29 '24 15:05