
Intercept errors for PubsubIO[T]

Open jmendesky opened this issue 4 years ago • 2 comments

Hi,

We are happy users of Scio - thanks for all your work :)

We are using PubsubIO[GenericRecord]. We would like to make our pipelines more resilient by intercepting errors caused by messages that cannot be parsed as Avro GenericRecords (for example, when a client publishes gibberish to the topic). Such messages currently just throw errors like the one below:

java.lang.IndexOutOfBoundsException
    at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
    at java.io.FilterInputStream.read(FilterInputStream.java:133)

The only way I currently see to achieve this is to use PubsubIO[Array[Byte]] and parse the payloads into GenericRecords ourselves. I would suppose this is quite a common scenario for resilient pipelines. I was wondering whether the Scio API offers such functionality, for example by returning an SCollection[Either[ParseError, GenericRecord]] instead of just an SCollection[GenericRecord]. If no such functionality exists, would you be open to considering this as a feature request? It would align with common pipeline patterns, as it would allow splitting the incoming message stream into a success channel and an error channel.

jmendesky, Mar 19 '20 16:03

Hi everyone, what do you think about this question/proposal? We would be happy to help implement this feature, but we would need to know whether we are going in the right direction.

jmendesky, Mar 31 '20 07:03

Hey, how are you reading PubSub with GenericRecord? It looks like only SpecificRecord is implemented. https://github.com/spotify/scio/blob/master/scio-core/src/main/scala/com/spotify/scio/io/PubsubIO.scala#L85

Also, I think it would depend on the underlying Beam PubsubIO behavior, which I don't think exposes errors. So you might have to decode manually either way.

nevillelyh, May 07 '20 14:05
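
For anyone landing here, the manual decoding discussed in this thread could look roughly like the sketch below. It is not part of Scio; `ParseError` and `SafeAvroDecode` are hypothetical names made up for illustration, and the sketch assumes the payload is a single binary-encoded Avro record (no container-file header) whose writer schema is known up front. It only uses the plain Avro Java API.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

import scala.util.Try

// Hypothetical error type: keeps the raw payload so that it can be
// routed to an error channel (for example a dead-letter topic).
final case class ParseError(payload: Array[Byte], cause: Throwable)

object SafeAvroDecode {

  // Decodes one binary-encoded Avro record. Any non-fatal exception
  // raised by Avro (EOFException, IndexOutOfBoundsException, ...) is
  // captured as a Left instead of being thrown.
  def decode(schema: Schema, bytes: Array[Byte]): Either[ParseError, GenericRecord] =
    Try {
      val reader  = new GenericDatumReader[GenericRecord](schema)
      val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
      reader.read(null, decoder)
    }.toEither.left.map(e => ParseError(bytes, e))
}
```

Given an SCollection[Array[Byte]] of raw message payloads, one could map each element through `SafeAvroDecode.decode` and then split the result into its Left and Right sides to get the error and success channels. Coder setup for GenericRecord and getting the schema onto the workers (for instance as a JSON string parsed on the worker) are left out here and would need to be handled in a real pipeline. Note also that random bytes can occasionally decode "successfully" under a permissive schema, so this catches malformed payloads but does not validate content.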