camel-kafka-connector

CamelPostgresqlsourceSourceConnector publishes InputStreamCache object reference to topic

Open camwardy opened this issue 2 years ago • 12 comments

Hi @oscerd,

I'm attempting to use the CamelPostgresqlsourceSourceConnector. I can create the connector but it publishes an InputStreamCache object reference to the topic, in the form:

org.apache.camel.converter.stream.InputStreamCache@6e37b6bd
org.apache.camel.converter.stream.InputStreamCache@6c833187
org.apache.camel.converter.stream.InputStreamCache@59bcea5a
org.apache.camel.converter.stream.InputStreamCache@4235ec9b
org.apache.camel.converter.stream.InputStreamCache@1fd4716e
org.apache.camel.converter.stream.InputStreamCache@3a08393d
org.apache.camel.converter.stream.InputStreamCache@2e2bb2c
org.apache.camel.converter.stream.InputStreamCache@c2ac46c
org.apache.camel.converter.stream.InputStreamCache@2a224dd6
org.apache.camel.converter.stream.InputStreamCache@1437780
...
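
The `ClassName@hex` shape of these values is what Java's default `Object.toString()` produces, which suggests the stream object itself is being turned into a string rather than its contents. The thread does not say which component does that conversion. A minimal JDK-only sketch of the effect, using a plain `ByteArrayInputStream` as a stand-in for `InputStreamCache` (an assumption made to avoid a Camel dependency; `ToStringDemo` and its methods are hypothetical names):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class ToStringDemo {
    // What a naive string conversion of the stream object yields:
    // the class name plus an identity hash, not the payload.
    static String stringify(Object value) {
        return String.valueOf(value);
    }

    // What a consumer actually wants: the stream's bytes decoded as text.
    static String readContent(InputStream in) {
        try {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8); // Java 9+
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        InputStream body = new ByteArrayInputStream("(1,alice)".getBytes(StandardCharsets.UTF_8));
        System.out.println(stringify(body));   // class name followed by @ and a hex hash
        System.out.println(readContent(body)); // the actual row text
    }
}
```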

Is there any additional config I need to set to get the connector to publish something that a consumer can easily utilise?

Here is my connector config:

{
    "name": "source",
    "config": {
        "connector.class": "org.apache.camel.kafkaconnector.postgresqlsource.CamelPostgresqlsourceSourceConnector",
        "tasks.max": "1",
        "topics": "test-topic",
        "camel.kamelet.postgresql-source.databaseName": "db",
        "camel.kamelet.postgresql-source.query": "SELECT (id,name) FROM source;",
        "camel.kamelet.postgresql-source.serverName": "postgres",
        "camel.kamelet.postgresql-source.username": "postgres",
        "camel.kamelet.postgresql-source.password": "postgres",
        "camel.kamelet.postgresql-source.delay": 2000
    }
}

Thanks

camwardy avatar Jun 28 '23 13:06 camwardy

@orpiske & @oscerd, we're also seeing the same InputStreamCache issue. The manual is very barebones and doesn't cover this particular fix/workaround. Can you please help?

aozmen121 avatar Jun 28 '23 14:06 aozmen121

Can you try with

key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.converters.ByteArrayConverter

oscerd avatar Jun 28 '23 14:06 oscerd

@oscerd We get the following exception: org.apache.kafka.connect.errors.DataException: ByteArrayConverter is not compatible with objects of type class org.apache.camel.converter.stream.InputStreamCache
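
The exception message points to a type check: the converter expects raw bytes, and the record value is still a stream object. The following is a simplified stand-in for that kind of check, written against the JDK only. It is not Kafka's source code, and `ConverterCheckSketch` and `isAcceptedByByteConverter` are hypothetical names. It assumes the converter accepts `byte[]`, `ByteBuffer`, or `null`, and again uses `ByteArrayInputStream` in place of `InputStreamCache`:

```java
import java.io.ByteArrayInputStream;
import java.nio.ByteBuffer;

class ConverterCheckSketch {
    // Simplified illustration of a bytes-only type check (assumption:
    // only byte[], ByteBuffer and null pass; everything else is rejected).
    static boolean isAcceptedByByteConverter(Object value) {
        return value == null || value instanceof byte[] || value instanceof ByteBuffer;
    }

    public static void main(String[] args) {
        // Raw bytes pass the check.
        System.out.println(isAcceptedByByteConverter(new byte[] {1, 2}));
        // A stream object does not, even though it wraps bytes.
        System.out.println(isAcceptedByByteConverter(new ByteArrayInputStream(new byte[0])));
    }
}
```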

camwardy avatar Jun 28 '23 15:06 camwardy

I need to reproduce this but I think we need to disable the stream caching here: https://github.com/apache/camel-kafka-connector/blob/main/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java#L143

with

context.setStreamCaching(false);

You can try to add this fix and rebuild the whole project, starting from the tag you're using, for the time being.

oscerd avatar Jun 28 '23 15:06 oscerd

@oscerd Doesn't look like setting converter to org.apache.kafka.connect.converters.ByteArrayConverter worked for us either.

aozmen121 avatar Jun 28 '23 15:06 aozmen121

@oscerd We get the following exception: org.apache.kafka.connect.errors.DataException: ByteArrayConverter is not compatible with objects of type class org.apache.camel.converter.stream.InputStreamCache

The documentation is barebones because nobody is contributing a single line of documentation or code. Open source is not just asking.

oscerd avatar Jun 28 '23 15:06 oscerd

@oscerd Doesn't look like setting converter to org.apache.kafka.connect.converters.ByteArrayConverter worked for us either.

Try to build the project with the fix suggested above.

oscerd avatar Jun 28 '23 15:06 oscerd

Hi, I faced the same issue with camel-http-secured-source-kafka-connector: the task returns an InputStreamCache object as the SourceRecord value, which cannot be converted with ByteArrayConverter.

Disabling stream caching with the camel.main.streamCachingEnabled property changes the Java type to org.apache.camel.converter.stream.CachedOutputStream$WrappedInputStream, which is not suitable either.

When combined with camel.source.marshal: jackson, I get the following error:

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.camel.converter.stream.InputStreamCache and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)

I tried looking for suitable camel-http properties but I couldn't find anything.

For now I've bypassed this by implementing a custom transformation plugin that transforms the InputStream into a byte array. However, I was wondering whether there is a way to configure the connector to at the very least return unwrapped bytes, or preferably to convert the value to a HashMap for the JSON payload, so that further field transformations can be applied?
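
A minimal sketch of the core of such a workaround, not the actual plugin described in this comment. It assumes the record value is a `java.io.InputStream` and drains it into a `byte[]`; `StreamValueUnwrapper` and `unwrap` are hypothetical names. In a real Kafka Connect single message transform, logic like this would presumably be called from the `apply` method of a `Transformation` implementation, which is omitted here to keep the example JDK-only:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class StreamValueUnwrapper {
    // Drains InputStream values into a byte[]; any other value passes through unchanged.
    static Object unwrap(Object value) {
        if (value instanceof InputStream) {
            try (InputStream in = (InputStream) value) {
                return in.readAllBytes(); // Java 9+
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return value;
    }

    public static void main(String[] args) {
        Object raw = new ByteArrayInputStream("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        Object unwrapped = unwrap(raw);
        // The value is now plain bytes that a bytes-based converter can handle.
        System.out.println(new String((byte[]) unwrapped, StandardCharsets.UTF_8));
    }
}
```

Reading the whole stream into memory is fine for small payloads; for large HTTP responses a size limit would be worth adding.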

jakubmalek avatar Jul 18 '23 16:07 jakubmalek

Has no one resolved this issue? Please help.

cmartinez-peigo avatar Oct 11 '23 14:10 cmartinez-peigo

We are investigating what the problem could be. @valdar is looking into it.

oscerd avatar Oct 11 '23 14:10 oscerd