django-eventstream icon indicating copy to clipboard operation
django-eventstream copied to clipboard

RedisStorage seems to be broken

Open allo- opened this issue 1 year ago • 4 comments

When using RedisStorage, I get the error: unsupported operand type(s) for -: 'Pipeline' and 'int' at https://github.com/fanout/django-eventstream/blob/2b61b2915e7440629cf73938a530f27ed7705501/django_eventstream/eventstream.py#L58

Testing it a bit, I think this line is wrong: https://github.com/fanout/django-eventstream/blob/2b61b2915e7440629cf73938a530f27ed7705501/django_eventstream/storage.py#L121

and the results of the pipe are only available after pipe.execute(). I wanted to try event_id, _ = pipe.execute(), but the data in the pipe.setex call already needs the event_id.

Printing the result of pipe.execute without changing other code gives for example [2, True].

allo- avatar Aug 06 '24 20:08 allo-

Maybe @erfantarighi can look at this.

jkarneges avatar Aug 15 '24 22:08 jkarneges

@erfantarighi Can you have a look?

I currently have storage disabled because of the issue, but in the project I am developing it would probably be good to have storage sooner or later and as I am already using Redis I think RedisStorage would be the best choice.

allo- avatar Oct 02 '24 15:10 allo-

@allo- I solved this by subclassing RedisStorage and overriding the append_event method. Here's my code, let me know your thoughts:

import json

from django_eventstream.event import Event
from django_eventstream.storage import EVENT_TIMEOUT
from django_eventstream.storage import RedisStorage as BaseRedisStorage


class RedisStorage(BaseRedisStorage):
    def append_event(self, channel: str, event_type: str, data: dict) -> Event:
        """
        Appends a new event to the storage for the specified channel.

        Args:
            channel (str): The name of the channel to append the event to.
            event_type (str): The type of the event.
            data (dict): The data associated with the event.

        Returns:
            Event: An Event object representing the appended event.
        """
        with self.redis.pipeline() as pipe:
            try:
                _event_id = pipe.incr("event_counter:" + channel)
                event_data = json.dumps({"type": event_type, "data": data})
                pipe.setex(
                    "event:" + channel + ":" + str(_event_id),
                    EVENT_TIMEOUT * 60,
                    event_data,
                )
                event_id, _ = pipe.execute()
                return Event(channel, event_type, data, id=event_id)

            except ConnectionError as e:
                raise ConnectionError("Failed to append event to Redis.") from e
EVENTSTREAM_STORAGE_CLASS = "myapp.storages.RedisStorage"

stefanofusai avatar Feb 14 '25 15:02 stefanofusai

@allo- I solved this by subclassing RedisStorage and overriding the append_event method. Here's my code, let me know your thoughts:

import json

from django_eventstream.event import Event from django_eventstream.storage import EVENT_TIMEOUT from django_eventstream.storage import RedisStorage as BaseRedisStorage

class RedisStorage(BaseRedisStorage): def append_event(self, channel: str, event_type: str, data: dict) -> Event: """ Appends a new event to the storage for the specified channel.

    Args:
        channel (str): The name of the channel to append the event to.
        event_type (str): The type of the event.
        data (dict): The data associated with the event.

    Returns:
        Event: An Event object representing the appended event.
    """
    with self.redis.pipeline() as pipe:
        try:
            _event_id = pipe.incr("event_counter:" + channel)
            event_data = json.dumps({"type": event_type, "data": data})
            pipe.setex(
                "event:" + channel + ":" + str(_event_id),
                EVENT_TIMEOUT * 60,
                event_data,
            )
            event_id, _ = pipe.execute()
            return Event(channel, event_type, data, id=event_id)

        except ConnectionError as e:
            raise ConnectionError("Failed to append event to Redis.") from e

EVENTSTREAM_STORAGE_CLASS = "myapp.storages.RedisStorage"

Hi, could you make a PR with tests ?

enzofrnt avatar Mar 31 '25 11:03 enzofrnt