
Raw JSON import causes java.lang.NullPointerException

Open ttrading opened this issue 4 years ago • 1 comment

I just downloaded ArangoDB 3.7.10 and the kafka-connect-arangodb sink connector onto the same server as our Kafka broker (OS: Ubuntu 20) to test the integration. I followed the quick start guide to start everything, but I somehow get an error when I produce the following JSON message to the designated topic:

{ "type": "Point", "time": 1560507792000, "value": 5 }

My quickstart-kafka-connect-arangodb-sink properties file is as follows:

# Kafka settings
name=arangodb-sink
connector.class=io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkConnector
tasks.max=1

# Topics to consume from (comma-separated for a list of multiple topics)
topics=test

# ArangoDB sink configuration
arangodb.host=10.10.10.195
arangodb.port=8529
arangodb.user=root
arangodb.password=xxxxxx
arangodb.database.name=webperformance

# Optional transformers
#transforms=cdc
#transforms.cdc.type=io.github.jaredpetersen.kafkaconnectarangodb.sink.transforms.Cdc

In my standalone Kafka Connect config file, I set the schema enable options to "false" for both the key and value converters.
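Concretely, the converter lines in my connect-standalone.properties look something like this (the value-side JsonConverter with schemas.enable=false is confirmed by the JsonConverterConfig values in the log below; I assume the key converter is configured the same way):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

I wish I could give more details, but the following is the only thing I see in the kafka-connect output: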

 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:361)
[2021-04-06 20:02:14,583] INFO ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = connector-consumer-arangodb-sink-0
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = connect-arangodb-sink
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        socket.connection.setup.timeout.max.ms = 127000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig:361)
[2021-04-06 20:02:14,633] WARN The configuration 'metrics.context.connect.kafka.cluster.id' was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig:369)
[2021-04-06 20:02:14,633] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser:119)
[2021-04-06 20:02:14,633] INFO Kafka commitId: 448719dc99a19793 (org.apache.kafka.common.utils.AppInfoParser:120)
[2021-04-06 20:02:14,633] INFO Kafka startTimeMs: 1617739334633 (org.apache.kafka.common.utils.AppInfoParser:121)
[2021-04-06 20:02:14,646] INFO Created connector arangodb-sink (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2021-04-06 20:02:14,647] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Subscribed to topic(s): test (org.apache.kafka.clients.consumer.KafkaConsumer:961)
[2021-04-06 20:02:14,648] INFO task config: {connector.class=io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkConnector, arangodb.port=8529, arangodb.database.name=webperformance, task.class=io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask, tasks.max=1, topics=test, arangodb.host=10.10.10.195, name=arangodb-sink, arangodb.password=xxxxxx, arangodb.user=root} (io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask:41)
[2021-04-06 20:02:14,648] INFO initial config: {connector.class=io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkConnector, arangodb.port=8529, arangodb.database.name=webperformance, task.class=io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask, tasks.max=1, topics=test, arangodb.host=10.10.10.195, name=arangodb-sink, arangodb.password=xxxxxx, arangodb.user=root} (io.github.jaredpetersen.kafkaconnectarangodb.sink.config.ArangoDbSinkConfig:56)
[2021-04-06 20:02:14,726] INFO JsonConverterConfig values:
        converter.type = value
        decimal.format = BASE64
        schemas.cache.size = 1000
        schemas.enable = false
 (org.apache.kafka.connect.json.JsonConverterConfig:361)
[2021-04-06 20:02:14,728] INFO WorkerSinkTask{id=arangodb-sink-0} Sink task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:309)
[2021-04-06 20:02:14,751] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Cluster ID: L0KJ7-s_RpScEldM7-SUTw (org.apache.kafka.clients.Metadata:279)
[2021-04-06 20:02:14,752] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Discovered group coordinator questdb:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:847)
[2021-04-06 20:02:14,755] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)
[2021-04-06 20:02:14,771] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)
[2021-04-06 20:02:14,775] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Successfully joined group with generation Generation{generationId=1, memberId='connector-consumer-arangodb-sink-0-c122edff-4722-4306-b173-1f7112cf01d7', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:596)
[2021-04-06 20:02:14,777] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Finished assignment for group at generation 1: {connector-consumer-arangodb-sink-0-c122edff-4722-4306-b173-1f7112cf01d7=Assignment(partitions=[test-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:626)
[2021-04-06 20:02:14,783] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Successfully synced group in generation Generation{generationId=1, memberId='connector-consumer-arangodb-sink-0-c122edff-4722-4306-b173-1f7112cf01d7', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)
[2021-04-06 20:02:14,783] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Notifying assignor about the new Assignment(partitions=[test-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:276)
[2021-04-06 20:02:14,786] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Adding newly assigned partitions: test-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:288)
[2021-04-06 20:02:14,793] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Found no committed offset for partition test-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1354)
[2021-04-06 20:02:14,805] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Resetting offset for partition test-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[questdb:9092 (id: 0 rack: null)], epoch=0}}. (org.apache.kafka.clients.consumer.internals.SubscriptionState:396)
[2021-04-06 20:03:09,353] INFO writing 1 record(s) (io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask:74)
[2021-04-06 20:03:09,355] ERROR WorkerSinkTask{id=arangodb-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: null (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
java.lang.NullPointerException
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.writer.RecordConverter.getKey(RecordConverter.java:69)
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.writer.RecordConverter.convert(RecordConverter.java:42)
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask.lambda$put$0(ArangoDbSinkTask.java:78)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask.put(ArangoDbSinkTask.java:79)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
[2021-04-06 20:03:09,358] ERROR WorkerSinkTask{id=arangodb-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.writer.RecordConverter.getKey(RecordConverter.java:69)
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.writer.RecordConverter.convert(RecordConverter.java:42)
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask.lambda$put$0(ArangoDbSinkTask.java:78)
        at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
        at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
        at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
        at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
        at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
        at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
        at io.github.jaredpetersen.kafkaconnectarangodb.sink.ArangoDbSinkTask.put(ArangoDbSinkTask.java:79)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        ... 10 more
[2021-04-06 20:03:09,360] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Revoke previously assigned partitions test-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:307)
[2021-04-06 20:03:09,360] INFO [Consumer clientId=connector-consumer-arangodb-sink-0, groupId=connect-arangodb-sink] Member connector-consumer-arangodb-sink-0-c122edff-4722-4306-b173-1f7112cf01d7 sending LeaveGroup request to coordinator questdb:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1029)
[2021-04-06 20:03:09,367] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[2021-04-06 20:03:09,367] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[2021-04-06 20:03:09,367] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[2021-04-06 20:03:09,374] INFO App info kafka.consumer for connector-consumer-arangodb-sink-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
^C[2021-04-06 20:03:37,502] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:67)
[2021-04-06 20:03:37,512] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:327)
[2021-04-06 20:03:37,518] INFO Stopped http_8083@66d23e4a{HTTP/1.1, (http/1.1)}{0.0.0.0:8083} (org.eclipse.jetty.server.AbstractConnector:381)
[2021-04-06 20:03:37,518] INFO node0 Stopped scavenging (org.eclipse.jetty.server.session:149)
[2021-04-06 20:03:37,523] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:344)
[2021-04-06 20:03:37,524] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:102)
[2021-04-06 20:03:37,525] INFO Stopping task arangodb-sink-0 (org.apache.kafka.connect.runtime.Worker:836)
[2021-04-06 20:03:37,532] INFO Stopping connector arangodb-sink (org.apache.kafka.connect.runtime.Worker:387)
[2021-04-06 20:03:37,532] INFO Scheduled shutdown for WorkerConnector{id=arangodb-sink} (org.apache.kafka.connect.runtime.WorkerConnector:249)
[2021-04-06 20:03:37,533] INFO Completed shutdown for WorkerConnector{id=arangodb-sink} (org.apache.kafka.connect.runtime.WorkerConnector:269)
[2021-04-06 20:03:37,533] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:209)
[2021-04-06 20:03:37,534] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:66)
[2021-04-06 20:03:37,534] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
[2021-04-06 20:03:37,534] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
[2021-04-06 20:03:37,545] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
[2021-04-06 20:03:37,546] INFO App info kafka.connect for 127.0.1.1:8083 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)
[2021-04-06 20:03:37,546] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:230)
[2021-04-06 20:03:37,547] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:120)
[2021-04-06 20:03:37,547] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:72)

ttrading • Apr 06 '21 20:04

Can you provide the exact Kafka record that you're sending?

If it's literally { "type": "Point", "time": 1560507792000, "value": 5 }, the problem is that you are sending Kafka records without a key.
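For reference, one way to produce a keyed record to the test topic is the console producer's parse.key and key.separator properties (standard kafka-console-producer options; | is chosen as the separator only because the default separator would collide with the colons inside the JSON):

kafka-console-producer --bootstrap-server localhost:9092 --topic test --property parse.key=true --property "key.separator=|"

Each input line then takes the form key|value, e.g. with an arbitrary key mykey:

mykey|{ "type": "Point", "time": 1560507792000, "value": 5 }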

Kafka Connect ArangoDB currently relies on the Kafka record key to create documents. However, it looks like a key isn't strictly required by ArangoDB, so this is something we'll have to fix.
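To sketch the kind of fix I mean (illustrative only, not the connector's actual code; the class name and fallback scheme are hypothetical), the converter could fall back to a synthetic topic-partition-offset key instead of dereferencing a null record key:

import org.apache.kafka.connect.sink.SinkRecord;

public class DocumentKeys {
    // Derive a document key from a sink record. When the Kafka record has no
    // key (the case that currently triggers the NullPointerException), fall
    // back to a deterministic topic-partition-offset surrogate so the
    // document can still be written.
    static String getKey(final SinkRecord record) {
        final Object key = record.key();
        if (key != null) {
            return key.toString();
        }
        return record.topic() + "-" + record.kafkaPartition() + "-" + record.kafkaOffset();
    }
}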

jaredpetersen • Apr 06 '21 23:04