confluent-kafka-python

Hard-deleting a subject fails if the subject has already been soft-deleted

Open PatrickTu opened this issue 3 years ago • 6 comments

Description

Attempting to hard-delete a subject that has already been soft-deleted fails with a 404 error, and the hard-delete API call is never executed.

How to reproduce

  1. Register a schema with Schema Registry
  2. Soft-delete the subject that was just registered
  3. Attempt to programmatically hard-delete the subject using SchemaRegistryClient.delete_subject with permanent=True

Schema Registry returns the following error:

Traceback (most recent call last):
  File "/Users/user/confluent-kafka-python-error.py", line 28, in <module>
    client.delete_subject(subject, permanent=True)
  File "/Users/user/venv/lib/python3.7/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 462, in delete_subject
    .format(_urlencode(subject_name)))
  File "/Users/user/venv/lib/python3.7/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 130, in delete
    return self.send_request(url, method='DELETE')
  File "/Users/user/venv/lib/python3.7/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 176, in send_request
    response.json().get('message'))
confluent_kafka.schema_registry.error.SchemaRegistryError: Subject 'lorem-ipsum-value' was soft deleted.Set permanent=true to delete permanently (HTTP status code 404, SR code 40404)

Repro Script

from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

if __name__ == '__main__':
    subject = 'lorem-ipsum-value'
    schema = """
        {
        "namespace": "confluent.io.examples.serialization.avro",
        "name": "User",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "favorite_number", "type": "int"},
            {"name": "favorite_color", "type": "string"}
        ]
    }
    """

    client: SchemaRegistryClient = SchemaRegistryClient({'url': 'http://localhost:8081'})

    # Staging: Register Schema then soft delete it
    client.register_schema(subject, Schema(schema_str=schema, schema_type='AVRO'))
    client.delete_subject(subject)

    # Bug: hard-deleting a subject that was previously soft-deleted
    # raises a 404 from Schema Registry, so the subject is never hard-deleted
    client.delete_subject(subject, permanent=True)
    
    # Expected Exception
    # =======================
    # confluent_kafka.schema_registry.error.SchemaRegistryError: Subject 'lorem-ipsum-value' was soft deleted.
    # Set permanent=true to delete permanently (HTTP status code 404, SR code 40404)

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):

    • confluent-kafka-python = ('1.6.1', 17170688)

    • librdkafka = ('1.6.1', 17170943)

  • [x] Apache Kafka broker version:

  • [x] Client configuration: {...}

  • [x] Operating system: macOS Catalina (10.15.7)

  • [x] Provide client logs (with 'debug': '..' as necessary)

    • N/A
  • [x] Provide broker log excerpts

    • N/A
  • [ ] Critical issue

PatrickTu • May 24 '21 22:05

Remediation

Option 1

Perform only one request, chosen by the value of permanent.

    def delete_subject(self, subject_name, permanent=False):
        """
        Deletes the specified subject and its associated compatibility level if
        registered. It is recommended to use this API only when a topic needs
        to be recycled or in development environments.

        Args:
            subject_name (str): subject name
            permanent (bool): True for a hard delete, False (default) for a soft delete

        Returns:
            list(int): Versions deleted under this subject

        Raises:
            SchemaRegistryError: if the request was unsuccessful.

        See Also:
            `DELETE Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)>`_

        """  # noqa: E501

        if permanent:
            return self._rest_client.delete('subjects/{}?permanent=true'
                                            .format(_urlencode(subject_name)))

        return self._rest_client.delete('subjects/{}'
                                        .format(_urlencode(subject_name)))

Pros:

  • Flexibility
  • User handles the 404 error in their application code

Cons:

  • According to the documentation, a subject must be soft-deleted before it can be hard-deleted, so the user has to call this method twice. Hard-deleting first returns:

    • {
          "error_code": 40405,
          "message": "Subject 'lorem-ipsum' was not deleted first before being permanently deleted"
      }
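
Under Option 1, the two-step sequence would move into application code. Below is a minimal sketch of what that caller-side logic might look like. `hard_delete_subject` and `FakeOption1Client` are hypothetical names, and the `SchemaRegistryError` class is a stand-in that mirrors the library's `http_status_code` and `error_code` attributes so the sketch runs without a registry. Codes 40404 and 40405 come from the messages in this issue; the 40401 "subject not found" code, the HTTP status used for 40405, and the returned version list `[1]` are assumptions for illustration.

```python
class SchemaRegistryError(Exception):
    """Stand-in for confluent_kafka.schema_registry.error.SchemaRegistryError."""

    def __init__(self, http_status_code, error_code, error_message):
        super().__init__(error_message)
        self.http_status_code = http_status_code
        self.error_code = error_code


class FakeOption1Client:
    """Hypothetical in-memory client: one request per delete_subject call."""

    def __init__(self, active=(), soft_deleted=()):
        self.active = set(active)
        self.soft_deleted = set(soft_deleted)

    def delete_subject(self, subject_name, permanent=False):
        if permanent:
            if subject_name in self.active:
                # HTTP status 404 is assumed here; the issue only shows the SR code.
                raise SchemaRegistryError(404, 40405, "not soft-deleted first")
            if subject_name not in self.soft_deleted:
                raise SchemaRegistryError(404, 40401, "subject not found")
            self.soft_deleted.discard(subject_name)
            return [1]
        if subject_name in self.soft_deleted:
            raise SchemaRegistryError(404, 40404, "was soft deleted")
        if subject_name not in self.active:
            raise SchemaRegistryError(404, 40401, "subject not found")
        self.active.discard(subject_name)
        self.soft_deleted.add(subject_name)
        return [1]


def hard_delete_subject(client, subject_name):
    """Soft-delete if still needed, then hard-delete; returns deleted versions."""
    try:
        client.delete_subject(subject_name)
    except SchemaRegistryError as e:
        if e.error_code != 40404:  # anything but "already soft-deleted"
            raise
    return client.delete_subject(subject_name, permanent=True)


client = FakeOption1Client(active={'fresh-value'},
                           soft_deleted={'lorem-ipsum-value'})
print(hard_delete_subject(client, 'fresh-value'))        # never soft-deleted before
print(hard_delete_subject(client, 'lorem-ipsum-value'))  # already soft-deleted
```

Every application that wants a one-call hard delete would need a helper like this, which is the cost this con describes.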
      

Option 2

Keep the current two-request routine, but wrap the soft-delete call to catch the 404.

    def delete_subject(self, subject_name, permanent=False):
        """
        Deletes the specified subject and its associated compatibility level if
        registered. It is recommended to use this API only when a topic needs
        to be recycled or in development environments.

        Args:
            subject_name (str): subject name
            permanent (bool): True for a hard delete, False (default) for a soft delete

        Returns:
            list(int): Versions deleted under this subject

        Raises:
            SchemaRegistryError: if the request was unsuccessful.

        See Also:
            `DELETE Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)>`_

        """  # noqa: E501
        try:
            versions = self._rest_client.delete('subjects/{}'
                                                .format(_urlencode(subject_name)))
        except SchemaRegistryError as e:
            # Tolerate a 404 only when a hard delete follows; otherwise
            # there would be no result to return.
            if e.http_status_code != 404 or not permanent:
                raise

        if permanent:
            versions = self._rest_client.delete('subjects/{}?permanent=true'
                                                .format(_urlencode(subject_name)))

        return versions

Pros:

  • User only needs to call this method once to hard-delete

Cons:

  • We have to manage the 404 error

PatrickTu • May 24 '21 22:05

Related to #1029 @slominskir

PatrickTu • May 24 '21 22:05

I’m not sure which is the best approach. I find the distinction between soft and hard delete dubious, so I just hard-delete everything, but we should support all combinations. Forcing a two-step process may better match the REST API, so I guess I’m leaning that way. Catching exceptions inside the method can also hide issues: a 404 might occur for some other reason, such as a misconfigured registry URL.

slominskir • May 25 '21 14:05

Right. For the SchemaRegistryError object, we can check error_code instead of http_status_code. That lets us continue only when we hit the exception below (SR code 40404); all other errors are propagated to the user.

confluent_kafka.schema_registry.error.SchemaRegistryError: Subject 'lorem-ipsum-value' was soft deleted.Set permanent=true to delete permanently (HTTP status code 404, SR code 40404)

    def delete_subject(self, subject_name, permanent=False):
        """
        Deletes the specified subject and its associated compatibility level if
        registered. It is recommended to use this API only when a topic needs
        to be recycled or in development environments.

        Args:
            subject_name (str): subject name
            permanent (bool): True for a hard delete, False (default) for a soft delete

        Returns:
            list(int): Versions deleted under this subject

        Raises:
            SchemaRegistryError: if the request was unsuccessful.

        See Also:
            `DELETE Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#delete--subjects-(string-%20subject)>`_

        """  # noqa: E501
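        try:
            versions = self._rest_client.delete('subjects/{}'
                                                .format(_urlencode(subject_name)))
        except SchemaRegistryError as e:
            # Tolerate "already soft-deleted" (SR code 40404) only when a
            # hard delete follows; otherwise there is no result to return.
            if e.error_code != 40404 or not permanent:
                raise

        if permanent:
            versions = self._rest_client.delete('subjects/{}?permanent=true'
                                                .format(_urlencode(subject_name)))

        return versions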
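
To check the intended behavior without a running Schema Registry, something like the following sketch could be used. It restates the logic as a free function over a stub REST client. `StubRestClient` is hypothetical, the `SchemaRegistryError` class is a stand-in for the library's, URL encoding is omitted for brevity, and the 40401 code for an unrelated "subject not found" error is an assumption. The sketch also assumes 40404 is tolerated only when permanent=True, so the function always has a value to return.

```python
class SchemaRegistryError(Exception):
    """Stand-in mirroring the attributes of the library's error class."""

    def __init__(self, http_status_code, error_code, error_message):
        super().__init__(error_message)
        self.http_status_code = http_status_code
        self.error_code = error_code


class StubRestClient:
    """Hypothetical stub: replays canned results and records requested URLs."""

    def __init__(self, results):
        self.results = list(results)
        self.urls = []

    def delete(self, url):
        self.urls.append(url)
        result = self.results.pop(0)
        if isinstance(result, Exception):
            raise result
        return result


def delete_subject(rest_client, subject_name, permanent=False):
    """Free-function restatement of the proposed logic, for testing only."""
    try:
        versions = rest_client.delete('subjects/{}'.format(subject_name))
    except SchemaRegistryError as e:
        # Only "already soft-deleted" ahead of a hard delete is tolerated.
        if e.error_code != 40404 or not permanent:
            raise
    if permanent:
        versions = rest_client.delete(
            'subjects/{}?permanent=true'.format(subject_name))
    return versions


# Already soft-deleted subject: the 40404 is swallowed and the hard delete runs.
stub = StubRestClient([SchemaRegistryError(404, 40404, "was soft deleted"), [1, 2]])
print(delete_subject(stub, 'lorem-ipsum-value', permanent=True))
print(stub.urls)

# Any other 404 (for example, an unknown subject) still reaches the caller.
stub = StubRestClient([SchemaRegistryError(404, 40401, "subject not found")])
try:
    delete_subject(stub, 'lorem-ipsum-value', permanent=True)
except SchemaRegistryError as e:
    print('propagated SR code', e.error_code)
```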

PatrickTu • May 25 '21 19:05

Looks good to me. Submit that change as a pull request and see if maintainers go for it.

slominskir • May 25 '21 22:05

@PatrickTu I was wondering whether you are still experiencing this issue in the latest versions, or whether it has been solved by any of the updates since then.

nhaq-confluent • Mar 06 '24 23:03