feat: Kafka broker

Status: Open. vividvilla opened this issue 4 years ago • 7 comments

Here is an implementation of a Kafka broker. I went through the other broker implementations and tried to implement something similar, but with a few changes, such as how the worker pool is initialized. I also introduced some opinionated changes to how the Kafka broker is initialized:

  • The broker URL is not required.
  • There is a new config struct, config.KafkaConfig.

Here is a sample initialization:

cnf := &config.Config{
	DefaultQueue:  "KAFKA_TOPIC_TO_LISTEN",
	ResultBackend: "YOUR_BACKEND_URL",
	Kafka: &config.KafkaConfig{
		// Kafka consumer group name.
		Group: "KAFKA_GROUP",
		// Kafka broker addresses (host:port).
		Addrs: []string{"127.0.0.1:9092"},
		// Client ID used by the Kafka client (optional).
		ClientID: "KAFKA_CLIENT_ID",
		// Consume from the newest offset if enabled; by default it consumes from the oldest.
		OffsetNewest: true,
		// Compression format (optional). Allowed values are `gzip`, `lz4`, `snappy`, `zstd`.
		Compression: "gzip",
		// Compression level (optional).
		CompressionLevel: 5,
	},
}
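The comments in the sample config imply a few rules: only four compression codecs are accepted, compression and client ID are optional, and consumption starts from the oldest offset unless OffsetNewest is set. A minimal, self-contained sketch of how such rules might be checked at startup follows. `KafkaConfig` here is a hypothetical local copy of the proposed struct, `Validate` and `StartOffset` are illustrative helpers that are not part of Machinery or the fork, and treating Group and Addrs as required is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// KafkaConfig is a hypothetical local mirror of the proposed config.KafkaConfig.
type KafkaConfig struct {
	Group            string
	Addrs            []string
	ClientID         string // optional
	OffsetNewest     bool
	Compression      string // optional
	CompressionLevel int    // optional
}

// allowedCompression lists the codecs named in the sample config comments.
// The empty string means "no compression", since the field is optional.
var allowedCompression = map[string]bool{
	"": true, "gzip": true, "lz4": true, "snappy": true, "zstd": true,
}

// Validate returns the first problem found in the config, or nil.
// Requiring Group and Addrs is an assumption of this sketch.
func (c *KafkaConfig) Validate() error {
	if c.Group == "" {
		return errors.New("kafka: consumer group is required")
	}
	if len(c.Addrs) == 0 {
		return errors.New("kafka: at least one broker address is required")
	}
	if !allowedCompression[c.Compression] {
		return fmt.Errorf("kafka: unsupported compression %q", c.Compression)
	}
	return nil
}

// StartOffset reports where a new consumer group begins reading:
// the oldest offset unless OffsetNewest is set.
func (c *KafkaConfig) StartOffset() string {
	if c.OffsetNewest {
		return "newest"
	}
	return "oldest"
}

func main() {
	cfg := &KafkaConfig{
		Group:        "KAFKA_GROUP",
		Addrs:        []string{"127.0.0.1:9092"},
		OffsetNewest: true,
		Compression:  "gzip",
	}
	fmt.Println(cfg.Validate(), cfg.StartOffset()) // prints "<nil> newest"
}
```

Failing fast on an unknown codec keeps a typo such as "gzp" from surfacing only when the first message is produced.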

This pull request is based on v1.6.7.

Edit: Added client-level compression.

vividvilla, Aug 05 '19 07:08

Updated the fork with the latest changes from upstream master. Hope this gets reviewed and merged soon :)

vividvilla, Aug 27 '19 08:08

Codecov Report

Patch coverage is unchanged; project coverage changed by -0.04%.

Comparison is base (6f7b63c) 48.03% compared to head (ed2188d) 47.99%.

Current head ed2188d differs from the pull request's most recent head d0f55c3. Consider uploading reports for commit d0f55c3 to get more accurate results.

Additional details and impacted files
@@            Coverage Diff             @@
##           master     #449      +/-   ##
==========================================
- Coverage   48.03%   47.99%   -0.04%     
==========================================
  Files          29       29              
  Lines        2644     2646       +2     
==========================================
  Hits         1270     1270              
- Misses       1234     1235       +1     
- Partials      140      141       +1     
| Impacted Files      | Coverage Δ                  |
|---------------------|-----------------------------|
| v1/config/config.go | 80.00% <ø> (ø)              |
| v1/factories.go     | 54.47% <0.00%> (-0.83%) ↓   |



codecov[bot], Aug 27 '19 08:08

@vividvilla I'd like to first solve the problem of importing all dependencies, as this is becoming a bigger issue. Every broker/backend we add increases the number of dependencies that machinery imports.

It would be best to refactor the code so that it only imports the dependencies of the broker/backend you are using.

I'm a little afraid of adding yet another broker before that is addressed.

RichardKnop, Oct 08 '19 20:10

When I started working with Machinery, I was shocked by the number of dependencies it pulls in and by how tightly coupled the broker configs are. Recently our CI builds started failing because fetching the Apache Thrift library timed out, and we had to switch to a Go module proxy to solve the issue. Compile times are also much longer than for any of our other Go projects, and the binary size is bloated by 3-4x.

In fact, I started refactoring my Machinery fork to address these issues, but paused for a while due to time constraints on our company projects. My idea was to drop the current approach of passing a broker URL to initialize the server and instead pass the broker itself, for example:

server, err := machinery.NewServer(&config.Config{
	// Proposed API: pass an initialized broker instead of a broker URL string.
	Broker: kafkabroker.New(kafkabroker.Config{
		Group:    "broker_group",
		Addrs:    []string{"broker_addrs"},
		ClientID: "broker_id",
	}),
})

Similarly, backend stores would also have to be initialized and passed in through the config.
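The proposal above (passing initialized components rather than URL strings) can be sketched as follows, under stated assumptions: the core package defines small `Broker` and `Backend` interfaces, and each implementation lives in its own package, so an application only compiles and downloads the dependencies of the implementations it actually constructs. Everything here is hypothetical: the interfaces, `Config`, `NewServer`, and the in-memory types stand in for real Machinery and Kafka code, and they are collapsed into one file only so the example runs.

```go
package main

import (
	"errors"
	"fmt"
)

// Broker is a hypothetical minimal broker interface owned by the core package.
type Broker interface {
	Publish(queue string, payload []byte) error
}

// Backend is a hypothetical minimal result-backend interface.
type Backend interface {
	SetResult(taskID, result string) error
}

// Config holds already-initialized components instead of URL strings,
// so the core package never imports any broker or backend client library.
type Config struct {
	DefaultQueue string
	Broker       Broker
	Backend      Backend
}

// Server is a stand-in for the task server.
type Server struct {
	cfg *Config
}

// NewServer only checks that the caller supplied the components it needs.
func NewServer(cfg *Config) (*Server, error) {
	if cfg.Broker == nil {
		return nil, errors.New("config: Broker is required")
	}
	if cfg.Backend == nil {
		return nil, errors.New("config: Backend is required")
	}
	return &Server{cfg: cfg}, nil
}

// SendTask publishes to the default queue through whichever broker was injected.
func (s *Server) SendTask(payload []byte) error {
	return s.cfg.Broker.Publish(s.cfg.DefaultQueue, payload)
}

// memBroker would live in its own package (e.g. a Kafka broker package)
// in a real layout; here it just records published messages in memory.
type memBroker struct {
	published map[string][][]byte
}

func (b *memBroker) Publish(queue string, payload []byte) error {
	b.published[queue] = append(b.published[queue], payload)
	return nil
}

// memBackend likewise stands in for a real result backend package.
type memBackend struct {
	results map[string]string
}

func (b *memBackend) SetResult(taskID, result string) error {
	b.results[taskID] = result
	return nil
}

func main() {
	broker := &memBroker{published: map[string][][]byte{}}
	server, err := NewServer(&Config{
		DefaultQueue: "KAFKA_TOPIC_TO_LISTEN",
		Broker:       broker,
		Backend:      &memBackend{results: map[string]string{}},
	})
	if err != nil {
		panic(err)
	}
	if err := server.SendTask([]byte(`{"name":"add"}`)); err != nil {
		panic(err)
	}
	fmt.Println(len(broker.published["KAFKA_TOPIC_TO_LISTEN"])) // prints 1
}
```

Because the core package depends only on the interfaces, the Go toolchain never needs the Kafka client unless the application itself imports the Kafka broker package.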

The idea is also to remove support for the current YAML config. Ideally, users should use projects like Viper or Koanf for configuration; it shouldn't be part of the library.

The changes I have proposed are not backward compatible, so they would have to be released as a new version.

vividvilla, Oct 09 '19 06:10

@vividvilla Something like that would be ideal. I am still thinking about ways to refactor the code so that it does not import all dependencies (even those you don't need) without introducing breaking changes. But it might have to be a breaking change, in which case I would probably create a new top-level package called v2.
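One way to get selective imports while keeping URL-string configuration is the registration pattern Go's `database/sql` uses for drivers: each broker package registers a constructor for its URL scheme in an `init` function, and applications blank-import only the brokers they need. This is a swapped-in technique, not what the maintainers chose, and it is not fully backward compatible, since existing applications would still have to add one blank import per broker. The sketch collapses the packages into one file; `Register`, `NewBroker`, and the `kafka` scheme are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// Broker is a hypothetical minimal broker interface.
type Broker interface {
	Name() string
}

// Factory builds a broker from its connection URL.
type Factory func(url string) (Broker, error)

var (
	mu        sync.RWMutex
	factories = map[string]Factory{}
)

// Register makes a broker available under a URL scheme. Broker packages
// would call it from init(), so only blank-imported brokers get linked in.
func Register(scheme string, f Factory) {
	mu.Lock()
	defer mu.Unlock()
	if _, dup := factories[scheme]; dup {
		panic("broker: Register called twice for scheme " + scheme)
	}
	factories[scheme] = f
}

// NewBroker selects a registered factory by the scheme before "://".
func NewBroker(url string) (Broker, error) {
	scheme, _, ok := strings.Cut(url, "://")
	if !ok {
		return nil, fmt.Errorf("broker: missing scheme in %q", url)
	}
	mu.RLock()
	f, found := factories[scheme]
	mu.RUnlock()
	if !found {
		return nil, fmt.Errorf("broker: unknown scheme %q (missing blank import?)", scheme)
	}
	return f(url)
}

// kafkaBroker stands in for a broker that would live in its own package.
type kafkaBroker struct{ url string }

func (k *kafkaBroker) Name() string { return "kafka" }

// In a real layout this init() would sit in the Kafka broker package and run
// only when the application blank-imports that package.
func init() {
	Register("kafka", func(url string) (Broker, error) {
		return &kafkaBroker{url: url}, nil
	})
}

func main() {
	b, err := NewBroker("kafka://127.0.0.1:9092")
	if err != nil {
		panic(err)
	}
	fmt.Println(b.Name()) // prints "kafka"

	// No AMQP broker was registered, so this returns an "unknown scheme" error.
	_, err = NewBroker("amqp://guest:guest@localhost:5672/")
	fmt.Println(err)
}
```

The trade-off versus passing broker instances directly is that a missing import only shows up at runtime as an unknown-scheme error rather than at compile time.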

RichardKnop, Oct 11 '19 12:10

Possibly, we could start with a v2 package like this: https://github.com/RichardKnop/machinery/pull/477

It is still basically compatible with the current version, but there is no factory, so when you import it from your project, you only need to import the dependencies for the broker and backend you are using.

RichardKnop, Oct 11 '19 12:10

lgtm

yuexian1234, Nov 28 '23 12:11