confluent-kafka-python
confluent-kafka-python copied to clipboard
json_consumer always return None
https://github.com/confluentinc/confluent-kafka-python/blob/80ea78c8e46823b68372448e49153723dfffdbf8/examples/json_consumer.py#L109
I play with 2 files: json_consumer, json_producer. But json_consumer always return None
And then I try play with json_producer and consumer. I can't convert data to dict type with command:
msg = msg.value()
msg = json.loads(msg)
It raise:
return codecs.utf_32_be_decode(input, errors, True)
UnicodeDecodeError: 'utf-32-be' codec can't decode bytes in position 4-7: code point not in range(0x110000)
If I try:
msg = msg.value()
msg = msg.decode('utf-8')
for i in msg:
print(i)
Some thing like space at leading. But I try: print(msg[0] == ' ') , it return False.
I try a lot but fails
Hi @alexnguyen2201 , thanks for asking.
The json_consumer always return None because there is no message delivered to the topic. So consumer can't consume anything. Please check your producer to make sure there are msgs delivered to the topic.
Thank you very much, I solved the problem.
But when I try to add another str_schema with another Object instead of user_str_schema and User in example code. I found that error:
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="Schema being registered is incompatible with an earlier schema for subject "example_serde_json-value" (HTTP status code 409, SR code 409)"}
And the name of schema is automatically same as topic name.
So one topic only have one schema. It's seem not true, because I think one topic can send a lot of type of data.
secondly. I try another way to define user_to_dict function:
def user_to_dict(user, ctx):
return dict(id=user.id,
age=user.age)
def user_to_dict(user, ctx):
return user.__dict__
But I got an error:
raise ValueSerializationError(se) confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="Object of type InstanceState is not JSON serializable"}
I can't understand. Because I try a lot sample code like:
user1_dict = {'name': 'Nam', 'age': '30'}
user2 = dict(name=user1.name, age=user1.age)
user3 = user1.__dict__
user2_json = json.dumps(user2)
user3_json = json.dumps(user3)
print(user2 == user3)
print(user2_json == user3_json)
It's all True
thanks for the question.
we will endeavor to make our documentation around using the JSON serdes better.