storm-contrib
Kafka spout PartitionManager should emit failed tuples without replaying all data that follows
PartitionManager currently resets to the offset of a failed message, and the next call to nextTuple() re-transmits all data from that offset forward, including messages that did not fail. We modified this behavior to cache the emitted data and re-transmit only the failed messages on nextTuple(). I would like to submit the changes for review and inclusion if others feel this is worthwhile. The following changes were made:
* Add a HashMap<Long, List<Object>> for pending messages
* Add a LinkedList<Long> for offsets of failed messages
* Change fail(Long offset) to only add the offset to the failed offset list
* Change ack(Long offset) to also remove the entry of the acked offset from the HashMap
* Change next(SpoutOutputCollector collector) to re-emit failed messages if the list is not empty
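To make the list above concrete, here is a minimal sketch of the bookkeeping it describes, written as a standalone class so it runs without Storm on the classpath. It is not the actual patch: the class name PendingTupleCache and the methods recordEmit, nextFailedOffset, tupleFor and pendingCount are hypothetical names chosen for illustration, and the duplicate-failure guard is my own assumption about what a sensible implementation would do.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of storm-contrib): tracks emitted tuples by
// Kafka offset so that only failed tuples are re-emitted.
class PendingTupleCache {
    // Tuples that have been emitted but not yet acked, keyed by offset.
    private final Map<Long, List<Object>> pending = new HashMap<>();
    // Offsets of failed tuples waiting to be re-emitted, in failure order.
    private final LinkedList<Long> failed = new LinkedList<>();

    // Call when a tuple is first emitted.
    void recordEmit(Long offset, List<Object> tuple) {
        pending.put(offset, tuple);
    }

    // fail() only queues the offset; the cached tuple stays in 'pending'.
    // Assumption: unknown or already queued offsets are ignored.
    void fail(Long offset) {
        if (pending.containsKey(offset) && !failed.contains(offset)) {
            failed.add(offset);
        }
    }

    // ack() drops the cached tuple and any queued retry for that offset.
    void ack(Long offset) {
        pending.remove(offset);
        failed.remove((Object) offset);
    }

    // Returns the next offset to re-emit, or null if nothing has failed.
    Long nextFailedOffset() {
        return failed.poll();
    }

    // Returns the cached tuple for an offset, or null if it was acked.
    List<Object> tupleFor(Long offset) {
        return pending.get(offset);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In a spout, next(SpoutOutputCollector collector) would first check nextFailedOffset(); if it returns an offset, the spout would re-emit tupleFor(offset) with the same message id and return, and only otherwise fetch new data from Kafka. One trade-off of this approach is that the cache grows with the number of un-acked tuples, so it probably wants to be paired with a cap on pending tuples per spout.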
Respectfully Submitted