How Can I get stream one value in HTTP Request?
from faust import App, Record from faust.web import Request, Response, View import asyncio
app = App('ness', broker='kafka://localhost')
data_queues = {}
async def process_message(stream): async for record in stream: yield record
async def process_events(stream): async for event in stream.events(): event = stream.current_event message = event.message topic = event.message.topic print(f"Received message {message} from topic {topic}")
agents = {} topics = {} for i in ['test-30']: agents[i] = app.agent(channel=i, name=app.topic(i))(process_message) topics[i] = app.topic(i)
app.start()
@app.page('/get_data/{topic_name}') async def get_data(self, request: Request, topic_name) -> Response: print(topic_name) if topic_name not in agents: return self.json({'error': 'Topic not found'}, status=404)
agent = agents[topic_name]
data = process_events(agent.stream())
return self.json({'topic_name': topic_name, 'message': 'data'})
if name == 'main': import sys if len(sys.argv) < 2: sys.argv.extend(['worker', '-l', 'info']) app.main()
Versions
- Python version: 3.11
- Faust version: faust-streaming 0.11.1
- Operating system macOS
- Kafka version 2.8
- RocksDB version (if applicable) None
Can you explain your problem and format the code in your comment? It's very hard to understand what it is you want