feat(audit): implement sink for kafka

Open erka opened this issue 1 year ago • 1 comments

related #3146

  • [x] find the right place for event.proto and data type for payload
  • [x] add support for Schema Registry
  • [ ] improve test coverage

erka avatar Jun 22 '24 18:06 erka

While all the steps are straightforward, the main issue lies with the Payload of Event. Defining a schema for any in protobuf is challenging. Although Avro or JSON Schema might be more flexible in this regard, I haven't researched them yet.
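One common way around an untyped payload, sketched below in Go, is to keep the envelope schema fixed and carry the payload as pre-encoded bytes. This is only an illustration of the idea, not what the PR does: the Envelope type, its field names, and the wrap helper are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope is a hypothetical fixed-schema wrapper: every field has a known
// type, and the arbitrary payload travels as already-encoded JSON bytes.
type Envelope struct {
	Type    string          `json:"type"`
	Action  string          `json:"action"`
	Payload json.RawMessage `json:"payload"`
}

// wrap encodes payload to JSON once, so the envelope itself never needs an
// "any"-typed field in its schema.
func wrap(typ, action string, payload any) (Envelope, error) {
	raw, err := json.Marshal(payload)
	if err != nil {
		return Envelope{}, fmt.Errorf("encoding payload: %w", err)
	}
	return Envelope{Type: typ, Action: action, Payload: raw}, nil
}

func main() {
	e, err := wrap("flag", "created", map[string]any{"key": "new-ui", "enabled": true})
	if err != nil {
		panic(err)
	}
	out, err := json.Marshal(e)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

The trade-off is that consumers have to decode the payload themselves based on type and action, so the schema no longer documents the payload shape.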

erka avatar Jun 22 '24 18:06 erka

I'm a bit stuck with Dagger and Redpanda Schema Registry. Redpanda provides a great development Docker image that includes Kafka with KRaft, Schema Registry, and other related services. When I run it locally with Docker, all my tests pass without issue. However, when I run them with Dagger, they fail with the error Client.Timeout exceeded while awaiting headers.

An issue report, dagger/dagger/issues/6673, suggests that multiple ports may not be bound, but I tested this with the additional task below and confirmed that there are no issues with the connections.

func (m *Dagger) SchemaRegistry(ctx context.Context) (string, error) {
	k := dag.Container().From("redpandadata/redpanda").
		WithExposedPort(9092, dagger.ContainerWithExposedPortOpts{
			Description: "kafka endpoint",
		}).
		WithExposedPort(8081, dagger.ContainerWithExposedPortOpts{
			Description: "schema registry",
		}).
		WithEnvVariable("REDPANDA_ADVERTISE_KAFKA_ADDRESS", "kafka:9092").
		WithExec(nil).AsService()

	return dag.Container().From("alpine").WithServiceBinding("kafka", k).
		// WithExec([]string{"nc", "-v", "kafka", "9092"}).
		WithExec([]string{"wget", "http://kafka:8081"}).
		Stdout(ctx)
}

If anyone can spot where I might be going wrong, I would greatly appreciate any hints.

erka avatar Jul 01 '24 15:07 erka

cc @levlaz @GeorgeMac for any suggestions. I will take a look this evening myself as well

markphelps avatar Jul 01 '24 21:07 markphelps

When I run it locally with Docker, all my tests pass without issue.

Hey @erka can you share how you are running this locally with Docker? What specific commands are you using to start the redpanda container?

The code looks good to me - and the proxy is working correctly, you are getting a 404 from kafka:8081 which at least means the kafka container is bound and listening and responding on that port. I suspect the issue has to do with the configuration of the service container itself.

I cannot find anywhere in the docs of redpanda that talks about how to configure things with environment variables (i.e. where does REDPANDA_ADVERTISE_KAFKA_ADDRESS come from?)

Looking at the docker-compose.yml file is the best way to translate this to Dagger: https://docs.redpanda.com/current/get-started/quick-start/#deploy-redpanda. The config is very verbose, though, so I'd love to hear how you are running it locally to see if there is a shortcut we can take with a few env vars.

Lastly, I don't think it makes any difference, but WithExec(nil) does not seem to be necessary.

levlaz avatar Jul 01 '24 23:07 levlaz

Thank you @levlaz for the support.

Hey @erka can you share how you are running this locally with Docker? What specific commands are you using to start the redpanda container?

docker run -it -p 9092:9092 -p 8081:8081 --privileged -h kafka redpandadata/redpanda

I had also modified /etc/hosts to point kafka at 127.0.0.1.

The code looks good to me - and the proxy is working correctly, you are getting a 404 from kafka:8081 which at least means the kafka container is bound and listening and responding on that port. I suspect the issue has to do with the configuration of the service container itself.

I cannot find anywhere in the docs of redpanda that talks about how to configure things with environment variables (i.e. where does REDPANDA_ADVERTISE_KAFKA_ADDRESS come from?)

I saw it in the workflow of a Go Kafka client. Another place, a docker-compose.yml file, helped me solve this.

I used kafka as the DNS name in the advertised Kafka address, and that works fine for the Flipt container, which reaches it through WithServiceBinding. Inside the Kafka container, however, the Schema Registry service used the same DNS name, and the Kafka container has no record for the kafka host, hence the timeout. I hadn't realized that the alias created by Dagger's WithServiceBinding exists only in the container that binds the service, not in both of them.

Thank you @markphelps @levlaz for the help. I can move forward now.

erka avatar Jul 02 '24 06:07 erka

Codecov Report

Attention: Patch coverage is 73.98844% with 45 lines in your changes missing coverage. Please review.

Project coverage is 48.37%. Comparing base (f997fb9) to head (3128e6a). Report is 465 commits behind head on main.

Files                                      Patch %   Lines
internal/server/audit/events.go            0.00%     15 Missing
internal/server/audit/kafka/kafka.go       83.11%    7 Missing and 6 partials
internal/server/audit/kafka/protobuf.go    81.08%    4 Missing and 3 partials
internal/cmd/grpc.go                       0.00%     5 Missing and 1 partial
internal/server/audit/kafka/avro.go        85.00%    2 Missing and 1 partial
internal/server/audit/audit.go             0.00%     1 Missing
Additional details and impacted files
@@             Coverage Diff             @@
##             main    #3204       +/-   ##
===========================================
- Coverage   70.78%   48.37%   -22.41%     
===========================================
  Files          91      162       +71     
  Lines        8729    13105     +4376     
===========================================
+ Hits         6179     6340      +161     
- Misses       2165     6297     +4132     
- Partials      385      468       +83     

codecov[bot] avatar Jul 04 '24 14:07 codecov[bot]