pykafka
pykafka copied to clipboard
Last two messages are not written to the Topic
Below is my code : Problem : Producer not writing the last two messages to the Topic.
`import os
import asyncio
import websockets
from pykafka import KafkaClient
from websockets.extensions import permessage_deflate
class Server:
client = None
kafka_client = None
topic_name = "tpa_19"
topic = None
producer = None
def get_port(self):
return os.getenv('WS_PORT', '10015')
def connect_kafka_client(self):
client = KafkaClient(hosts="localhost:9092", use_greenlets=True)
self.client = client
self.set_topic()
print("Connection Done with Kafka")
def set_topic(self):
self.topic = self.client.topics[self.topic_name]
self.producer = self.topic.get_producer(min_queued_messages=1,max_queued_messages=0,
linger_ms=500)
def get_host(self):
return os.getenv('WS_HOST', 'localhost')
def start(self):
return websockets.serve(self.handler, self.get_host(), self.get_port(), ping_interval=None, max_size=None,
max_queue=None,close_timeout=None,extensions=[
permessage_deflate.ServerPerMessageDeflateFactory(
server_max_window_bits=11,
client_max_window_bits=11,
compress_settings={'memLevel': 4},
),
])
async def send_message_to_kafka(self, producer, row):
try:
# print(row)
producer.produce(row.encode())
except Exception as ex:
print(ex)
async def handler(self, websocket, path):
async for row in websocket:
await self.send_message_to_kafka(self.producer, row)
if __name__ == '__main__':
ws = Server()
ws.connect_kafka_client()
asyncio.get_event_loop().run_until_complete(ws.start())
asyncio.get_event_loop().run_forever()
`