TopologyTestDriver's TestInputTopic is not thread-safe
I use the TopologyTestDriver in many long-lived integration tests; it works quite well in ASYNC_CLUSTER_IN_MEMORY mode.
During tests, I override an abstraction I have around message producers so that it uses the associated TestInputTopic instead of an IProducer.
However, my app occasionally produces messages manually to some of the input topics, and it does so from multiple threads simultaneously. In these cases, I've hit a few different issues:
- The InputTopic pipe will hang indefinitely (so I need to wrap it with a try/catch with a timeout and retry)
- The InputTopic pipe will complain that multiple things are adding to its internal collection and throw an error
- There are sometimes exceptions thrown about inconsistent timestamps
I've worked around these issues in several ways, but my latest workaround involves wrapping it in a lock on a static object:
public class TestKafkaStreams : IKafkaStreams, IMessageProducer
{
    // Static, so every instance serializes on the same lock.
    private static readonly object _lockObj = new();
    // ...
    public void ProduceMessage<T>(...)
    {
        lock (_lockObj)
        {
            _topic.PipeInput(...);
        }
    }
}
This seems to work for now, but are there plans to support a concurrent collection type for the pipes?
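For anyone who wants to try the locking idea in isolation, here is a minimal, self-contained sketch. It does not use Streamiz at all: `SerializedPipe<T>` and `SerializedPipeDemo` are hypothetical names made up for illustration, and a plain `List<int>` stands in for the non-thread-safe test topic.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical wrapper (not part of Streamiz): serializes every call
// to a sink delegate that is not safe to call concurrently.
public sealed class SerializedPipe<T>
{
    // One lock per wrapper. Make it static instead if several wrappers
    // end up writing to the same underlying test driver.
    private readonly object _gate = new object();
    private readonly Action<T> _pipe;

    public SerializedPipe(Action<T> pipe)
    {
        _pipe = pipe ?? throw new ArgumentNullException(nameof(pipe));
    }

    public void Send(T item)
    {
        // Only one thread at a time may reach the underlying sink.
        lock (_gate)
        {
            _pipe(item);
        }
    }
}

public static class SerializedPipeDemo
{
    // Sends `count` items from parallel workers into a plain List<int>
    // (a stand-in for the test topic) and returns how many arrived.
    public static int Run(int count)
    {
        var received = new List<int>();
        var pipe = new SerializedPipe<int>(received.Add);
        Parallel.For(0, count, i => pipe.Send(i));
        return received.Count;
    }
}
```

In my test harness the delegate would simply wrap the `_topic.PipeInput(...)` call. Without the lock, concurrent `List<T>.Add` calls can lose items or throw, which looks a lot like the collection error described above.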
Thanks.
Hey @xdave,
Sounds like a great feature request. Let me try to add this one in the 1.8.0 release.
@LGouellec I was also thinking it might be useful to be able to pipe messages into an OutputTopic for the Test driver, too... in cases where something needs to be deleted asynchronously outside of the stream topology.
@xdave I'm not sure I understood your last point
@LGouellec My last point was that if I decide to manually send a null value to a changelog topic, I cannot test this scenario with the TopologyTestDriver, as it won't let me create an InputTopic with the same name as an existing OutputTopic.
@xdave TopologyTestDriver doesn't support compaction either, by the way.
PR Merged