
Getting NullPointerException at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:188)

Open · dongxiaohe opened this issue on Dec 04 '18 · 15 comments

Hi guys,

Currently, we have a Kafka topic called maxwell which has around 10 million records of lag across 200 partitions. The Kafka Connect S3 sink keeps throwing an NPE during bootstrap (tasks go from the Running state to the Failed state within a few seconds). Can someone please help? An NPE is not a useful exception for debugging the issue.

Also, the restart API does not seem to work either. When I tried to restart the failed task through the REST API by calling curl -X POST localhost:8083/connectors/maxwell-s3-connector-test/restart, it seems it only restarted one consumer worker; the rest of the workers did not respond.
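
If I understand the Connect REST API correctly, POST /connectors/{name}/restart only restarts the connector instance, not its failed tasks; each task has its own restart endpoint, so a possible workaround is restarting them one by one:

  curl -X POST localhost:8083/connectors/maxwell-s3-connector-test/tasks/0/restart

The per-task states (and task IDs) can be checked with:

  curl localhost:8083/connectors/maxwell-s3-connector-test/status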

Here is the status topic data:

.... ect.s3.S3SinkTask.put(S3SinkTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)\n\t... 10 more\n","worker_id":"172.31.16.219:8083","generation":26} {"state":"UNASSIGNED","trace":null,"worker_id":"172.31.16.219:8083","generation":26} {"state":"RUNNING","trace":null,"worker_id":"172.31.16.219:8083","generation":26}

I was expecting it to restart all 10 consumers (tasks) instead (tasks.max = 10). The workaround we have had to use is to delete the Kafka connector and recreate it.

  • The error stack trace from status topic: {"state":"RUNNING","trace":null,"worker_id":"172.31.28.239:8083","generation":26} {"state":"RUNNING","trace":null,"worker_id":"172.31.28.239:8083","generation":26} {"state":"RUNNING","trace":null,"worker_id":"172.31.28.239:8083","generation":26} {"state":"FAILED","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.NullPointerException\n\tat io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:188)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:564)\n\t... 10 more\n","worker_id":"172.31.28.239:8083","generation":24}

  • Kafka Connect version: confluentinc/cp-kafka-connect:5.0.0

  • Kafka S3 connector config:

      "config": {
          "connector.class": "io.confluent.connect.s3.S3SinkConnector",
          "tasks.max": 10,
          "topics": "maxwell",
          "flush.size": 10000,
          "rotate.schedule.interval.ms": 1800000,
          "s3.part.size": 26214400,
          "s3.region": "us-east-1",
          "s3.bucket.name": "maxwell-connect-dev",
          "s3.credentials.provider.class": "com.amazonaws.auth.EnvironmentVariableCredentialsProvider",
          "storage.class": "io.confluent.connect.s3.storage.S3Storage",
          "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
          "format.class": "io.confluent.connect.s3.format.bytearray.ByteArrayFormat",
          "format.bytearray.separator": "\n",
          "schema.compatibility": "NONE",
          "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
          "timestamp.extractor": "Wallclock",
          "partition.duration.ms": 3600000,
          "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
          "locale": "en-US",
          "timezone": "UTC"
      }
    
  • Some clarification about the config

    • "flush.size": 10000 and "rotate.schedule.interval.ms": 1800000. S3 is not a streaming service and it has request limit, so we would like to batch more records and then commit to s3.

dongxiaohe · Dec 04 '18

I'm not sure of the immediate issue, but would it be possible to enable debug logging like I did here: https://github.com/confluentinc/kafka-connect-storage-common/issues/91

Your stack trace aligns exactly with that one, and I'd like to know why your config would cause it, since you didn't rename a topic like in the other post.
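
In case it helps anyone reproduce this, enabling debug output is usually a log4j change on the Connect worker (the file location varies by distribution; the package names are the relevant part):

  # e.g. etc/kafka/connect-log4j.properties
  log4j.logger.io.confluent.connect.s3=DEBUG
  log4j.logger.io.confluent.connect.storage=DEBUG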

OneCricketeer · Dec 06 '18

RE https://github.com/confluentinc/kafka-connect-storage-common/issues/91#issuecomment-444734003 I think you meant to respond here


I've been filling in working example stacks here over time - https://github.com/cricket007/kafka-connect-sandbox

Minio is a suitable replacement; it doesn't need to run on AWS.

OneCricketeer · Dec 06 '18

I am hitting this issue in my setup

pranavoyo · Apr 08 '19

I'm getting this same exception, but only when I try to use transforms.

Without them it works just fine, but as soon as I added any of the following to the connector configuration I got the NPE.

transforms=dropPrefix
transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex=.*
transforms.dropPrefix.replacement=prefix_$0

Or

transforms=dropPrefix
transforms.dropPrefix.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex=avro_(.*)
transforms.dropPrefix.replacement=$1

Or

transforms=KeyTopic
transforms.KeyTopic.type=io.confluent.connect.transforms.ExtractTopic$Key

Exception (it's always the same for all examples above):

[2019-11-27 16:10:18,826] ERROR WorkerSinkTask{id=avro_writer-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:558)
java.lang.NullPointerException
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:181)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
[2019-11-27 16:10:18,827] ERROR WorkerSinkTask{id=avro_writer-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
	at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:181)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
	... 10 more
[2019-11-27 16:10:18,828] ERROR WorkerSinkTask{id=avro_writer-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

msilvestre · Nov 27 '19

Hi, same issue as @msilvestre ... I'm using the io.confluent.connect.s3.S3SinkConnector and my transform config looks like this:

"transforms": "AddSuffix",
"transforms.AddSuffix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.AddSuffix.regex": ".*",
"transforms.AddSuffix.replacement": "$0_bak"

compchi · Jan 02 '20

Hi @dongxiaohe, it looks like the NPE comes from the topic partition not being found - was there any topic or topic partition manipulation done?

aakashnshah · Jan 08 '20

@aakashnshah the RegexRouter changes the topic name dynamically, but this should work here as it does in other connectors.
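
To make the failure mode concrete, here is a minimal, hypothetical sketch (topic names invented) of the mismatch between the partition open() registers and the one put() looks up after the router renames the record:

  import org.apache.kafka.common.TopicPartition;
  import org.apache.kafka.connect.sink.SinkRecord;

  public class RouterNpeDemo {
    public static void main(String[] args) {
      // A record as consumed from the source topic (schemas/key/value omitted)
      SinkRecord source = new SinkRecord("avro_orders", 0, null, null, null, null, 42L);
      // What a RegexRouter with regex "avro_(.*)" and replacement "$1" emits
      SinkRecord routed = source.newRecord("orders", 0, null, null, null, null, source.timestamp());

      // open() registered a writer for the assigned partition...
      TopicPartition registered = new TopicPartition(source.topic(), source.kafkaPartition());
      // ...but put() builds its key from the renamed record; the map lookup
      // returns null and dereferencing it is the NPE at S3SinkTask.put
      TopicPartition lookedUp = new TopicPartition(routed.topic(), routed.kafkaPartition());
      System.out.println(registered + " != " + lookedUp); // avro_orders-0 != orders-0
    }
  }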

carlos-verdes · Aug 13 '20

@aakashnshah We're having a similar issue with the NPE when modifying the topic name before it goes to the S3SinkConnector. Our expectation is that the S3SinkConnector would just use the new topic as part of the path being written to. Is it actually more nuanced than that? Is there more validation happening where it expects that topic to actually exist?

winglian · Feb 20 '21

I can confirm that we also only hit this with the transform... it seems to be specific to the s3/cloud-storage connector; the same setup with the JDBC sink connector works for routing to different tables, just as this would route to specific folders. The stack trace is pretty awful, so I'm wondering if there is actually a catch block that's obfuscating the underlying stack trace.

"transforms.ExtractTopic.type": "io.confluent.connect.transforms.ExtractTopic$Value"
"transforms.ExtractTopic.field": "eventType"
"transforms.ExtractTopic.skip.missing.or.null": true

pcfleischer · Mar 24 '21

It looks like the writers are initialized in open() based on the connector's assigned source topic partitions, but when the records are written there are no writers registered for the topic partitions derived from the records after the mapping.

I also notice they dropped the assignment model from the initialization in recent versions; perhaps the below would be a solution?

Initialization

  @Override
  public void open(Collection<TopicPartition> partitions) {
    // assignment should be empty, either because this is the initial call or because it follows
    // a call to "close".
    assignment.addAll(partitions);
    for (TopicPartition tp : assignment) {
      topicPartitionWriters.put(tp, newTopicPartitionWriter(tp));
    }
  }

Usage

  @Override
  public void put(Collection<SinkRecord> records) throws ConnectException {
    for (SinkRecord record : records) {
      String topic = record.topic();
      int partition = record.kafkaPartition();
      TopicPartition tp = new TopicPartition(topic, partition);
      // this won't handle a rename since topic partition writers map is built of sources
      topicPartitionWriters.get(tp).buffer(record);
    }
  ...
  }

Potential Solution

  @Override
  public void put(Collection<SinkRecord> records) throws ConnectException {
    for (SinkRecord record : records) {
      String topic = record.topic();
      int partition = record.kafkaPartition();
      TopicPartition tp = new TopicPartition(topic, partition);
      // will this work???
      TopicPartitionWriter writer = topicPartitionWriters.get(tp);
      if (writer == null) {
        writer = newTopicPartitionWriter(tp);
        assignment.add(tp);
        topicPartitionWriters.put(tp, writer);
      }
      writer.buffer(record);
    }
  ...
  }

pcfleischer · Mar 24 '21

I tried this solution locally and it didn't work... and it seems to be for good reason: the writer buffers writes per topic partition, so routing one topic to one topic seems possible since it's one consumer, but one-to-multiple routing (our use case) seems like a more fundamental change, since there would be multiple file buffers per consumer.

In our case, we're using filters with multiple connectors instead and changing the parent directory; a sketch is below.
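
A hedged sketch of what that workaround can look like, assuming Confluent's Filter SMT (the transform name, JSONPath condition, and directory below are hypothetical):

  "transforms": "keepOrders",
  "transforms.keepOrders.type": "io.confluent.connect.transforms.Filter$Value",
  "transforms.keepOrders.filter.condition": "$[?(@.eventType == 'order')]",
  "transforms.keepOrders.filter.type": "include",
  "topics.dir": "orders"

One connector per event type, each with its own topics.dir, avoids renaming the topic entirely, so the writer map built in open() stays valid.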

pcfleischer · Mar 29 '21

Same issue as @msilvestre and others here. A RegexRouter on the S3 sink makes it throw an NPE.

(Not sure if it's the same cause as @dongxiaohe's, though. Shouldn't this issue be split?)

mdespriee · Aug 02 '21

I've found the cause of this NPE and I'm working on a fix.

Whenever we apply transforms to a topic, the record loses its link to the source topic/partition, so we need to keep track of it ourselves.

Here: https://github.com/fmeyer/kafka-connect-storage-cloud/commit/8448b5954a6f359d8c25834739ef18f64ae125ff
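
For later readers: newer Kafka versions (3.6+, via KIP-793) expose the pre-transform coordinates on SinkRecord, which makes this kind of bookkeeping much simpler. A minimal sketch of the idea, assuming those accessors; this is not necessarily what the commit above does:

  // Key the writer lookup by the pre-transform topic partition, which is
  // what open() registered, even after a RegexRouter rename.
  @Override
  public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
      TopicPartition tp = new TopicPartition(
          record.originalTopic(), record.originalKafkaPartition());
      topicPartitionWriters.get(tp).buffer(record);
    }
  }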

fmeyer · Jan 10 '22

The PR for the transforms issue is here: #480. @pcfleischer @mdespriee, let me know if you can try it on your cases.

fmeyer · Jan 13 '22

Hi @fmeyer, I compiled your code locally and copied the .jar file to my plugins directory, then ran the connector with the RegexRouter transformation. It is working as expected; I can see the topic name getting changed.

But I'm not sure why this PR has not been merged. Are there any unknown issues if we merge these changes?

spider-123-eng · Feb 17 '23