confluent-kafka-python
Hard-deleting a subject fails if the subject has already been soft-deleted
Description
Attempting to hard-delete a subject that has already been soft-deleted raises a 404 error, and the hard-delete API call is never executed.
How to reproduce
- Register schema to Schema Registry
- Soft-delete the subject we just registered
- Attempt to programmatically hard-delete the subject using SchemaRegistryClient.delete_subject
Schema Registry returns the following error:
Traceback (most recent call last):
  File "/Users/user/confluent-kafka-python-error.py", line 28, in <module>
    client.delete_subject(subject, permanent=True)
  File "/Users/user/venv/lib/python3.7/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 462, in delete_subject
    .format(_urlencode(subject_name)))
  File "/Users/user/venv/lib/python3.7/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 130, in delete
    return self.send_request(url, method='DELETE')
  File "/Users/user/venv/lib/python3.7/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 176, in send_request
    response.json().get('message'))
confluent_kafka.schema_registry.error.SchemaRegistryError: Subject 'lorem-ipsum-value' was soft deleted.Set permanent=true to delete permanently (HTTP status code 404, SR code 40404)
Repro Script
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

if __name__ == '__main__':
    subject = 'lorem-ipsum-value'
    schema = """
    {
        "namespace": "confluent.io.examples.serialization.avro",
        "name": "User",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "favorite_number", "type": "int"},
            {"name": "favorite_color", "type": "string"}
        ]
    }
    """

    client: SchemaRegistryClient = SchemaRegistryClient({'url': 'http://localhost:8081'})

    # Staging: Register Schema then soft delete it
    client.register_schema(subject, Schema(schema_str=schema, schema_type='AVRO'))
    client.delete_subject(subject)

    # Bug: Attempt to hard-delete a subject that has been soft-deleted in the past
    # Throws 404 Exception by Schema Registry and it won't ever be hard-deleted
    client.delete_subject(subject, permanent=True)

    # Expected Exception
    # =======================
    # confluent_kafka.schema_registry.error.SchemaRegistryError: Subject 'lorem-ipsum-value' was soft deleted.
    # Set permanent=true to delete permanently (HTTP status code 404, SR code 40404)
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  - confluent-kafka-python = ('1.6.1', 17170688)
  - Librdkafka = ('1.6.1', 17170943)
- [x] Apache Kafka broker version:
- [x] Client configuration: {...}
- [x] Operating system: macOS Catalina (10.15.7)
- [x] Provide client logs (with 'debug': '..' as necessary)
  - N/A
- [x] Provide broker log excerpts
  - N/A
- [ ] Critical issue
Remediation
Option 1
Perform only one request, chosen by the value of permanent.
def delete_subject(self, subject_name, permanent=False):
    """
    Deletes the specified subject and its associated compatibility level if
    registered. It is recommended to use this API only when a topic needs
    to be recycled or in development environments.

    Args:
        subject_name (str): subject name
        permanent (bool): True for a hard delete, False (default) for a soft delete

    Returns:
        list(int): Versions deleted under this subject

    Raises:
        SchemaRegistryError: if the request was unsuccessful.

    See Also:
        `DELETE Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)>`_
    """  # noqa: E501
    if permanent:
        return self._rest_client.delete('subjects/{}?permanent=true'
                                        .format(_urlencode(subject_name)))
    return self._rest_client.delete('subjects/{}'
                                    .format(_urlencode(subject_name)))
Pros:
- Flexibility
- User handles the 404 error in their application code
Cons:
- Based on the documentation, the user has to call this method twice to ensure the subject is hard-deleted, since a soft-delete is required first. Hard-deleting without a prior soft-delete returns:
  { "error_code": 40405, "message": "Subject 'lorem-ipsum' was not deleted first before being permanently deleted" }
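To make the cost of Option 1 concrete, here is a minimal caller-side sketch, assuming delete_subject issues exactly one request as proposed above. The helper name hard_delete_subject and the SchemaRegistryError class shown here are hypothetical stand-ins so the snippet runs on its own; in real code the exception would be imported from confluent_kafka.schema_registry.error. The only SR code it relies on is 40405, taken from the response quoted above.

```python
class SchemaRegistryError(Exception):
    """Stand-in for confluent_kafka.schema_registry.error.SchemaRegistryError."""

    def __init__(self, http_status_code, error_code, error_message):
        super().__init__(error_message)
        self.http_status_code = http_status_code
        self.error_code = error_code


# SR code quoted above: subject was not soft-deleted before a hard delete.
SUBJECT_NOT_SOFT_DELETED = 40405


def hard_delete_subject(client, subject_name):
    """Hypothetical helper: hard-delete, soft-deleting first only if required.

    Assumes client.delete_subject() behaves as in Option 1, i.e. it sends a
    single DELETE request selected by `permanent`.
    """
    try:
        # Works directly when the subject has already been soft-deleted.
        return client.delete_subject(subject_name, permanent=True)
    except SchemaRegistryError as e:
        if e.error_code != SUBJECT_NOT_SOFT_DELETED:
            raise  # anything else is a genuine failure for the caller
    # The subject is still active: soft-delete, then hard-delete.
    client.delete_subject(subject_name)
    return client.delete_subject(subject_name, permanent=True)
```

With this shape the library stays a thin mapping onto the REST API, and the two-step policy lives in application code.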
Option 2
Keep the routine but wrap the soft-delete call to catch 404
def delete_subject(self, subject_name, permanent=False):
    """
    Deletes the specified subject and its associated compatibility level if
    registered. It is recommended to use this API only when a topic needs
    to be recycled or in development environments.

    Args:
        subject_name (str): subject name
        permanent (bool): True for a hard delete, False (default) for a soft delete

    Returns:
        list(int): Versions deleted under this subject

    Raises:
        SchemaRegistryError: if the request was unsuccessful.

    See Also:
        `DELETE Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)>`_
    """  # noqa: E501
    try:
        versions = self._rest_client.delete('subjects/{}'
                                            .format(_urlencode(subject_name)))
    except SchemaRegistryError as e:
        # Tolerate a 404 only when a hard delete follows; otherwise
        # `versions` would be unbound at the return below.
        if not permanent or e.http_status_code != 404:
            raise
    if permanent:
        versions = self._rest_client.delete('subjects/{}?permanent=true'
                                            .format(_urlencode(subject_name)))
    return versions
Pros:
- User only needs to call this method once to hard-delete
Cons:
- We have to manage the 404 error
Related to #1029 @slominskir
I'm not sure which is the best approach. I find the distinction between soft and hard delete dubious, so I just hard delete everything, but we should support all combinations. Forcing a two-step process may better match the REST API, so I guess I'm leaning that way. Catching exceptions inside the method can also hide issues: a 404 might occur for some other reason, such as a misconfigured registry URL.
Right. For the SchemaRegistryError object, we can check error_code instead of http_status_code. That way we only continue when we hit the exception below (SR code == 40404); all other errors are propagated to the user.
confluent_kafka.schema_registry.error.SchemaRegistryError: Subject 'lorem-ipsum-value' was soft deleted.Set permanent=true to delete permanently (HTTP status code 404, SR code 40404)
def delete_subject(self, subject_name, permanent=False):
    """
    Deletes the specified subject and its associated compatibility level if
    registered. It is recommended to use this API only when a topic needs
    to be recycled or in development environments.

    Args:
        subject_name (str): subject name
        permanent (bool): True for a hard delete, False (default) for a soft delete

    Returns:
        list(int): Versions deleted under this subject

    Raises:
        SchemaRegistryError: if the request was unsuccessful.

    See Also:
        `DELETE Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)>`_
    """  # noqa: E501
    try:
        versions = self._rest_client.delete('subjects/{}'
                                            .format(_urlencode(subject_name)))
    except SchemaRegistryError as e:
        # Skip only the "already soft deleted" case (SR code 40404), and only
        # when a hard delete follows; otherwise `versions` would be unbound.
        if not permanent or e.error_code != 40404:
            raise
    if permanent:
        versions = self._rest_client.delete('subjects/{}?permanent=true'
                                            .format(_urlencode(subject_name)))
    return versions
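One way to check the error-code approach without a running Schema Registry is a small in-memory fake. The sketch below is only an illustration under stated assumptions: FakeRestClient is hypothetical, SchemaRegistryError is a local stand-in for the library's exception, the method is rewritten as a free function, URL encoding is omitted, and the fake always reports [1] as the deleted versions. The sketch also re-raises when permanent is false, so a plain soft delete of an already soft-deleted subject still surfaces 40404. Code 40401 is Schema Registry's "subject not found".

```python
class SchemaRegistryError(Exception):
    """Stand-in for confluent_kafka.schema_registry.error.SchemaRegistryError."""

    def __init__(self, http_status_code, error_code, error_message):
        super().__init__(error_message)
        self.http_status_code = http_status_code
        self.error_code = error_code


class FakeRestClient:
    """Hypothetical in-memory stand-in for the client's REST layer."""

    def __init__(self, active=(), soft_deleted=()):
        self.active = set(active)
        self.soft_deleted = set(soft_deleted)

    def delete(self, url):
        subject, _, query = url[len('subjects/'):].partition('?')
        if query == 'permanent=true':
            if subject in self.soft_deleted:
                self.soft_deleted.remove(subject)
                return [1]
            # 40405: not soft-deleted first; 40401: subject not found.
            code = 40405 if subject in self.active else 40401
            raise SchemaRegistryError(404, code, 'hard delete refused')
        if subject in self.active:
            self.active.remove(subject)
            self.soft_deleted.add(subject)
            return [1]
        # 40404: already soft-deleted; 40401: subject not found.
        code = 40404 if subject in self.soft_deleted else 40401
        raise SchemaRegistryError(404, code, 'soft delete refused')


def delete_subject(rest_client, subject_name, permanent=False):
    """Error-code variant from the thread, written as a free function."""
    try:
        versions = rest_client.delete('subjects/{}'.format(subject_name))
    except SchemaRegistryError as e:
        # Only an earlier soft delete (SR code 40404) may be skipped,
        # and only when a hard delete follows.
        if not permanent or e.error_code != 40404:
            raise
    if permanent:
        versions = rest_client.delete(
            'subjects/{}?permanent=true'.format(subject_name))
    return versions


# The scenario from this issue: the subject was soft-deleted earlier.
rest = FakeRestClient(soft_deleted={'lorem-ipsum-value'})
print(delete_subject(rest, 'lorem-ipsum-value', permanent=True))
```

Because the check is on the SR code rather than the HTTP status, an unknown subject (40401) still raises instead of being silently skipped.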
Looks good to me. Submit that change as a pull request and see if maintainers go for it.
@PatrickTu I was wondering if you are still experiencing this issue in the latest versions, or if it has been solved in any of the updates since then.