OffsetFetchResponseV2 can result in offsets being lost

Open bobgrigoryan opened this issue 6 years ago • 0 comments

PyKafka version: 2.8.0 (using SimpleConsumer with rdkafka support)
Kafka version: 1.0.1 (not reproducible on 0.8.2.2)

In rare cases on our production environment, one of the brokers goes down. When that happens our application restarts the consumer, and at that point we lose the committed offsets and start consuming from the very beginning. The only error we see in the logs is:

pykafka.simpleconsumer - ERROR - Error fetching offsets for topic 'xxx' (errors: {})

After a deep investigation we concluded that this is related to the implementation of the OffsetFetchResponseV2 message. V2 adds a response-level error code to the message, unlike V1, where error codes were only reported for each partition separately. See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-88%3A+OffsetFetch+Protocol+Update

pykafka decodes this error code field here: https://github.com/Parsely/pykafka/blob/e7665bf36bfe521050fdcb017c68e92365bd89ed/pykafka/protocol/offset_commit.py#L367

But the decoded field is not used anywhere after that, in particular around here: https://github.com/Parsely/pykafka/blob/ebbc5c70901237e60bd6654336675886793fb8d9/pykafka/simpleconsumer.py#L660

So fetch_offsets() gets an empty list of partitions, does nothing (it does not even retry the fetch), and exits without setting offsets.
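
The failure mode can be sketched without a broker. The snippet below is a simplified stand-in, not pykafka's code: the `SimpleNamespace` response and `group_by_partition_error` are hypothetical names that only mimic grouping partition responses by their per-partition error code. With a response-level error and no partition responses, the grouping comes out empty, which would be consistent with the `errors: {}` in the log line.

```python
from collections import defaultdict
from types import SimpleNamespace

# Hypothetical stand-in for a decoded OffsetFetchResponseV2: a response-level
# error code plus a {topic: {partition_id: partition_response}} mapping.
response = SimpleNamespace(err=16, topics={})


def group_by_partition_error(response):
    """Group partition responses by per-partition error code only (V1-style)."""
    parts_by_error = defaultdict(list)
    for topic_name, partitions in response.topics.items():
        for partition_id, pres in partitions.items():
            parts_by_error[pres.err].append((partition_id, pres))
    return parts_by_error


grouped = group_by_partition_error(response)
# The response-level error (16) is never consulted, so nothing is reported.
print(dict(grouped))  # {}
```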

We've made some changes to the build_parts_by_error() function to work around the problem. When the response-level error code is set, it is copied into the partition-specific error codes to simulate V1 behavior, so that the downstream code can handle it without modification. We're not sure this change is correct, since build_parts_by_error() is used in other cases as well.

def build_parts_by_error(response, partitions_by_id):
    """Separate the partitions from a response by their error code

    :param response: a Response object containing partition responses
    :type response: :class:`pykafka.protocol.Response`
    :param partitions_by_id: a dict mapping partition ids to OwnedPartition
        instances
    :type partitions_by_id: dict
        {int: :class:`pykafka.simpleconsumer.OwnedPartition`}
    """
    # group partition responses by error code
    parts_by_error = defaultdict(list)

    if getattr(response, 'err', 0) != 0:
        # OffsetFetchResponseV2 carries a response-level error code; copy it
        # onto every owned partition so the V1-style handlers still see it
        if partitions_by_id is not None:
            for partition_id, owned_partition in iteritems(partitions_by_id):
                parts_by_error[response.err].append((owned_partition, None))

    for topic_name in response.topics.keys():
        for partition_id, pres in iteritems(response.topics[topic_name]):
            if partitions_by_id is not None and partition_id in partitions_by_id:
                owned_partition = partitions_by_id[partition_id]
                parts_by_error[pres.err].append((owned_partition, pres))
    return parts_by_error

I cannot provide runnable code at the moment. I was able to reproduce a similar error by forcibly sending the request to the "wrong" group coordinator. The OffsetFetchResponseV2 in this case was:

err = 16 # NotCoordinatorForGroup
partition_responses = [] 
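
As a rough check of the workaround against this response, the sketch below runs a stdlib-only copy of the patched logic on stand-in objects. The `SimpleNamespace` response and the string partition placeholders are assumptions for illustration only; real code would receive a pykafka response and `OwnedPartition` instances.

```python
from collections import defaultdict
from types import SimpleNamespace


def build_parts_by_error(response, partitions_by_id):
    """Stdlib-only copy of the patched function above, for illustration."""
    parts_by_error = defaultdict(list)
    if getattr(response, "err", 0) != 0 and partitions_by_id is not None:
        # Copy the response-level error onto every owned partition.
        for owned_partition in partitions_by_id.values():
            parts_by_error[response.err].append((owned_partition, None))
    for partitions in response.topics.values():
        for partition_id, pres in partitions.items():
            if partitions_by_id is not None and partition_id in partitions_by_id:
                parts_by_error[pres.err].append((partitions_by_id[partition_id], pres))
    return parts_by_error


# Stand-ins: the reproduced response, and two placeholder owned partitions.
response = SimpleNamespace(err=16, topics={})
partitions_by_id = {0: "owned-partition-0", 1: "owned-partition-1"}

result = dict(build_parts_by_error(response, partitions_by_id))
print(result)
# {16: [('owned-partition-0', None), ('owned-partition-1', None)]}
```

With the response-level error copied over, both placeholder partitions end up under error code 16 instead of being dropped. Any handler that reads fields from the partition response would need to tolerate the `None` in the second tuple slot.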

bobgrigoryan, Aug 15 '19 17:08