zio-kafka

Error types should be modeled more strongly

Open · jypma opened this issue 5 years ago · 3 comments

Right now, all zio-kafka effect types have Throwable as their error type, so users have to dig into the Kafka client API itself to find out which exceptions to expect.

An argument could be made for introducing concrete error types for the expected error scenarios and using those instead of Throwable. This would have some clear advantages:

  • We could make explicit the difference between a stream that silently reconnects and a (not yet implemented) stream that fails when Kafka's connection goes down (as requested in https://github.com/zio/zio-kafka/issues/175 ).

  • We could model deserialization errors in this type, if desired, as an alternative to the current .asTry construct.
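To make the two advantages above more concrete, here is a minimal, library-free Scala sketch. StreamError, ConnectionLost and DeserializationFailed are hypothetical names chosen for illustration; they are not part of zio-kafka's API. Because the trait is sealed, the compiler can check that code handling the error covers every case, which is not possible with a plain Throwable.

```scala
object TypedErrorSketch {
  // Hypothetical error ADT for a consumer stream; not part of zio-kafka.
  sealed trait StreamError

  // The broker connection went down and the stream chose to fail instead of reconnecting.
  final case class ConnectionLost(broker: String) extends StreamError

  // A record's key or value could not be deserialized (an alternative to .asTry).
  final case class DeserializationFailed(topic: String, offset: Long, cause: Throwable)
      extends StreamError

  // Exhaustive match: adding a new case to StreamError makes the compiler
  // warn here until the new case is handled.
  def describe(e: StreamError): String = e match {
    case ConnectionLost(broker)                  => s"lost connection to $broker"
    case DeserializationFailed(topic, offset, _) => s"could not deserialize $topic@$offset"
  }
}
```

With Throwable as the error type, the same function would need a wildcard case, and a newly introduced failure mode would go unnoticed at compile time.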

In fact, in zio-kafka's current state, which exceptions can actually end up in that Throwable?

jypma · Nov 16 '20 10:11

I like this idea a lot, at the very least as an experiment to see how feasible this is and what would be a good approach. I've been considering doing the same for zio-kinesis.

Taking def partitionedStream, which can fail with any Throwable, as a starting point, we'd have to go through its call hierarchy to see which exceptions we can expect. The KafkaConsumer methods called as part of that hierarchy are:

  • poll
  • resume
  • pause
  • assignment
  • seek

For pause, resume and assignment, we can use refineOrDie to turn IllegalStateException into a defect; this is already done in some places by wrapping the calls in ZIO.effectTotal.
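To illustrate the semantics being relied on here, below is a plain-Scala analogue of ZIO's refineOrDie. It is not ZIO's implementation: "fail" is modeled as Left and "die" as the exception being rethrown. fakePause is a hypothetical stand-in for a consumer call that throws IllegalStateException; no real KafkaConsumer is involved.

```scala
import scala.util.control.NonFatal

object RefineOrDieSketch {
  // Plain-Scala analogue of refineOrDie (illustration only): failures matched
  // by `pf` become typed errors (Left); anything else is rethrown, which plays
  // the role of the effect dying with a defect.
  def refineOrDie[A, E](thunk: => A)(pf: PartialFunction[Throwable, E]): Either[E, A] =
    try Right(thunk)
    catch {
      case NonFatal(t) if pf.isDefinedAt(t) => Left(pf(t))
    }

  // Hypothetical stand-in for a call such as pause(...) failing with
  // IllegalStateException.
  def fakePause(): Unit = throw new IllegalStateException("partition not assigned")
}
```

Passing PartialFunction.empty refines nothing, so every failure becomes a defect, which is roughly what wrapping the call in ZIO.effectTotal achieves.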

Most of the exceptions can come from poll:

    /**     
     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
     *             partitions is undefined or out of range and no offset reset policy has been configured
     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
     *             function is called
     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
     *             this function is called
     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
     * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
     *             topics or to the configured groupId. See the exception for more details
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
     *             session timeout, errors deserializing key/value pairs, your rebalance callback thrown exceptions,
     *             or any new error cases in future versions)
     * @throws java.lang.IllegalArgumentException if the timeout value is negative
     * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
     *             partitions to consume from
     * @throws java.lang.ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
     * @throws org.apache.kafka.common.errors.InvalidTopicException if the current subscription contains any invalid
     *             topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
     * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the consumer attempts to fetch stable offsets
     *             when the broker doesn't support this feature
     * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this consumer instance gets fenced by broker.
     */

Some of these we should (hopefully) be able to get rid of with refineOrDie, treating them as zio-kafka programming errors, because given how zio-kafka is programmed they should never occur. For example:

  • InterruptException
  • IllegalStateException
  • WakeupException

If one of these occurred anyway, the fiber would die (a fiber failure) instead of failing with a typed error. But that still leaves around 20 of them that would need to be modeled in a custom error type or otherwise eliminated, including KafkaException, which has 123 subclasses (!).
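The split described above, programming errors becoming defects while remaining Kafka failures stay in the error channel, could look roughly like this. The three exception classes are simplified stand-ins mirroring part of the kafka-clients hierarchy (in Kafka, WakeupException and InterruptException really are KafkaException subclasses); they are defined here only so the sketch compiles without kafka-clients. refinePollError is a hypothetical helper, not an existing zio-kafka function.

```scala
object PollDefectSketch {
  // Stand-ins for org.apache.kafka.common.KafkaException and
  // org.apache.kafka.common.errors.{WakeupException, InterruptException}.
  class KafkaException(msg: String) extends RuntimeException(msg)
  class WakeupException extends KafkaException("wakeup")
  class InterruptException extends KafkaException("interrupted")

  // Exceptions that should not occur given how zio-kafka drives the consumer.
  def isProgrammingError(t: Throwable): Boolean = t match {
    case _: InterruptException | _: WakeupException | _: IllegalStateException => true
    case _ => false
  }

  // Some(e): keep e in the error channel; None: treat the throwable as a defect.
  // Programming errors are checked first because WakeupException and
  // InterruptException are themselves KafkaException subclasses.
  def refinePollError(t: Throwable): Option[KafkaException] = t match {
    case x if isProgrammingError(x) => None
    case e: KafkaException          => Some(e)
    // Other non-Kafka exceptions (e.g. IllegalArgumentException for a negative
    // timeout) are also treated as defects in this sketch.
    case _ => None
  }
}
```

This only narrows Throwable down to KafkaException; the remaining cases would still have to be modeled or eliminated as discussed.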

I see three options:

  1. Leave the error type as Throwable (as it is now).
  2. Create an ADT covering every one of these possible exceptions.
     • A lot of work
     • Some of them may never be thrown from our call hierarchy
     • There still needs to be a catch-all error like UnknownException
  3. Pick out only those we explicitly want to expose to the client and include them in the error ADT.
     • Looking at the 123 subclasses, there are so many things that can go wrong when consuming from Kafka.
     • The selection may be very subjective

Apart from poll, there are commit errors. These are not passed to the error channel of partitionedStream but have to be handled by the caller of CommittableRecord.offset#commit. When committing with commitAsync, we can expect an org.apache.kafka.common.errors.RebalanceInProgressException if the commit failed because the consumer is in the middle of a rebalance. In such cases, the commit can be retried after the rebalance has been completed by a KafkaConsumer#poll(Duration) call. This is something we might want to handle internally anyway, by retrying (@iravid?).
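An internal retry of this kind might look like the sketch below, assuming the failure surfaces as RebalanceInProgressException as described above. The exception class is a stand-in for the kafka-clients one, defined only so the code compiles on its own, and awaitRebalance is a hypothetical hook: in zio-kafka it would have to wait until a poll has completed the rebalance, which this sketch does not model.

```scala
import scala.annotation.tailrec

object CommitRetrySketch {
  // Stand-in for org.apache.kafka.common.errors.RebalanceInProgressException.
  class RebalanceInProgressException extends RuntimeException("rebalance in progress")

  // Retries `commit` while it fails with RebalanceInProgressException, calling
  // the hypothetical `awaitRebalance` hook between attempts. Any other
  // exception, or the last failed attempt, propagates to the caller.
  @tailrec
  def commitWithRetry(maxAttempts: Int, awaitRebalance: () => Unit)(commit: () => Unit): Unit = {
    val retry =
      try { commit(); false }
      catch {
        case _: RebalanceInProgressException if maxAttempts > 1 => true
      }
    if (retry) {
      awaitRebalance()
      commitWithRetry(maxAttempts - 1, awaitRebalance)(commit)
    }
  }
}
```

The recursive call sits outside the try block so that it stays in tail position; bounding the number of attempts keeps a consumer stuck in repeated rebalances from retrying forever.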

Looking forward to your feedback, @jypma @iravid.

svroonland · Nov 17 '20 14:11

I like the approach of gradually building an ADT, including a catch-all type such as OtherError(x: KafkaException).

That way, we can start with a few known error cases and incrementally extend the error types with the cases that actually occur in practice. Subjectivity should be less of a factor that way, since the ADT hierarchy should be able to mirror the exception hierarchy.

We could mark the OtherError type as deprecated from the get-go, signalling that we'd rather get a PR for a new type than have users rely on OtherError. Perhaps UnmodeledError would even be a more apt name?

We'd only have to map IllegalArgumentException, IllegalStateException and friends onto more specific types, or get rid of them (e.g. "if the offset is negative" should probably be caught a lot earlier).
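A rough sketch of the gradually growing ADT proposed above, starting with two known cases plus the UnmodeledError catch-all. All names are hypothetical. The exception classes are simplified stand-ins (in Kafka, AuthenticationException and AuthorizationException extend ApiException, which extends KafkaException), defined only so the sketch compiles without kafka-clients. Under the proposal, UnmodeledError would also be annotated @deprecated from the start; the annotation is left out here to keep the sketch free of deprecation warnings.

```scala
object ConsumerErrorSketch {
  // Simplified stand-ins for the kafka-clients exception hierarchy.
  class KafkaException(msg: String) extends RuntimeException(msg)
  class AuthenticationException(msg: String) extends KafkaException(msg)
  class AuthorizationException(msg: String) extends KafkaException(msg)

  // Hypothetical error ADT, starting with a few known cases.
  sealed trait ConsumerError
  final case class AuthenticationFailed(cause: AuthenticationException) extends ConsumerError
  final case class NotAuthorized(cause: AuthorizationException) extends ConsumerError
  // Catch-all for failures not modeled yet; it keeps the original exception.
  final case class UnmodeledError(cause: KafkaException) extends ConsumerError

  // Specific cases first; new cases get added here as real-life failures show up.
  def fromKafkaException(e: KafkaException): ConsumerError = e match {
    case a: AuthenticationException => AuthenticationFailed(a)
    case a: AuthorizationException  => NotAuthorized(a)
    case other                      => UnmodeledError(other)
  }
}
```

Because the ADT cases mirror exception subclasses, promoting a failure out of UnmodeledError only means adding a case class and a match arm, and no information is lost in the meantime.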

jypma · Nov 17 '20 14:11

Big 👍 on this!

iravid · Mar 31 '21 07:03