FsKafka
Minimal F# wrappers for Confluent.Kafka+librdkafka.redist 1.x

F# friendly wrapper for Confluent.Kafka, with minimal dependencies or additional abstractions (but see related repos).
FsKafka wraps Confluent.Kafka to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation. It depends on Confluent.Kafka [1.9.0] and librdkafka.redist [1.9.0] (pinned to ensure we use a tested pairing), Serilog (but no specific Serilog sinks; you configure where the logs are emitted, e.g. to NLog) and Newtonsoft.Json (used internally to parse Broker-provided Statistics for logging purposes).
Usage
FsKafka is delivered as a Nuget package targeting netstandard2.0 and F# >= 4.5.
Install-Package FsKafka
or for paket, use:
paket add FsKafka
Related repos
- See the Propulsion repo for extended Producers and Consumers.
- See the Jet dotnet new templates repo's proProjector template (in -k mode) for example producer logic using the BatchedProducer and the proConsumer template for examples of using the BatchedConsumer from FsKafka, alongside the extended modes in Propulsion.
- See the Equinox QuickStart for examples of using this library to project to Kafka from Equinox.Cosmos and/or Equinox.EventStore.
CONTRIBUTING
Contributions of all sizes are warmly welcomed. See our contribution guide.
TEMPLATES
The best place to start, sample-wise, is the dotnet new templates stored in a dedicated repo.
BUILDING
The templates are the best way to see how to consume it; these instructions are intended mainly for people looking to make changes.
NB The tests rely on a TEST_KAFKA_BROKER environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run writes to a guid-named topic).
build, including tests on netcoreapp3.1
export TEST_KAFKA_BROKER="<server>:9092"
dotnet build build.proj -v n
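As a rough illustration of the environment variable dependency noted above, the sketch below reads TEST_KAFKA_BROKER and reports clearly when it is missing. The helper name tryGetBroker is hypothetical and is not part of FsKafka or its test suite; the actual tests may resolve the variable differently.

```fsharp
open System

/// Hypothetical helper (not part of FsKafka): returns the broker address
/// from TEST_KAFKA_BROKER, or None when the variable is unset or empty.
let tryGetBroker () =
    match Environment.GetEnvironmentVariable "TEST_KAFKA_BROKER" with
    | null | "" -> None
    | broker -> Some broker

match tryGetBroker () with
| Some broker -> printfn "Using broker %s" broker
| None -> eprintfn "TEST_KAFKA_BROKER is not set; the tests need a Broker that auto-creates topics"
```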
FAQ
How do I get rid of all the breaking off polling ... resuming polling spam?
- The BatchedConsumer implementation tries to give clear feedback as to when reading is not keeping up, for diagnostic purposes. As of #32, such messages are tagged with the type FsKafka.Core.InFlightMessageCounter, and as such can be silenced by including the following in one's LoggerConfiguration(): .MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)
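For context, a minimal sketch of where that override might sit when building the logger in a script. It assumes the same `#r "nuget:FsKafka"` style as the examples in this README; no sink is configured here, so you would add whichever Serilog sink you use before calling CreateLogger().

```fsharp
#r "nuget:FsKafka"
open Serilog
open Serilog.Events

// Raise the minimum level for the in-flight message counter's source context
// so its informational polling messages are filtered out.
let log =
    LoggerConfiguration()
        .MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, LogEventLevel.Warning)
        // Assumption: add your own sink here (none is referenced by FsKafka itself)
        .CreateLogger()
```

Other log events from FsKafka are unaffected by the override; only events whose source context matches the constant have their minimum level raised.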
What is this, why does it exist, where did it come from, is anyone using it?
This code results from building out an end-to-end batteries-included set of libraries and templates as part of the Equinox project.
Equinox places some key constraints on all components and dependencies:-
- batteries-included examples of end-to-end functionality within the Equinox remit; samples should have clean consistent wiring
- pick a well-established base library, try not to add new concepts
- low dependencies, so it can work in lots of contexts without egregiously forcing you to upgrade things
- aim to add any resilience features as patches to upstream repos
- thorough test coverage; integration coverage for core wrapped functionality, unit tests for any non-trivial logic in the wrapper library
Minimal producer example
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka
let log = Serilog.LoggerConfiguration().CreateLogger()
let batching = Batching.Linger (System.TimeSpan.FromMilliseconds 10.)
let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All, batching)
let producer = KafkaProducer.Create(log, producerConfig, "MyTopic")
let key = System.Guid.NewGuid().ToString()
let deliveryResult = producer.ProduceAsync(key, "Hello World!") |> Async.RunSynchronously
Minimal batched consumer example
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka
let log = Serilog.LoggerConfiguration().CreateLogger()
let handler (messages : ConsumeResult<string,string> []) = async {
    for m in messages do
        printfn "Received: %s" m.Message.Value
}
let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)
async {
    use consumer = BatchedConsumer.Start(log, cfg, handler)
    return! consumer.AwaitShutdown()
} |> Async.RunSynchronously
Minimal batched consumer example with monitor
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka
let log = Serilog.LoggerConfiguration().CreateLogger()
let handler (messages : ConsumeResult<string,string> []) = async {
    for m in messages do
        printfn "Received: %s" m.Message.Value
}
let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)
async {
    use consumer = BatchedConsumer.Start(log, cfg, handler)
    use _ = KafkaMonitor(log).Start(consumer.Inner, cfg.Inner.GroupId)
    return! consumer.AwaitShutdown()
} |> Async.RunSynchronously
Running (and awaiting) a pair of consumers until either throws
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka
let log = Serilog.LoggerConfiguration().CreateLogger()
let handler (messages : ConsumeResult<string,string> []) = async {
    for m in messages do
        printfn "Received: %s" m.Message.Value
}
let config topic = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", [topic], "MyGroupId", AutoOffsetReset.Earliest)
let cfg1, cfg2 = config "MyTopicA", config "MyTopicB"
async {
    use consumer1 = BatchedConsumer.Start(log, cfg1, handler)
    use consumer2 = BatchedConsumer.Start(log, cfg2, handler)
    use _ = KafkaMonitor(log).Start(consumer1.Inner, cfg1.Inner.GroupId)
    use _ = KafkaMonitor(log).Start(consumer2.Inner, cfg2.Inner.GroupId)
    return! Async.Parallel [consumer1.AwaitWithStopOnCancellation(); consumer2.AwaitWithStopOnCancellation()]
} |> Async.RunSynchronously