# Sbmt-KafkaConsumer
This gem is used to consume Kafka messages. It is a wrapper over the Karafka gem, and is recommended for use as a transport with the sbmt-outbox gem.
## Installation
Add this line to your application's Gemfile:
```ruby
gem "sbmt-kafka_consumer"
```
And then execute:
```shell
bundle install
```
## Demo

See https://github.com/SberMarket-Tech/outbox-example-apps to learn how to use this gem and how it works with Ruby on Rails.
## Auto configuration

We recommend going through the configuration and file creation process using the following Rails generators. Run any generator with the `--help` option to learn more about its available arguments.
### Initial configuration

If you are plugging the gem into your application for the first time, you can generate the initial configuration:

```shell
rails g kafka_consumer:install
```

As a result, the `config/kafka_consumer.yml` file will be created.
### Consumer class

A consumer class can be generated with the following command:

```shell
rails g kafka_consumer:consumer MaybeNamespaced::Name
```
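The generated class subclasses `Sbmt::KafkaConsumer::BaseConsumer` and implements a message-processing handler. A minimal sketch of what it might look like (the exact template output is an assumption, and the stand-in base class below exists only so the snippet runs without the gem):

```ruby
# Stand-in for Sbmt::KafkaConsumer::BaseConsumer so this sketch runs standalone;
# in a real app the gem provides this class.
module Sbmt
  module KafkaConsumer
    class BaseConsumer; end
  end
end

module MaybeNamespaced
  # Roughly what `rails g kafka_consumer:consumer MaybeNamespaced::Name` produces
  # (the template body is an assumption).
  class NameConsumer < Sbmt::KafkaConsumer::BaseConsumer
    def process_message(message)
      # message carries the payload, already run through the configured deserializer
    end
  end
end

puts MaybeNamespaced::NameConsumer.instance_method(:process_message).arity # => 1
```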
### Inbox consumer

To generate an Inbox consumer for use with the sbmt-outbox gem, run the following command:

```shell
rails g kafka_consumer:inbox_consumer MaybeNamespaced::Name some-consumer-group some-topic
```
## Manual configuration

The `config/kafka_consumer.yml` file is the main configuration file for the gem.

Example config with a full set of options:
```yaml
default: &default
  client_id: "my-app-consumer"
  concurrency: 4 # max number of threads
  # optional Karafka options
  max_wait_time: 1
  shutdown_timeout: 60
  pause_timeout: 1
  pause_max_timeout: 30
  pause_with_exponential_backoff: true
  auth:
    kind: plaintext
  kafka:
    servers: "kafka:9092"
    # optional Kafka options
    heartbeat_timeout: 5
    session_timeout: 30
    reconnect_timeout: 3
    connect_timeout: 5
    socket_timeout: 30
    kafka_options:
      allow.auto.create.topics: true
  probes: # optional section
    port: 9394
    endpoints:
      readiness:
        enabled: true
        path: "/readiness"
      liveness:
        enabled: true
        path: "/liveness"
        timeout: 15
  metrics: # optional section
    port: 9090
    path: "/metrics"
  consumer_groups:
    group_ref_id_1:
      name: cg_with_single_topic
      topics:
        - name: topic_with_inbox_items
          consumer:
            klass: "Sbmt::KafkaConsumer::InboxConsumer"
            init_attrs:
              name: "test_items"
              inbox_item: "TestInboxItem"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::NullDeserializer"
          kafka_options:
            auto.offset.reset: latest
    group_ref_id_2:
      name: cg_with_multiple_topics
      topics:
        - name: topic_with_json_data
          consumer:
            klass: "SomeConsumer"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::JsonDeserializer"
        - name: topic_with_protobuf_data
          consumer:
            klass: "SomeConsumer"
          deserializer:
            klass: "Sbmt::KafkaConsumer::Serialization::ProtobufDeserializer"
            init_attrs:
              message_decoder_klass: "SomeDecoder"
              skip_decoding_error: true

development:
  <<: *default
test:
  <<: *default
  deliver: false
production:
  <<: *default
```
### `auth` config section

The gem supports two authentication variants: plaintext (default) and SASL plaintext.
SASL plaintext:

```yaml
auth:
  kind: sasl_plaintext
  sasl_username: user
  sasl_password: pwd
  sasl_mechanism: SCRAM-SHA-512
```
### `kafka` config section

The `servers` key is required and must be in rdkafka format: without the `kafka://` prefix, for example: `srv1:port1,srv2:port2,...`.

The `kafka_options` section may contain any rdkafka option. Additionally, `kafka_options` may be redefined for each topic.
### `consumer_groups` config section

```yaml
consumer_groups:
  # group id can be used when starting a consumer process (see CLI section below)
  group_id:
    name: some_group_name # required
    topics:
      - name: some_topic_name # required
        active: true # optional, default true
        consumer:
          klass: SomeConsumerClass # required, a consumer class inherited from Sbmt::KafkaConsumer::BaseConsumer
          init_attrs: # optional, consumer class attributes (see below)
            key: value
        deserializer:
          klass: SomeDeserializerClass # optional, default NullDeserializer, a deserializer class inherited from Sbmt::KafkaConsumer::Serialization::NullDeserializer
          init_attrs: # optional, deserializer class attributes (see below)
            key: value
        kafka_options: # optional, this section allows you to redefine the root rdkafka options for each topic
          auto.offset.reset: latest
```
#### `consumer.init_attrs` options for `BaseConsumer`

- `skip_on_error` - optional, default false, omit consumer errors in message processing and commit the offset to Kafka
#### `consumer.init_attrs` options for `InboxConsumer`

- `inbox_item` - required, name of the inbox item class
- `event_name` - optional, default nil, used when the inbox item keeps several event types
- `skip_on_error` - optional, default false, omit consumer errors in message processing and commit the offset to Kafka
#### `deserializer.init_attrs` options

- `skip_decoding_error` - don't raise an exception when the message cannot be deserialized
### `probes` config section
In Kubernetes, probes are mechanisms used to assess the health of your application running within a container.
```yaml
probes:
  port: 9394 # optional, default 9394
  endpoints:
    liveness:
      enabled: true # optional, default true
      path: /liveness # optional, default "/liveness"
      timeout: 10 # optional, default 10, timeout in seconds after which the group is considered dead
    readiness:
      enabled: true # optional, default true
      path: /readiness/kafka_consumer # optional, default "/readiness/kafka_consumer"
```
### `metrics` config section

We use Yabeda to collect all kinds of metrics.
```yaml
metrics:
  port: 9090 # optional, default is probes.port
  path: /metrics # optional, default "/metrics"
```
## Kafkafile

You can create a `Kafkafile` in the root of your app to configure additional settings for your needs.

Example:

```ruby
require_relative "config/environment"

# some extra configuration
```
## CLI

Run the following command to start the consumer server:

```shell
kafka_consumer -g some_group_id_1 -g some_group_id_2 -c 5
```

Where:

- `-g` - group, a consumer group id; if not specified, all groups from the config will be processed
- `-c` - concurrency, the number of threads, default is 4
### `concurrency` argument

See Karafka's Concurrency and Multithreading documentation for background on how threads are used.
Don't forget to properly calculate and set the size of the ActiveRecord connection pool:
- each thread will utilize one db connection from the pool
- an application can have monitoring threads which can use db connections from the pool
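As a back-of-the-envelope sizing sketch (the numbers below are illustrative assumptions, not recommendations):

```ruby
# ActiveRecord pool sizing for one consumer process.
# All numbers are illustrative assumptions.
concurrency        = 4 # consumer threads; each may hold one db connection
monitoring_threads = 1 # e.g. a metrics thread that also touches the db

pool_size = concurrency + monitoring_threads
puts pool_size # => 5
```

The `pool` value in `database.yml` should be at least this large for each process.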
Also pay attention to the number of processes of the server:

- `number_of_processes x concurrency` for topics with high data intensity can be equal to the number of partitions of the consumed topic
- `number_of_processes x concurrency` for topics with low data intensity can be less than the number of partitions of the consumed topic
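For example, the first rule is simple arithmetic (the process, thread, and partition counts below are assumptions for illustration):

```ruby
# Total worker threads across all consumer processes vs topic partitions.
# All numbers are illustrative assumptions.
number_of_processes = 2
concurrency         = 4
partitions          = 8 # partitions of the consumed high-intensity topic

total_threads = number_of_processes * concurrency
puts total_threads # => 8, matching the partition count, so every partition
                   #    can be read by its own thread
```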
## Testing

To test your consumer with RSpec, please use this shared context:
```ruby
require "sbmt/kafka_consumer/testing"

RSpec.describe OrderCreatedConsumer do
  include_context "with sbmt karafka consumer"

  it "works" do
    publish_to_sbmt_karafka(payload, deserializer: deserializer)
    expect { consume_with_sbmt_karafka }.to change(Order, :count).by(1)
  end
end
```
## Development

- Prepare environment:
  ```shell
  dip provision
  ```
- Run tests:
  ```shell
  dip rspec
  ```
- Run linter:
  ```shell
  dip rubocop
  ```
- Run Kafka server:
  ```shell
  dip up
  ```
- Run consumer server:
  ```shell
  dip kafka-consumer
  ```