eventbridge-kafka-connector
Use a field from the input data to override the AWS Event time.
What is your idea?
Add the possibility to override the AWS Event time by adding configuration parameters that specify an input data field to take the timestamp information from. If the config parameters are not provided, everything will continue to work as it does today, i.e. the event time is not set and AWS EventBridge defaults it to the current timestamp.
Would you be willing to make the change?
Yes I will post a pull request.
Additional context
We need the ability to set a custom event time from the input data when processing historical data.
+1
Thank you @jensur77 for raising this. Just to understand, you basically want a way to manipulate the outgoing EventBridge event e.g., providing your own timestamp? We have had similar requests, and while it's possible to write individual logic ("functions") to do that per field, the better option IMHO is to use "interceptor" classes where we provide you with hooks to deserialize the original Kafka object and manipulate the outgoing EventBridge JSON event (with Jackson). Otherwise my concern is that you're going to depend on this project to implement custom logic and maintenance/customization is going to be painful.
WDYT? We have something similar already for overwriting the outgoing event type information using a custom Java class.
cc/ @maschnetwork @agebhar1 for feedback
Note: we could provide helper functions to make deserialization easier because this will be common across those interceptors.
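To make the interceptor idea above concrete, here is a minimal sketch. All names (`EventBridgeInterceptor`, `intercept`, the `created_at` field) are assumptions for illustration, not the connector's actual API; in practice the outgoing event would be a Jackson JSON node, but a plain `Map` stands in here to keep the sketch self-contained.

```java
import java.util.HashMap;
import java.util.Map;

public class InterceptorSketch {
    // Minimal stand-in for a deserialized Kafka record.
    record KafkaRecord(Map<String, Object> fields) {}

    // Hypothetical hook invoked after the connector builds the outgoing
    // EventBridge event, letting user code manipulate any field.
    interface EventBridgeInterceptor {
        Map<String, Object> intercept(KafkaRecord source, Map<String, Object> outgoingEvent);
    }

    // Example: copy a timestamp field from the input data onto the event.
    static EventBridgeInterceptor timeOverride = (source, event) -> {
        Map<String, Object> out = new HashMap<>(event);
        Object ts = source.fields().get("created_at");
        if (ts != null) {
            out.put("Time", ts);
        }
        return out;
    };

    public static void main(String[] args) {
        KafkaRecord rec = new KafkaRecord(Map.of("created_at", "2020-01-01T00:00:00Z"));
        Map<String, Object> event = timeOverride.intercept(rec, Map.of("Detail", "{}"));
        System.out.println(event.get("Time")); // 2020-01-01T00:00:00Z
    }
}
```

The key design point is that custom logic stays in user code behind a single hook, so the connector itself does not accumulate per-field configuration parameters.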
One option is to make the class behind the EventBridgeMapper interface configurable, and open up DefaultEventBridgeMapper.java a bit so one can just override the method createPutEventsEntry(), i.e. following a similar pattern to the one that already exists for DefaultDetailTypeMapper.java.
That would solve the problem.
Another way, which might be less work, is to follow the DetailTypeMapper.java pattern exactly and have another mapper for "time", e.g.:
```java
private EventBridgeResult<PutEventsRequestEntry> createPutEventsEntry(SinkRecord record) {
  try {
    return success(
        record,
        PutEventsRequestEntry.builder()
            .eventBusName(config.eventBusArn)
            .source(sourcePrefix + config.connectorId)
            .detailType(detailTypeMapper.getDetailType(record))
            .time(timeMapper.getTime(record))
            .resources(config.resources)
            .detail(createJsonPayload(record))
            .build());
  } catch (Exception e) {
    return failure(record, reportOnly("Cannot convert Kafka record to EventBridge.", e));
  }
}
```
Good points! I can see how the number of configuration parameters grows with more and more changes like the original suggestion. Instead, the configuration could be an optional class name for timeMapperClass, following the pattern of detailTypeMapper. The TimeMapper interface would have a default implementation that returns null and could be overridden via the timeMapperClass parameter. And maybe move the deserialization to a utility class. Sounds like a plan?
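The proposal above can be sketched as follows. The names (`TimeMapper`, `getTime`, the example mapper) are assumptions for illustration, and a minimal stand-in record replaces Kafka Connect's `SinkRecord` so the sketch is self-contained and runnable:

```java
import java.time.Instant;

public class TimeMapperSketch {
    // Minimal stand-in for Kafka Connect's SinkRecord, to keep this runnable.
    record SinkRecord(Object value, long timestamp) {}

    // Proposed interface: the default implementation returns null, meaning
    // "do not set an event time", so EventBridge falls back to "now" and
    // existing behavior is unchanged when no timeMapperClass is configured.
    interface TimeMapper {
        default Instant getTime(SinkRecord record) {
            return null;
        }
    }

    // Example custom mapper that uses the Kafka record's own timestamp.
    static class RecordTimestampTimeMapper implements TimeMapper {
        @Override
        public Instant getTime(SinkRecord record) {
            return Instant.ofEpochMilli(record.timestamp());
        }
    }

    public static void main(String[] args) {
        SinkRecord r = new SinkRecord("payload", 1_000L);
        TimeMapper defaultMapper = new TimeMapper() {};
        System.out.println(defaultMapper.getTime(r)); // null
        System.out.println(new RecordTimestampTimeMapper().getTime(r)); // 1970-01-01T00:00:01Z
    }
}
```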
sounds good :-)
Hi everyone,
thanks for raising this. I discussed with @embano1, and a TimeMapper, similar to DetailTypeMapper, would be a good choice moving forward. Also, moving the deserialization logic to a utility class makes sense, as the TimeMapper would need to rely on it too. The only difference to DetailTypeMapper is that TimeMapper does not need a configure method to receive values from the config, as this will be handled solely by the custom implementation; only the class name needs to be provided.
@jensur77 do you have capacity to provide a first PR for this?
Hi,
nice to hear! Sure, a PR for this is coming soon.
Fantastic! Our reviewers are eagerly waiting :)
Hi
We are working on a solution but are having difficulties testing it in a "real life" scenario. When we put an implementation of TimeMapper in a JAR that we add to the lib folder of the connector (in the expanded zip), it isn't "visible" to the class loader.
I was wondering whether a manifest file of some sort needs to be updated?
Can you share the code/example you built? Technically it should suffice if the class is in the class loader path. What's the exception thrown? cc/ @maschnetwork @agebhar1
It is like the default time mapper, but outside the project. We added a check that verifies the class name at startup, the same way as for detailTypeMapper (via Class.forName), and that throws a ClassNotFoundException.
I also added the Google ClassGraph utility to print the class path, and our added JAR was not in the list. The ClassGraph JAR itself was, of course, since I added it to the connector's pom.xml.
The only difference is that our JAR with the implementation doesn't have a version in the filename.
I tried to see if some sort of "Java module" or similar is used, but I can't see that it is.
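The startup check described above boils down to resolving and instantiating the configured class reflectively. A minimal sketch (the method name is an assumption, not the connector's actual code): if the JAR providing the class is not on the worker's class path or plugin path, `Class.forName` throws exactly the `ClassNotFoundException` seen here.

```java
public class ClassCheckSketch {
    // Verify a configured mapper class name at startup, mirroring the
    // Class.forName check discussed in the thread.
    static Object instantiate(String className) {
        try {
            // Throws ClassNotFoundException when the JAR providing the
            // class is not visible to the class loader.
            Class<?> clazz = Class.forName(className);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot load configured class: " + className, e);
        }
    }

    public static void main(String[] args) {
        // A JDK class is always resolvable; a custom mapper JAR must be
        // on the class path (or Kafka Connect's plugin.path) to resolve.
        Object o = instantiate("java.util.ArrayList");
        System.out.println(o.getClass().getName()); // java.util.ArrayList
    }
}
```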
The "complete" setup is that we build a docker image based on confluentinc/cp-kafka-connect-base image. Add our connectors unzipped to /usr/share/java.
The build has the connector zip file as dependency.
All is built with Gradle Jib plugin from google.
All I can think of is that Kafka connect framework has some means to know which jar file sto put on the class path.
I did a simple PoC (intentionally not using DetailTypeMapper). The JAR of the 'extension' is in the same directory as the connector itself. Everything is started with the provided E2E Docker Compose file. The connector is loaded/activated by
$ curl -d @connect-config.json -Ss -H 'Content-Type: application/json' http://localhost:8083/connectors
The class from the 'extension' could be resolved by the JVM and the class was successfully instantiated.
Details from the debug view:
Yeah, then it must be something with the Confluent base image. Its default is to have plugins under /usr/share/java, but I see in your installation they are under /opt/connectors.
But did you run the uber JAR for the connector, or did you unzip the zip file to /opt/connectors?
But OK, I will try to mimic your PoC.
I was able to successfully add the implementation JAR directly under /usr/share/java last night on our Confluent base image. If another folder is desired, it should be possible to add it to the plugin.path config parameter for Kafka Connect. So everything works well now.
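For reference, `plugin.path` is a standard Kafka Connect worker setting that lists the directories scanned for plugin JARs. A config fragment (the paths are illustrative, matching the directories mentioned in this thread):

```properties
# Kafka Connect worker config — illustrative paths from this thread.
# Directories scanned for connector plugins and their dependency JARs;
# add a custom directory alongside the image default if the mapper JAR
# lives elsewhere than /usr/share/java.
plugin.path=/usr/share/java,/opt/connectors
```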
@jensur77 thanks again! Release v1.3.2 is available now and should be soon on Confluent Hub if you're using that platform.
That was quick :) Thank you too!