confluent-kafka-python

Differentiate Schema Registry Communication Failure From SerializerError

Open · bradsnay opened this issue 3 years ago · 0 comments

Overview

When the CachedSchemaRegistryClient is configured with a URL that has moved or been decommissioned, calls to AvroConsumer.poll() raise a SerializerError. This is the same exception raised when a message is malformed (e.g. "message does not start with magic byte"), which makes it impossible to handle the two cases differently.

For example, when I receive a malformed message, I may choose to ignore it because it cannot be processed. To detect a malformed message, I catch the SerializerError raised by poll(), log that I received a malformed message, and commit my offsets.

What I have observed is that the message is correctly formatted, but the call to the Schema Registry fails with an HTTP 503 response because the URL has moved or been decommissioned. My application then follows the same code path as for a malformed message, and the message is dropped without being processed.
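To make the collision concrete, here is a simplified, self-contained model of the decode path. It is not the library's code: SerializerError is a local stand-in for the real class, and RegistryHTTPError, fetch_schema, and decode_header are hypothetical names. The only assumption about the real system is the Confluent wire format, in which each message starts with a magic byte (0) followed by a 4-byte big-endian schema ID.

```python
import struct

MAGIC_BYTE = 0  # Confluent wire format: magic byte 0, then a 4-byte big-endian schema ID


class SerializerError(Exception):
    """Local stand-in for confluent_kafka.avro.serializer.SerializerError."""


class RegistryHTTPError(Exception):
    """Hypothetical error raised by the stub registry lookup below."""

    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status


def fetch_schema(schema_id, registry_status):
    """Hypothetical registry lookup; registry_status simulates the HTTP response code."""
    if not 200 <= registry_status < 300:
        raise RegistryHTTPError(registry_status)
    return {"type": "string"}  # placeholder schema


def decode_header(payload, registry_status=200):
    """Parse the 5-byte header and resolve the writer schema (simplified model)."""
    if len(payload) < 5:
        raise SerializerError("payload is shorter than the 5-byte header")
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC_BYTE:
        raise SerializerError("message does not start with magic byte")
    try:
        return schema_id, fetch_schema(schema_id, registry_status)
    except RegistryHTTPError as e:
        # The registry failure is folded into the same exception type as a bad message.
        raise SerializerError(f"unable to fetch schema with id {schema_id}: {e}") from e
```

With a well-formed payload such as `b"\x00" + struct.pack(">I", 42) + b"..."`, calling `decode_header(payload, registry_status=503)` raises the same SerializerError type as `decode_header(b"not avro")`, so a caller's `except` clause cannot tell the two apart.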

Desired Outcome

Presently, this code will drop any message that results in a SerializerError:

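from confluent_kafka.avro import AvroConsumer, CachedSchemaRegistryClient
from confluent_kafka.avro.serializer import SerializerError

schema_registry = CachedSchemaRegistryClient({"url": "some_url"})
avro_consumer = AvroConsumer(config, schema_registry=schema_registry)

msg = None
try:
    msg = avro_consumer.poll()
except SerializerError as se:
    # Malformed messages and Schema Registry failures both land here.
    logger.warning("Failed to poll Kafka for messages: %s", se)

if msg is not None:
    # Do something with msg.
    ...
    avro_consumer.commit(message=msg)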

Ideally, we would be able to catch a separate exception when the Schema Registry is unavailable, so that we can handle that error differently. In this case, we would re-raise the exception to avoid committing offsets, instead of just catching it:

from confluent_kafka.avro import AvroConsumer, CachedSchemaRegistryClient
from confluent_kafka.avro.serializer import SerializerError

schema_registry = CachedSchemaRegistryClient({"url": "some_url"})
avro_consumer = AvroConsumer(config, schema_registry=schema_registry)

msg = None
try:
    msg = avro_consumer.poll()
except SchemaRegistryUnavailable as sru:  # Actual exception name TBD. Maybe use an existing one such as ClientError
    # Must come before the SerializerError clause if it subclasses SerializerError.
    logger.error("Schema Registry is unavailable: %s", sru)
    raise
except SerializerError as se:
    logger.warning("Failed to poll Kafka for messages: %s", se)

if msg is not None:
    # Do something with msg.
    ...
    avro_consumer.commit(message=msg)

To keep backwards compatibility with existing systems, the new exception can subclass the existing SerializerError. Existing code that catches SerializerError would then be unaffected, while clients running the newer release could catch the new exception explicitly if desired.
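The backwards-compatibility argument can be checked with a small, self-contained sketch. SerializerError here is a local stand-in for the library class, and SchemaRegistryUnavailable is the hypothetical name from this issue (still TBD); legacy_handler and updated_handler are illustrative only. Existing handlers keep working because an instance of the subclass still matches `except SerializerError`, and in newer code the specific clause has to come first, otherwise the broader SerializerError clause would catch it.

```python
class SerializerError(Exception):
    """Local stand-in for confluent_kafka.avro.serializer.SerializerError."""


class SchemaRegistryUnavailable(SerializerError):
    """Hypothetical subclass proposed in this issue (name TBD)."""


def legacy_handler(exc):
    # Existing code: only knows about SerializerError.
    try:
        raise exc
    except SerializerError:
        return "logged and skipped"


def updated_handler(exc):
    # Newer code: the more specific subclass is caught first.
    try:
        raise exc
    except SchemaRegistryUnavailable:
        return "re-raise, do not commit"
    except SerializerError:
        return "logged and skipped"
```

Passing a SchemaRegistryUnavailable to legacy_handler still takes the old "logged and skipped" path, while updated_handler routes it to the "do not commit" branch and continues to skip genuinely malformed messages.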

Code Locations

Non-200 HTTP code returned (e.g. 503): https://github.com/confluentinc/confluent-kafka-python/blob/e7e066120f05ca6f9c37b18d77f974f1a6cad222/src/confluent_kafka/avro/cached_schema_registry_client.py#L309

Subsequent SerializerError raised: https://github.com/confluentinc/confluent-kafka-python/blob/e7e066120f05ca6f9c37b18d77f974f1a6cad222/src/confluent_kafka/avro/serializer/message_serializer.py#L169
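At the first code location, the client would need to decide which failures count as "unavailable". The following is a hypothetical, simplified check, not the client's actual code: check_registry_response and SchemaRegistryUnavailable are illustrative names, SerializerError is a local stand-in, and treating only 5xx responses as "unavailable" is an assumption (connection errors to a decommissioned host might reasonably be treated the same way).

```python
class SerializerError(Exception):
    """Local stand-in for confluent_kafka.avro.serializer.SerializerError."""


class SchemaRegistryUnavailable(SerializerError):
    """Hypothetical subclass proposed in this issue (name TBD)."""


def check_registry_response(status, schema_id):
    """Hypothetical check for a registry response code; returns None on success."""
    if 200 <= status < 300:
        return None
    if status >= 500:
        # Server-side failure (e.g. 503): the Kafka message itself may be fine,
        # so signal "retry later" rather than "bad message".
        raise SchemaRegistryUnavailable(
            f"Schema Registry returned HTTP {status} for schema id {schema_id}"
        )
    # Assumption: other non-2xx codes (e.g. 404) keep the existing behaviour.
    raise SerializerError(f"unable to fetch schema with id {schema_id}: HTTP {status}")
```

Because the new exception subclasses SerializerError, the serializer at the second code location could let it propagate unchanged instead of wrapping it in a plain SerializerError.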

bradsnay, Apr 20 '22 19:04