fs2-kafka
java.lang.RuntimeException: io.confluent.kafka.serializers.subject.TopicNameStrategy is not an instance of io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy
Hi,
I've been seeing this error intermittently when using fs2-kafka in conjunction with Vulcan. It seems to only happen when used in tests and Fabio has also seen this problem.
I've created a simple project where I was able to reproduce the problem: https://github.com/ra-ovo/fs2-kafka-vulcan-issue. There's a screenshot showing the error and the exception stack trace. To reproduce it, I had to run sbt in interactive mode and run test a couple of times until I finally got the error. I've been trying to explore the problem, without success, on another branch of that repo.
Note that, because I need a schema registry for that test app, I've used the Docker configuration from the docker-compose.yml file in the project. All of the configuration in the code points to the default ports exposed by that docker-compose file.
Using sbt-dependency-graph and IntelliJ, I saw that a couple of kafka-clients versions show up: 5.4.0-ccs and 2.4.0 (according to sbt-dependency-graph) and 5.4.0 (when using IntelliJ to open the AbstractKafkaAvroSerDeConfig class, one of the classes that shows up in the stack trace).
I'm not entirely sure whether this is even a problem in fs2-kafka (in the Vulcan module) or whether anything can be done here, but any hint on how to work around this would be very much appreciated.
Please let me know if I can help with this issue.
Thanks!
We started a discussion here a few days ago https://gitter.im/fd4s/fs2-kafka?at=5e788968ada7ae4f2e384733
@vlovgr Note that the issue Ricardo opened happened on a different project, so we know it's happened in the wild twice. Hopefully the linked repro makes this easier to tackle
Not sure if this is relevant, but I've also played a bit with exclusion rules here. I wanted to try to force kafka-clients 2.4.0, but I could still get the error happening.
I forced both sides of the conflict and it still failed for me as well. My hunch (not a lot of confidence in it) is that there is also a problem with sbt's class loader.
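For readers who want to repeat the experiment described in the two comments above, one way to pin a single kafka-clients version in sbt is `dependencyOverrides`. This is only a sketch: the exact exclusion rules the commenters used are not shown in the thread, and both of them report that forcing the version did not make the error go away.

```scala
// build.sbt (sketch, not the commenters' actual configuration)
// Force every transitive request for kafka-clients to resolve to 2.4.0,
// the version mentioned in the thread.
dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "2.4.0"
```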
Could be. I've also tried running the tests using bloop and the problem never occurred.
I ran into this recently. The fix for me was to set run / fork := true in build.sbt.
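The forking suggestion above could look like this in build.sbt. A minimal sketch: `run / fork` is the setting named in the comment, while `Test / fork` is an assumption added here because the error in this thread shows up when running tests. Forking runs the code in a separate JVM, so sbt's in-process class loaders are not involved.

```scala
// build.sbt (sketch)
// Run the application in a forked JVM, as suggested in the comment.
run / fork := true

// Assumption: the thread's failures happen under `test`, so forking
// the test JVM may be the relevant setting there.
Test / fork := true
```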
Any joy on this issue? Although I am using this lib extensively in a service it only fails for me in an integration test. Very odd. I have tried many things and none has worked so far.
@sergiojoker11 Honestly I haven't even looked at it again myself. Have you tried @bplommer's suggestion?
Yes, I have. @rcdmrl
Sorted. The problem is that sbt's default class loader layering strategy doesn't play very well with libraries that use reflection, and kafka-[avro/schema]-serializer does use reflection. For this scenario, the sbt docs (https://www.scala-sbt.org/1.x/docs/In-Process-Classloaders.html#Troubleshooting) advise using the Flat strategy. Therefore, the one-liner in build.sbt that fixes the issue is:
[Configuration] / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat
For me, Configuration was IntegrationTest.
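Spelled out for concrete configurations, the one-liner above might look like the following. This is a sketch assuming sbt 1.3 or later (where `classLoaderLayeringStrategy` is available) and a single-project build; the project name `root` is hypothetical, and the `IntegrationTest` lines only apply if the build uses that configuration.

```scala
// build.sbt (sketch; `root` is a hypothetical project name)
lazy val root = (project in file("."))
  .configs(IntegrationTest)
  .settings(
    Defaults.itSettings,
    // Use a single flat class loader for unit tests...
    Test / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat,
    // ...and for integration tests, which is where the commenter saw the error.
    IntegrationTest / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat
  )
```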
One thing I struggle to understand is why my actual service has not failed the same way, given that my runDev uses the default sbt class loader layering strategy and the classpath contains the same stuff.
Although I haven't dived any deeper into the issue, I imagine the class loader was picking up or mixing the SubjectNameStrategy (and its implementations) from kafka-avro-serializer and kafka-schema-serializer, since these packages contain copies of said interfaces/classes.
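One way to check the hypothesis above is to print which class loader and jar each class comes from, for example from inside a failing test. The sketch below uses only standard JVM reflection; the object name `WhereLoaded` is hypothetical, and the two class names are copied from the exception message in this issue. Nothing here is specific to fs2-kafka or Vulcan.

```scala
import scala.util.Try

object WhereLoaded {
  // Describe the class loader and code source (usually a jar) of a class.
  def describe(className: String): String = {
    val cls = Class.forName(className)
    // Classes loaded by the bootstrap loader report a null class loader.
    val loader = Option(cls.getClassLoader).fold("bootstrap")(_.toString)
    val source = Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .fold("unknown")(_.toString)
    s"$className loader=$loader source=$source"
  }

  def main(args: Array[String]): Unit = {
    // Class names taken from the exception message in this issue.
    val names = List(
      "io.confluent.kafka.serializers.subject.TopicNameStrategy",
      "io.confluent.kafka.serializers.subject.strategy.SubjectNameStrategy"
    )
    names.foreach { n =>
      println(Try(describe(n)).getOrElse(s"$n could not be loaded"))
    }
  }
}
```

If the two classes report different class loaders, or jars that differ from what the dependency graph suggests, that would be consistent with the layering explanation, though it would not prove it.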
This is great @sergiojoker11! Thanks for sharing your findings! I'm also going to give it a try and I'll share the results here.
Update: It really does seem to fix the problem! I've updated my demo project with this info.