
json_consumer always returns None

Open alexnguyen2201 opened this issue 3 years ago • 3 comments

https://github.com/confluentinc/confluent-kafka-python/blob/80ea78c8e46823b68372448e49153723dfffdbf8/examples/json_consumer.py#L109

I am experimenting with two example files, json_consumer and json_producer, but json_consumer always returns None.

Then I tried json_producer together with consumer, but I can't convert the message data to a dict with these commands:

  msg = msg.value()
  msg = json.loads(msg)

It raises:

    return codecs.utf_32_be_decode(input, errors, True)
UnicodeDecodeError: 'utf-32-be' codec can't decode bytes in position 4-7: code point not in range(0x110000)

If I try:

  msg = msg.value()
  msg = msg.decode('utf-8')
  for i in msg:
      print(i)

[Screenshot: output of the loop above, taken 2022-05-18]

There is something like a space at the beginning of the output. But when I try print(msg[0] == ' '), it returns False.

I have tried many things, but nothing works.
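
One possible explanation, stated as an assumption since nothing in this thread confirms it: serializers that work with Schema Registry typically prefix each payload with a 5-byte header (a zero magic byte followed by a 4-byte big-endian schema ID). Leading zero bytes would lead json.loads to guess a UTF-32 encoding, and they would print as invisible characters that are not spaces. A minimal sketch that strips such a header before parsing follows; the helper name strip_framing and the sample payload are hypothetical.

```python
import json
import struct
from typing import Tuple

MAGIC_BYTE = 0  # assumed framing marker; verify against your serializer's documentation
HEADER_SIZE = 5  # assumed layout: 1 magic byte + 4-byte big-endian schema ID


def strip_framing(payload: bytes) -> Tuple[int, dict]:
    """Split the assumed 5-byte header from the JSON body and parse the body."""
    if len(payload) < HEADER_SIZE:
        raise ValueError("payload too short to contain the assumed header")
    magic, schema_id = struct.unpack(">bI", payload[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, json.loads(payload[HEADER_SIZE:].decode("utf-8"))


# Hypothetical framed payload, built here only for demonstration.
body = json.dumps({"name": "Nam", "age": 30}).encode("utf-8")
framed = struct.pack(">bI", MAGIC_BYTE, 1) + body

schema_id, record = strip_framing(framed)
print(schema_id, record)
```

If the consumer is configured with the library's JSONDeserializer, as json_consumer is in the examples, that deserializer should handle the header itself, so manual stripping would only matter when reading raw bytes with a plain consumer.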

alexnguyen2201 avatar May 18 '22 14:05 alexnguyen2201

Hi @alexnguyen2201 , thanks for asking.

The json_consumer always returns None because no messages have been delivered to the topic, so the consumer has nothing to consume. Please check your producer to make sure messages are actually being delivered to the topic.

jliunyu avatar May 19 '22 22:05 jliunyu

Thank you very much, I solved the problem.

But when I use another str_schema with another object, instead of user_str_schema and User from the example code, I get this error:

confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="Schema being registered is incompatible with an earlier schema for subject "example_serde_json-value" (HTTP status code 409, SR code 409)"}

The schema name (subject) is automatically derived from the topic name.

So one topic can only have one schema. That doesn't seem right, because I think one topic should be able to carry many types of data.
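
The 409 is consistent with both schemas being registered under the same subject. As a rough illustration, assuming the default strategy derives the subject from the topic name plus a "-value" suffix (which matches the subject in the error above), the sketch below compares that with two record-based alternatives. The function names, the Order record, and the naming rules of the alternatives are illustrative stand-ins, not the library's API.

```python
def topic_strategy(topic: str, record_name: str, field: str = "value") -> str:
    # Subject depends only on the topic, so every schema sent to it shares one subject.
    return f"{topic}-{field}"


def topic_record_strategy(topic: str, record_name: str, field: str = "value") -> str:
    # Subject combines topic and record name, so each record type gets its own subject.
    return f"{topic}-{record_name}"


def record_strategy(topic: str, record_name: str, field: str = "value") -> str:
    # Subject is the record name alone, independent of the topic.
    return record_name


topic = "example_serde_json"
for record_name in ("User", "Order"):  # "Order" is a hypothetical second record type
    print(
        record_name,
        topic_strategy(topic, record_name),
        topic_record_strategy(topic, record_name),
        record_strategy(topic, record_name),
    )
```

With the topic-only rule, User and Order both map to example_serde_json-value, so the second schema is checked for compatibility against the first. confluent-kafka-python appears to offer comparable strategies through the serializer's subject.name.strategy setting; check the current documentation for the exact names before relying on them.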

alexnguyen2201 avatar May 20 '22 03:05 alexnguyen2201

Secondly, I tried defining the user_to_dict function another way:

def user_to_dict(user, ctx):

    return dict(id=user.id,
                age=user.age)


def user_to_dict(user, ctx):
    return user.__dict__

But I got an error:

    raise ValueSerializationError(se)
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="Object of type InstanceState is not JSON serializable"}

I can't understand why, because I tried a lot of sample code like this:

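import json

class User:
    # Plain test class with no ORM or framework attached
    def __init__(self, name, age):
        self.name, self.age = name, age

user1 = User('Nam', '30')
user2 = dict(name=user1.name, age=user1.age)
user3 = user1.__dict__

user2_json = json.dumps(user2)
user3_json = json.dumps(user3)

print(user2 == user3)
print(user2_json == user3_json)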

Both print True.
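
The InstanceState in the error suggests that, unlike the plain objects in the test above, the real object carries an extra non-serializable attribute in its __dict__. Some ORMs (SQLAlchemy, for example) store per-instance state this way; that this is the cause here is an assumption. A minimal sketch with a stand-in class that reproduces the failure and then filters out underscore-prefixed attributes:

```python
import json


class InstanceState:
    """Stand-in for an ORM bookkeeping object; not the real class."""


class User:
    def __init__(self, name, age):
        self.name = name
        self.age = age
        # Hypothetical attribute of the kind an ORM might attach to each instance.
        self._sa_instance_state = InstanceState()


def user_to_dict(user, ctx=None):
    # Keep only public attributes so internal bookkeeping is never serialized.
    return {key: value for key, value in vars(user).items() if not key.startswith("_")}


user = User("Nam", 30)

try:
    json.dumps(user.__dict__)  # fails because of the extra attribute
except TypeError as exc:
    print("raw __dict__ failed:", exc)

print(json.dumps(user_to_dict(user)))
```

Listing the fields explicitly, as in the first user_to_dict above, avoids the problem as well and keeps the output aligned with the schema.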

alexnguyen2201 avatar May 20 '22 04:05 alexnguyen2201

Thanks for the question.

We will endeavor to improve our documentation on using the JSON serdes.

mhowlett avatar Oct 25 '22 18:10 mhowlett