datayoga
streaming data pipeline platform
using with "@namespace/pipeline" or "@pipeline"
how can we produce submodules / namespaces?
As a result, only the problematic record is rejected, not the entire batch it is part of.
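To illustrate per-record rejection, here is a minimal sketch. It is not datayoga's implementation: `process_batch`, `to_upper_name`, and the record shape are hypothetical names chosen for the example. Each record is transformed inside its own `try` block, so a failure rejects that record alone.

```python
from typing import Any, Callable, Dict, List, Tuple

Record = Dict[str, Any]


def process_batch(
    records: List[Record],
    transform: Callable[[Record], Record],
) -> Tuple[List[Record], List[Tuple[Record, str]]]:
    """Apply `transform` to each record, isolating failures per record."""
    succeeded: List[Record] = []
    rejected: List[Tuple[Record, str]] = []
    for record in records:
        try:
            succeeded.append(transform(record))
        except Exception as error:
            # Reject only this record; the rest of the batch continues.
            rejected.append((record, str(error)))
    return succeeded, rejected


def to_upper_name(record: Record) -> Record:
    # Hypothetical transformation; raises KeyError when "name" is missing.
    return {**record, "name": record["name"].upper()}


succeeded, rejected = process_batch(
    [{"name": "a"}, {"id": 2}, {"name": "c"}],
    to_upper_name,
)
```

Here the second record has no `name` key, so it ends up in `rejected` with its error message, while the other two records are still processed.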
The `redis.read_stream` producer sends messages one by one; it should send them in batches based on the `batch_size` argument.
Should be changed to:
```python
async def produce(self) -> AsyncGenerator[List[Message], None]:
    logger.debug(f"Running {self.get_block_name()}")
    read_pending = True
    while True:
        # Read pending messages (fetched by us before but not acknowledged) in...
```
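The batching idea on its own can be sketched without Redis. This is an illustration under stated assumptions, not the proposed `produce` method: `batched`, `fake_stream`, and `collect` are hypothetical helpers, and `fake_stream` stands in for messages read from a stream. The generator yields lists of at most `batch_size` items instead of one item at a time.

```python
import asyncio
from typing import AsyncGenerator, AsyncIterator, List, TypeVar

T = TypeVar("T")


async def batched(
    source: AsyncIterator[T], batch_size: int
) -> AsyncGenerator[List[T], None]:
    """Group items from `source` into lists of at most `batch_size` items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batch: List[T] = []
    async for item in source:
        batch.append(item)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Flush the final, possibly smaller, batch.
        yield batch


async def fake_stream(count: int) -> AsyncGenerator[int, None]:
    # Hypothetical stand-in for messages read from a Redis stream.
    for i in range(count):
        yield i


async def collect(count: int, batch_size: int) -> List[List[int]]:
    return [batch async for batch in batched(fake_stream(count), batch_size)]


batches = asyncio.run(collect(5, 2))
```

With five messages and a `batch_size` of 2, the consumer receives three lists, the last one holding the single remaining message. A real producer would more likely ask Redis for several entries per read (the stream read commands accept a COUNT option) rather than regroup single messages afterwards.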