fs2-kafka
fs2-kafka copied to clipboard
withCreateConsumer and withCreateProducter methods have been removed
How does one use Kafka's MockConsumer
and MockProducer
with fs2-kafka?
The documentation (e.g. for ConsumerSettings) mentions withCreateConsumer
and withCreateProducer
but these methods have been removed. This commit removed the wtihCreateConsumer
method from ConsumerSettings
, and this commit removed the withCreateProducer
method from ProducerSettings
.
What is the replacement method? Thanks!
Have you tried with com.evolutiongaming.skafka.producer.Producer#fromProducerJ2
and com.evolutiongaming.skafka.consumer.Consumer#fromConsumerJ1
?
I'm happy to work on this issue if someone can tell me what the design should be.
Sorry for the slow response to this - good question, I don't think we really thought about this when making the changes you've linked to. I don't have all the context on this currently in my head but it looks like you'd need to override the default instances of MkConsumer
/MkProducer
with custom instances that create instances of the relevant mock classes. It would be good to have those in a testkit module and we definitely need to update the docs. I'd be happy to take a PR on this - is that enough to go on?
I'll work on something and then we can discuss in a PR.
Umm ... the build doesn't work out of the box due to this:
ThisBuild / latestVersion := tlLatestVersion.value.getOrElse(
throw new IllegalStateException("No tagged version found")
)
Any suggestions?
That's surprising, I haven't seen that problem - I'll try checking out a fresh copy of the repo later and see if I can reproduce this
In the meantime you can probably just type a value in to be able to start hacking?
:+1:
I've made a bit of progress. Perhaps you can take a look and tell me if this is keeping with what you're thinking of:
https://github.com/noelwelsh/fs2-kafka/tree/feature/public-mk-consumer
I'm having a lot of trouble testing this code. The basic problem seems to be that the MockConsumer
is not assigned to any of the partitions. I've tried a lot of variations on the code below, which is following the pattern of existing tests. Any insights here would be appreciated!
it("should use the MkConsumer instance in scope") {
withTopic { topic =>
createCustomTopic(topic, partitions = 3)
val mockConsumer = new MockConsumer[Array[Byte], Array[Byte]](OffsetResetStrategy.NONE)
implicit val mockMkConsumer = MkConsumer.fromKafkaByteConsumer[IO](mockConsumer)
val consumed =
KafkaConsumer[IO]
.stream(consumerSettings[IO])
.subscribeTo(topic)
.evalMap(IO.sleep(3.seconds).as(_)) // sleep a bit to trigger potential race condition with _.stream
.evalTap(
_ =>
IO.delay {
// mockConsumer.assign(java.util.Collections.singleton(new TopicPartition(topic, 0)))
// val partitionInfo = mockConsumer.partitionsFor(topic).get(0)
val record = new clients.consumer.ConsumerRecord(
topic,
0,
0L,
"Hello".getBytes(),
"Kafka".getBytes()
)
mockConsumer.addRecord(record)
}
)
.records
.map(committable => committable.record.key -> committable.record.value)
.interruptAfter(10.seconds)
.compile
.toVector
.unsafeRunSync()
consumed.size shouldEqual 1
consumed(0) shouldEqual "Hello" -> "Kafka"
}
}
I've tried playing around with creating custom MkConsumer
and MkProducer
instances in the code I'm trying to update. I'm afraid to say I think this new design is really the wrong thing.
The settings don't encapsulate all the settings any more. Previously the ConsumerSettings
and ProducerSettings
would encapsulate all the details needed to create a consumer or producer. Now the settings are split across the settings and the MkConsumer
and MkProducer
instances. So now I have to track down all the points of use of the settings and make sure the right instances are also used.
MkConsumer
and MkProducer
are not unique for a given type, as clearly shown by this issue, and therefore are not good candidates for type classes.
@noelwelsh that's a fair point about the ergonomics (though the traits are intended as capability traits rather than type classes so I don't see non-uniqueness as an argument against them in itself). The motivation for introducing the MkX traits was to decouple the effect type of consumer/producer creation from the effect type of the consumer/producer itself - see https://github.com/fd4s/fs2-kafka/pull/588. One other way I can see to do this is reintroduce withCreateX
but with type () => Consumer
etc instead of F[Consumer]
- what do you think?
As another option, would it help if there were an option to explicitly pass a MkX
instance when creating an consumer/producer?
Umm ... the build doesn't work out of the box due to this:
ThisBuild / latestVersion := tlLatestVersion.value.getOrElse( throw new IllegalStateException("No tagged version found") )
Any suggestions?
I'm unable to reproduce this when cloning the repo in the normal way:
git clone [email protected]:fd4s/fs2-kafka.git
cd fs2-kafka
sbt compile
Is there anything unusual about your development environment that might cause the tags to be missing?
Sorry about the delay. I don't get time to work on this most of the week.
Summary: I think the best solution is to add a method to pass a MkX
instance when creating the settings.
Here's my reasoning:
-
The root problem is the consumer (or producer; but I'm just going talk about consumers to keep it simpler) may be created with a different effect type than the settings.
-
Scala 2 functions cannot have type parameters, so we cannot create a builder method on
ConsumerSettings
that accepts a function to build the consumer. -
MkConsumer
is essentially a function parameterised by an effect type, solving that issue. -
The issue with the current design is that the settings are split across the
ConsumerSettings
and theMkConsumer
, which must both be provided at the point where we create a consumer. -
We want to encapsulate all the settings in a single object, and hence should move to providing a
MkConsumer
instance at the point where we construct theConsumerSettings
.
So the best way forward seems:
-
ConsumerSettings
includes aMkConsumer
field - this field defaults to the default
MkConsumer
(implicit) value - there is a builder method to change the value to a custom value
- there is a builder method to use the
MockConsumer
I don't think there is much value in keeping MkConsumer
as an implicit value in this design.