RedisStorage seems to be broken
When using RedisStorage, I get the error: unsupported operand type(s) for -: 'Pipeline' and 'int' at https://github.com/fanout/django-eventstream/blob/2b61b2915e7440629cf73938a530f27ed7705501/django_eventstream/eventstream.py#L58
Testing it a bit, I think this line is wrong: https://github.com/fanout/django-eventstream/blob/2b61b2915e7440629cf73938a530f27ed7705501/django_eventstream/storage.py#L121
The results of the pipe are only available after pipe.execute(). I wanted to try event_id, _ = pipe.execute(), but the pipe.setex call already needs the event_id before that point.
Printing the result of pipe.execute() without changing any other code gives, for example, [2, True].
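The behaviour described here can be illustrated with a toy model. This is not redis-py code: `ToyPipeline` is a hypothetical stand-in that only mimics the queue-then-execute pattern, and the counter is assumed to already hold 1 so that the output matches the example above.

```python
class ToyPipeline:
    """Toy model of a command pipeline (hypothetical, not the redis-py class)."""

    def __init__(self):
        self._queued = []
        self._counter = 1  # assumption: the counter already holds 1

    def incr(self, key):
        # The command is only queued; the caller gets the pipeline back.
        self._queued.append(("incr", key))
        return self

    def setex(self, key, ttl, value):
        self._queued.append(("setex", key))
        return self

    def execute(self):
        # Results exist only now, one entry per queued command.
        results = []
        for name, _key in self._queued:
            if name == "incr":
                self._counter += 1
                results.append(self._counter)
            else:
                results.append(True)
        self._queued = []
        return results


if __name__ == "__main__":
    pipe = ToyPipeline()
    event_id = pipe.incr("event_counter:room")
    try:
        event_id - 1  # same kind of failure as in eventstream.py
    except TypeError as exc:
        print(exc)
    pipe.setex("event:room:?", 60, "{}")
    print(pipe.execute())
```

In this model the value returned by `incr` is the pipeline object, so arithmetic on it raises a `TypeError`, and the real counter value only shows up in the list returned by `execute()`.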
Maybe @erfantarighi can look at this.
@erfantarighi Can you have a look?
I currently have storage disabled because of this issue, but the project I am developing will probably need storage sooner or later. Since I am already using Redis, I think RedisStorage would be the best choice.
@allo- I solved this by subclassing RedisStorage and overriding the append_event method. Here's my code, let me know your thoughts:
import json

from django_eventstream.event import Event
from django_eventstream.storage import EVENT_TIMEOUT
from django_eventstream.storage import RedisStorage as BaseRedisStorage


class RedisStorage(BaseRedisStorage):
    def append_event(self, channel: str, event_type: str, data: dict) -> Event:
        """
        Appends a new event to the storage for the specified channel.

        Args:
            channel (str): The name of the channel to append the event to.
            event_type (str): The type of the event.
            data (dict): The data associated with the event.

        Returns:
            Event: An Event object representing the appended event.
        """
        with self.redis.pipeline() as pipe:
            try:
                _event_id = pipe.incr("event_counter:" + channel)
                event_data = json.dumps({"type": event_type, "data": data})
                pipe.setex(
                    "event:" + channel + ":" + str(_event_id),
                    EVENT_TIMEOUT * 60,
                    event_data,
                )
                event_id, _ = pipe.execute()
                return Event(channel, event_type, data, id=event_id)
            except ConnectionError as e:
                raise ConnectionError("Failed to append event to Redis.") from e

# In the Django settings:
EVENTSTREAM_STORAGE_CLASS = "myapp.storages.RedisStorage"
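One caveat with the snippet above: commands called on a redis-py pipeline are queued and return the pipeline itself, so `str(_event_id)` is not the numeric id, and the `setex` key will not match the id that comes back from `pipe.execute()`. Below is a minimal sketch of one possible workaround, not the library's implementation: run INCR directly so the id is known before the key is built. The function name `append_event_direct`, the `ttl_seconds` parameter, and the `FakeRedis` stand-in are hypothetical, added only so the sketch runs without a Redis server; a real `redis.Redis` client exposes `incr` and `setex` with the same argument order.

```python
import json


def append_event_direct(client, channel, event_type, data, ttl_seconds):
    """Store one event and return its numeric id.

    `client` is assumed to offer redis-py style `incr` and `setex` methods.
    """
    # INCR is sent immediately (not queued), so an int comes back.
    event_id = client.incr("event_counter:" + channel)
    event_data = json.dumps({"type": event_type, "data": data})
    # The id is known at this point, so the key carries the real number.
    client.setex("event:" + channel + ":" + str(event_id), ttl_seconds, event_data)
    return event_id


class FakeRedis:
    """Hypothetical in-memory stand-in so the sketch runs without a server."""

    def __init__(self):
        self.store = {}

    def incr(self, key):
        self.store[key] = int(self.store.get(key, 0)) + 1
        return self.store[key]

    def setex(self, key, ttl, value):
        # The expiry time is ignored by this stand-in.
        self.store[key] = value
        return True


if __name__ == "__main__":
    client = FakeRedis()
    first = append_event_direct(client, "room", "message", {"text": "hi"}, 60)
    print(first, client.store["event:room:" + str(first)])
```

The trade-off is that INCR and SETEX are no longer sent as one batch, so a failure between the two calls would leave a gap in the ids. In the subclass above, `ttl_seconds` would correspond to `EVENT_TIMEOUT * 60`.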
Hi, could you make a PR with tests?