solie
solie copied to clipboard
Task durations and aggregate trades can be handled more generically with `TimedQueue`.
import asyncio
import time
class TimedQueue:
    """A FIFO queue that only keeps items added within a certain time window.

    Entries are stored in ``self.queue`` as ``(timestamp, item)`` tuples, where
    ``timestamp`` is the ``time.time()`` value taken when the item was added.
    Entries older than ``max_age`` seconds are dropped whenever an item is
    added or the contents are read.
    """

    def __init__(self, max_age: float) -> None:
        """
        Initialize the TimedQueue.

        Parameters:
            max_age (float): The maximum age (in seconds) allowed for items in the queue.
        """
        self.queue = asyncio.Queue()
        self.max_age = max_age
        # The attribute is kept for callers that read it; the class itself
        # never uses it. get_running_loop() avoids the deprecated implicit
        # loop creation of get_event_loop(), so the queue can also be built
        # outside a running loop (in which case ``loop`` is None).
        try:
            self.loop = asyncio.get_running_loop()
        except RuntimeError:
            self.loop = None

    async def add_item(self, item: object) -> None:
        """
        Add an item to the queue and drop entries that have expired.

        Parameters:
            item: The item to add to the queue.
        """
        await self.queue.put((time.time(), item))
        await self._cleanup()

    async def _cleanup(self) -> None:
        """Remove items from the queue that are older than the maximum age."""
        current_time = time.time()
        # asyncio.Queue has no public "peek", so look at the head of its
        # backing deque (get_items already depends on the same attribute).
        # Inspecting the head in place, instead of popping and re-queuing it,
        # keeps the surviving entries in their original FIFO order.
        pending = self.queue._queue
        # Entries are appended in time order, so stop at the first one that
        # is still inside the window.
        while pending and current_time - pending[0][0] > self.max_age:
            self.queue.get_nowait()

    async def get_items(self) -> list:
        """
        Retrieve all items currently in the queue.

        Expired entries are purged first, so the result only contains items
        that are still within the time window.

        Returns:
            list: The queued items, oldest first, without their timestamps.
        """
        await self._cleanup()
        return [item for _, item in self.queue._queue]
async def main():
    """Demo: add three items to a 10-second TimedQueue with pauses, then print it.

    The pauses before the second and third additions are 5 and 6 seconds, so
    the first item is 11 seconds old when the third addition triggers its
    cleanup pass.
    """
    window = TimedQueue(10)
    # (seconds to wait before adding, item to add)
    schedule = (
        (0, "First item"),
        (5, "Second item"),
        (6, "Third item"),
    )
    for pause, label in schedule:
        if pause:
            await asyncio.sleep(pause)  # Simulate some time passing
        await window.add_item(label)
    print(await window.get_items())
# Script entry point: run the demo coroutine to completion.
# NOTE(review): this statement is unguarded, so it also executes when the
# file is imported; consider wrapping it in `if __name__ == "__main__":`.
asyncio.run(main())