
Use a field from the input data to override the AWS Event time.

Open jensur77 opened this issue 1 year ago • 10 comments

What is your idea?

Add the possibility to override the AWS event time by adding configuration parameters that specify which input data field to take the timestamp from. If the config parameters are not provided, everything continues to work as it does today, i.e. the event time is not set and AWS EventBridge defaults it to the current timestamp.

Would you be willing to make the change?

Yes, I will post a pull request.

Additional context

We need the ability to set a custom event time from the input data when processing historical data.

jensur77 avatar Aug 09 '24 08:08 jensur77

+1

Lucas3oo avatar Aug 09 '24 08:08 Lucas3oo

Thank you @jensur77 for raising this. Just to understand: you basically want a way to manipulate the outgoing EventBridge event, e.g. by providing your own timestamp? We have had similar requests, and while it's possible to write individual logic ("functions") for each such field, the better option IMHO is to use "interceptor" classes, where we provide hooks to deserialize the original Kafka object and manipulate the outgoing EventBridge JSON event (with Jackson). Otherwise my concern is that you're going to depend on this project to implement custom logic, and maintenance/customization is going to be painful.

WDYT? We have something similar already for overwriting the outgoing event type information using a custom Java class.
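To make the interceptor idea concrete, here is a rough sketch (purely illustrative; this interface does not exist in the connector, and Jackson's ObjectNode is replaced by a plain Map so the example is self-contained):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical interceptor hook: called after the outgoing EventBridge event
// is built but before it is sent, so users can mutate any field.
interface EventInterceptor {
    Map<String, Object> intercept(Map<String, Object> outgoingEvent);
}

// Example: overwrite the event "time" from a field inside the detail payload.
// "recordedAt" is a made-up field name for this sketch.
class TimeFromDetailInterceptor implements EventInterceptor {
    @Override
    public Map<String, Object> intercept(Map<String, Object> outgoingEvent) {
        Map<String, Object> event = new HashMap<>(outgoingEvent);
        Object detail = event.get("detail");
        if (detail instanceof Map<?, ?> d && d.get("recordedAt") != null) {
            event.put("time", d.get("recordedAt"));
        }
        return event;
    }
}
```

The point of the hook is that the connector stays free of per-field custom logic; all customization lives in the user-supplied class.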

embano1 avatar Aug 09 '24 08:08 embano1

cc/ @maschnetwork @agebhar1 for feedback

Note: we could provide helper functions to make deserialization easier because this will be common across those interceptors.

embano1 avatar Aug 09 '24 08:08 embano1

Add an option to configure a class for the EventBridgeMapper interface, and open up DefaultEventBridgeMapper.java a bit so one can simply override the method createPutEventsEntry().

I.e. following the same pattern that already exists for DefaultDetailTypeMapper.java.

That would solve the problem.

Lucas3oo avatar Aug 09 '24 09:08 Lucas3oo

Or another way, which might be less work, is to follow the DetailTypeMapper.java pattern exactly and have another mapper for "time".

E.g.:

private EventBridgeResult<PutEventsRequestEntry> createPutEventsEntry(SinkRecord record) {
  try {
    return success(
        record,
        PutEventsRequestEntry.builder()
            .eventBusName(config.eventBusArn)
            .source(sourcePrefix + config.connectorId)
            .detailType(detailTypeMapper.getDetailType(record))
            .time(timeMapper.getTime(record))
            .resources(config.resources)
            .detail(createJsonPayload(record))
            .build());
  } catch (Exception e) {
    return failure(record, reportOnly("Cannot convert Kafka record to EventBridge.", e));
  }
}

Lucas3oo avatar Aug 09 '24 11:08 Lucas3oo

Good points! I can see how the number of configuration parameters grows with more and more changes like the original suggestion. Instead, the configuration could be an optional class name, timeMapperClass, following the pattern of detailTypeMapper. The TimeMapper interface would have a default implementation that returns null and could be overridden by the class named in the timeMapperClass parameter. And maybe move the deserialization to a utility class. Sounds like a plan?
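A minimal sketch of that plan (names and signatures are assumptions, not the connector's actual API; SinkRecord is replaced by a tiny stub so the example is self-contained):

```java
import java.time.Instant;
import java.util.Map;

// Simplified stand-in for org.apache.kafka.connect.sink.SinkRecord.
class RecordStub {
    private final Object value;
    RecordStub(Object value) { this.value = value; }
    public Object value() { return value; }
}

// Hypothetical TimeMapper: the default returns null, so EventBridge keeps
// stamping the event with the current time, preserving today's behaviour.
interface TimeMapper {
    default Instant getTime(RecordStub record) { return null; }
}

// Hypothetical custom mapper supplied via a timeMapperClass config parameter,
// reading an epoch-millis field ("eventTimestamp" is a made-up name).
class FieldTimeMapper implements TimeMapper {
    @Override
    public Instant getTime(RecordStub record) {
        Object ts = ((Map<?, ?>) record.value()).get("eventTimestamp");
        return ts == null ? null : Instant.ofEpochMilli(((Number) ts).longValue());
    }
}
```

With a null default, the connector can unconditionally call timeMapper.getTime(record) and let the AWS SDK builder skip the field when it is null.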

jensur77 avatar Aug 09 '24 11:08 jensur77

sounds good :-)

Lucas3oo avatar Aug 09 '24 11:08 Lucas3oo

Hi everyone,

thanks for raising this. I discussed with @embano1, and a TimeMapper, similar to DetailTypeMapper, would be a good choice moving forward. Also, moving the deserialization logic to a utility class makes sense, as the TimeMapper would need to rely on it too. The only difference to DetailTypeMapper is that TimeMapper does not need a configure method to receive values from the config, since that is handled entirely by the custom implementation; only the class name needs to be provided.

@jensur77 do you have capacity to provide a first PR for this?

maschnetwork avatar Aug 12 '24 14:08 maschnetwork

Hi,

nice to hear! Sure, a PR for this is coming soon.

jensur77 avatar Aug 13 '24 08:08 jensur77

Fantastic! Our reviewers are eagerly waiting :)

embano1 avatar Aug 13 '24 08:08 embano1

Hi

We are working on a solution but are having some difficulty testing it in a "real life" scenario. When we put an implementation of TimeMapper in a jar that we add to the connector's lib folder (in the expanded zip), it isn't "visible" to the class loader.

I was wondering if some manifest file of some sort needs to be updated?

Lucas3oo avatar Aug 14 '24 14:08 Lucas3oo

Can you share the code/example you built? Technically it should suffice if the class is in the class loader path. What's the exception thrown? cc/ @maschnetwork @agebhar1

embano1 avatar Aug 14 '24 15:08 embano1

It is like the default time mapper, but outside the "project". We added the same start-up verification of the class name as for detailTypeMapper, which uses Class.forName, and that is where the ClassNotFoundException is thrown.
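For reference, a start-up check along those lines could be sketched like this (a hypothetical helper, not the connector's actual code):

```java
// Hypothetical start-up validation: resolve the configured mapper class by
// name and instantiate it via its no-arg constructor. If the jar carrying
// the class is not on the plugin class path, Class.forName throws
// ClassNotFoundException, matching the failure described above.
final class MapperLoader {
    static Object loadMapper(String className) {
        try {
            return Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Mapper class not on class path: " + className, e);
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate mapper class: " + className, e);
        }
    }
}
```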

I also added the ClassGraph utility to print the class path, and our added jar was not in the list. The ClassGraph jar itself was, of course, since I added it to the connector's pom.xml.

The only difference is that our jar with the implementation doesn't have a version number in the filename.

Lucas3oo avatar Aug 14 '24 16:08 Lucas3oo

I tried to see if some sort of "java module" or similar is used but I can't see that

Lucas3oo avatar Aug 14 '24 16:08 Lucas3oo

The "complete" setup is that we build a Docker image based on the confluentinc/cp-kafka-connect-base image and add our connectors, unzipped, to /usr/share/java.

The build has the connector zip file as a dependency.

Everything is built with Google's Jib Gradle plugin.

All I can think of is that the Kafka Connect framework has some means of deciding which jar files to put on the class path.

Lucas3oo avatar Aug 14 '24 16:08 Lucas3oo

I did a simple PoC (intentionally not using DetailTypeMapper). The JAR of the 'extension' is in the same directory as the connector itself. Everything is started with the provided E2E Docker Compose file. The connector is loaded/activated by

$ curl -d @connect-config.json -Ss -H 'Content-Type: application/json'  http://localhost:8083/connectors

(attached screenshot: "witness")

The class from the 'extension' could be resolved by the JVM and the class was successfully instantiated.

Details from the debug view (attached screenshot: "pluginclassloader").

agebhar1 avatar Aug 14 '24 17:08 agebhar1

Yeah, then it must be something with the Confluent base image. The default in that one is to have "plugins" under /usr/share/java, but I see that in your installation it is under /opt/connectors.

But did you run the uber jar for the connector, or did you unzip the zip file to /opt/connectors?

Lucas3oo avatar Aug 14 '24 17:08 Lucas3oo

but ok I will try to mimic your PoC

Lucas3oo avatar Aug 14 '24 17:08 Lucas3oo

I was able to successfully add the implementation jar directly under /usr/share/java last night on our Confluent base image. If another folder is desired, it should be possible to add it to the plugin.path config parameter for Kafka Connect. So everything works well now.
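As a sketch of that install step (paths and file names are assumptions, and a dummy jar stands in for the real implementation):

```shell
# Place the custom TimeMapper jar where the Kafka Connect plugin class
# loader will find it. PLUGIN_DIR stands in for a directory such as
# /usr/share/java/<connector> in the Confluent base image.
PLUGIN_DIR="./plugins/eventbridge-connector"
mkdir -p "$PLUGIN_DIR"
touch my-time-mapper.jar          # dummy stand-in for the implementation jar
cp my-time-mapper.jar "$PLUGIN_DIR/"

# Alternatively, point the worker at an extra folder via plugin.path:
echo "plugin.path=/usr/share/java,/opt/connectors" >> worker.properties
```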

jensur77 avatar Aug 15 '24 04:08 jensur77

@jensur77 thanks again! Release v1.3.2 is available now and should be soon on Confluent Hub if you're using that platform.

embano1 avatar Aug 22 '24 11:08 embano1

That was quick :) Thank you too!

jensur77 avatar Aug 22 '24 12:08 jensur77