machinery
machinery copied to clipboard
feat: Kafka broker
Here is a implementation of Kafka broker, I went through other broker implementations and tried to implement something similar but with few changes like how the worker pool is initialized etc. Also introduced some opinionated changes to initialize Kafka broker
- Broker url is not required
- New config struct
config.KafkaConfig
Here is an sample initialization
cnf := &config.Config{
DefaultQueue: "KAFKA_TOPIC_TO_LISTEN",
ResultBackend: "YOUR_BACKEND_URL",
Kafka: &config.KafkaConfig{
// Kafka consumer group name.
Group: "KAFKA_GROUP",
// Kafka broker URLs.
Addrs: []string{"127.0.0.1:9092"},
// Client ID used by Kafka client (optional).
ClientID: "KAFKA_CLIENT_ID",
// Consume from newest if enabled, bydefault it consumes from oldest.
OffsetNewest: true,
// Set compression format (optional). Allowed values are `gzip`, `lz4`, `snappy`, `zstd`.
Compression: "gzip",
// Set compression level (optional).
CompressionLevel: 5,
},
}
This pull request is based on v1.6.7
.
edit: Added client level compression.
Updated the fork with latest changes to upstream master. Hope this gets reviewed and merged soon :)
Codecov Report
Patch coverage has no change and project coverage change: -0.04
:warning:
Comparison is base (
6f7b63c
) 48.03% compared to head (ed2188d
) 47.99%.
:exclamation: Current head ed2188d differs from pull request most recent head d0f55c3. Consider uploading reports for the commit d0f55c3 to get more accurate results
Additional details and impacted files
@@ Coverage Diff @@
## master #449 +/- ##
==========================================
- Coverage 48.03% 47.99% -0.04%
==========================================
Files 29 29
Lines 2644 2646 +2
==========================================
Hits 1270 1270
- Misses 1234 1235 +1
- Partials 140 141 +1
Impacted Files | Coverage Δ | |
---|---|---|
v1/config/config.go | 80.00% <ø> (ø) |
|
v1/factories.go | 54.47% <0.00%> (-0.83%) |
:arrow_down: |
Help us with your feedback. Take ten seconds to tell us how you rate us. Have a feature suggestion? Share it here.
:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Do you have feedback about the report comment? Let us know in this issue.
@vividvilla I'd like first to solve the problem of importing all dependencies as this is becoming bigger issue. By adding more brokers/backends, it increases amount of dependencies that machinery will import.
It would be best to refactor the code so it only imports dependencies of broker/backend you are using.
Little bit afraid of adding yet another broker before that is addressed.
When I started working with Machinery I was shocked by the number of dependencies it pulls and how tightly coupled broker configs are. Recently our CI build were started failing because of Apache thrift library fetch timeout , we had to switch to Go mod proxy to solve the issue. Also compile time is very high compared to any of our other Go projects and binary size also 3-4x bloated.
In fact I started refactoring my Machinery fork to address these issues but halted for a while due to time constraint on our company projects. My idea was to ditch current way of passing broker url to initialize server and instead send broker itself, for example
server, err := machinery.NewServer(&config.Config{
Broker: kafkabroker.New(kafkabroker.Config{
Group: "broker_group",
Addrs: ["broker_addrs"],
ClientID: "broker_id",
})
})
Similarly backend stores also has to be initialized and passed on as config.
Also the idea is to remove support for current Yaml config, ideally user should be using projects like Viper or Koanf for configuration, it shouldn't be a part of the library.
These changes I have proposed are not backward compatible so has to be released as new version.
@vividvilla Something like that would be ideal. I am still thinking about a ways to refactor the code to make it not import all dependencies (even those you don't need) without breaking changes. But it might have to be a breaking change in which case I would probably create a new top level package called v2.
Possibly, we could start with a v2 package like this: https://github.com/RichardKnop/machinery/pull/477
Still basically compatible with current version but there is no factory so when you import it from your project, you will only need to import dependencies for a broker and backend you are using.
lgtm