camel-kafka-connector
CamelPostgresqlsourceSourceConnector publishes InputStreamCache object reference to topic
Hi @oscerd,
I'm attempting to use the CamelPostgresqlsourceSourceConnector. I can create the connector but it publishes an InputStreamCache object reference to the topic, in the form:
org.apache.camel.converter.stream.InputStreamCache@6e37b6bd
org.apache.camel.converter.stream.InputStreamCache@6c833187
org.apache.camel.converter.stream.InputStreamCache@59bcea5a
org.apache.camel.converter.stream.InputStreamCache@4235ec9b
org.apache.camel.converter.stream.InputStreamCache@1fd4716e
org.apache.camel.converter.stream.InputStreamCache@3a08393d
org.apache.camel.converter.stream.InputStreamCache@2e2bb2c
org.apache.camel.converter.stream.InputStreamCache@c2ac46c
org.apache.camel.converter.stream.InputStreamCache@2a224dd6
org.apache.camel.converter.stream.InputStreamCache@1437780
...
Is there any additional config I need to set to get the connector to publish something that a consumer can easily utilise?
Here is my connector config:
{
  "name": "source",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.postgresqlsource.CamelPostgresqlsourceSourceConnector",
    "tasks.max": "1",
    "topics": "test-topic",
    "camel.kamelet.postgresql-source.databaseName": "db",
    "camel.kamelet.postgresql-source.query": "SELECT (id,name) FROM source;",
    "camel.kamelet.postgresql-source.serverName": "postgres",
    "camel.kamelet.postgresql-source.username": "postgres",
    "camel.kamelet.postgresql-source.password": "postgres",
    "camel.kamelet.postgresql-source.delay": 2000
  }
}
Thanks
@orpiske & @oscerd, we're also seeing the same InputStreamCache issue. The manual is very barebones and doesn't mention this particular fix/workaround. Can you please help?
Can you try with
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
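These are standard Kafka Connect properties, so they can go either in the Connect worker configuration or per connector. For illustration, a sketch of the per-connector form based on the JSON config from the original post (the two converter entries are the only additions, everything else is unchanged):
{
  "name": "source",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.postgresqlsource.CamelPostgresqlsourceSourceConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "tasks.max": "1",
    "topics": "test-topic",
    "camel.kamelet.postgresql-source.databaseName": "db",
    "camel.kamelet.postgresql-source.query": "SELECT (id,name) FROM source;",
    "camel.kamelet.postgresql-source.serverName": "postgres",
    "camel.kamelet.postgresql-source.username": "postgres",
    "camel.kamelet.postgresql-source.password": "postgres",
    "camel.kamelet.postgresql-source.delay": 2000
  }
}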
@oscerd We get the following exception: org.apache.kafka.connect.errors.DataException: ByteArrayConverter is not compatible with objects of type class org.apache.camel.converter.stream.InputStreamCache
I need to reproduce this but I think we need to disable the stream caching here: https://github.com/apache/camel-kafka-connector/blob/main/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java#L143
with
context.setStreamCaching(false);
You can try to add this fix and rebuild the whole project, starting from the tag you're using, for the time being.
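For reference, a rough sketch of what that one-line change amounts to; this is illustrative only and not the actual code in CamelSourceTask:

import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;

public class DisableStreamCachingSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative only: mirrors the suggested change, it is not
        // the code that actually lives in CamelSourceTask.
        CamelContext context = new DefaultCamelContext();
        // Disable stream caching so exchange bodies are not wrapped in
        // InputStreamCache before the task turns them into SourceRecords.
        context.setStreamCaching(false);
        context.start();
        // ... route/endpoint setup would follow here in the real task ...
        context.stop();
    }
}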
@oscerd Doesn't look like setting converter to org.apache.kafka.connect.converters.ByteArrayConverter worked for us either.
The documentation is barebones because nobody is contributing a single line of documentation or code. Open source is not just asking.
Try to build the project with the fix suggested above.
Hi,
I faced the same issue with camel-http-secured-source-kafka-connector: the task returns an InputStreamCache object as the SourceRecord value, which cannot be converted with ByteArrayConverter.
Disabling stream caching with the camel.main.streamCachingEnabled property changes the Java type to org.apache.camel.converter.stream.CachedOutputStream$WrappedInputStream, which is not suitable either.
When combined with camel.source.marshal: jackson I get the following error:
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.camel.converter.stream.InputStreamCache and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
I tried looking for suitable camel-http properties but I couldn't find anything.
For now I've bypassed this by implementing a custom transformation plugin which converts the InputStream into a byte array.
However, I was wondering whether there is a way to configure the connector to at least return unwrapped bytes, or preferably to convert the value to a HashMap for the JSON payload so that further field transformations can be applied?
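For reference, such a transformation could look roughly like this; a simplified sketch of a Kafka Connect SMT that copies an InputStream record value into a byte[] (class and package names are illustrative):

package example; // hypothetical package

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

/**
 * Illustrative sketch only: copies an InputStream record value into a byte[]
 * so that ByteArrayConverter can handle it. InputStreamCache extends
 * InputStream, so it is covered by the instanceof check below.
 */
public class InputStreamToBytes<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        Object value = record.value();
        if (!(value instanceof InputStream)) {
            return record; // nothing to do for non-stream values
        }
        try (InputStream in = (InputStream) value;
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            // Re-emit the record with the same key/topic/partition but a byte[] value
            return record.newRecord(record.topic(), record.kafkaPartition(),
                    record.keySchema(), record.key(),
                    Schema.OPTIONAL_BYTES_SCHEMA, out.toByteArray(),
                    record.timestamp());
        } catch (IOException e) {
            throw new DataException("Failed to read InputStream value", e);
        }
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no configuration options
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed
    }

    @Override
    public void close() {
        // nothing to close
    }
}

It would then be wired in through the usual SMT properties, e.g. transforms=streamToBytes and transforms.streamToBytes.type=example.InputStreamToBytes.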
Is there no one who has resolved this issue? Please help.
We are investigating what the problem could be. @valdar is looking at that.