feat(audit): implement sink for kafka
related #3146
- [x] find the right place for event.proto and data type for payload
- [x] add support for Schema Registry
- [ ] improve test coverage
While all the steps are straightforward, the main issue lies with the Payload of Event. Defining a schema for any in protobuf is challenging. Although Avro or JSON Schema might be more flexible in this regard, I haven't researched them yet.
I'm a bit stuck with Dagger and Redpanda Schema Registry. Redpanda provides a great development Docker image that includes Kafka with KRaft, Schema Registry, and other related services. When I run it locally with Docker, all my tests pass without issue. However, when I run them with Dagger, they fail with the error: Client.Timeout exceeded while awaiting headers.
There is an issue report, dagger/dagger/issues/6673, suggesting that multiple ports may not be bound, but I tested this with the additional task below and confirmed that there are no issues with the connections.
    func (m *Dagger) SchemaRegistry(ctx context.Context) (string, error) {
        k := dag.Container().From("redpandadata/redpanda").
            WithExposedPort(9092, dagger.ContainerWithExposedPortOpts{
                Description: "kafka endpoint",
            }).
            WithExposedPort(8081, dagger.ContainerWithExposedPortOpts{
                Description: "schema registry",
            }).
            WithEnvVariable("REDPANDA_ADVERTISE_KAFKA_ADDRESS", "kafka:9092").
            WithExec(nil).AsService()

        return dag.Container().From("alpine").WithServiceBinding("kafka", k).
            // WithExec([]string{"nc", "-v", "kafka", "9092"}).
            WithExec([]string{"wget", "http://kafka:8081"}).
            Stdout(ctx)
    }
If anyone can spot where I might be going wrong, I would greatly appreciate any hints.
cc @levlaz @GeorgeMac for any suggestions. I will take a look this evening myself as well.
> When I run it locally with Docker, all my tests pass without issue.
Hey @erka can you share how you are running this locally with Docker? What specific commands are you using to start the redpanda container?
The code looks good to me - and the proxy is working correctly, you are getting a 404 from kafka:8081 which at least means the kafka container is bound and listening and responding on that port. I suspect the issue has to do with the configuration of the service container itself.
I cannot find anywhere in the docs of redpanda that talks about how to configure things with environment variables (i.e. where does REDPANDA_ADVERTISE_KAFKA_ADDRESS come from?)
The docker-compose.yml file is the best starting point for translating this to Dagger (https://docs.redpanda.com/current/get-started/quick-start/#deploy-redpanda). The config is very verbose, though, so I'd love to hear how you are running it locally to see if there is a shortcut we can take with a few env vars.
Lastly, I don't think it makes any difference, but WithExec(nil) does not seem to be necessary.
Thank you @levlaz for the support.
> Hey @erka can you share how you are running this locally with Docker? What specific commands are you using to start the redpanda container?
docker run -it -p 9092:9092 -p 8081:8081 --privileged -h kafka redpandadata/redpanda
I also modified /etc/hosts so that kafka points to 127.0.0.1.
> The code looks good to me - and the proxy is working correctly, you are getting a 404 from kafka:8081 which at least means the kafka container is bound and listening and responding on that port. I suspect the issue has to do with the configuration of the service container itself.
>
> I cannot find anywhere in the docs of redpanda that talks about how to configure things with environment variables (i.e. where does REDPANDA_ADVERTISE_KAFKA_ADDRESS come from?)
I saw it in the workflow of the Go Kafka client. Another place, the docker-compose.yml, helped me solve this.
I used kafka as the DNS name in the advertised Kafka address, and that works fine for the Flipt container thanks to WithServiceBinding. Inside the Kafka container, however, the Schema Registry service uses the same name, and that container has no record for the kafka host, hence the timeout. I hadn't realized that the alias created by WithServiceBinding exists only in the client container, not in the service container as well.
Thank you @markphelps @levlaz for the help. I can move forward now.
Codecov Report
Attention: Patch coverage is 73.98844% with 45 lines in your changes missing coverage. Please review.
Project coverage is 48.37%. Comparing base (f997fb9) to head (3128e6a). Report is 465 commits behind head on main.
Additional details and impacted files
@@ Coverage Diff @@
## main #3204 +/- ##
===========================================
- Coverage 70.78% 48.37% -22.41%
===========================================
Files 91 162 +71
Lines 8729 13105 +4376
===========================================
+ Hits 6179 6340 +161
- Misses 2165 6297 +4132
- Partials 385 468 +83