opensearch-connector-for-apache-kafka icon indicating copy to clipboard operation
opensearch-connector-for-apache-kafka copied to clipboard

got error for sending bulk request

Open parsingk opened this issue 1 year ago • 2 comments

  • opensearch-connector-for-apache-kafka-1.0.1
  • Amazon Opensearch Service version 1.2

error:

ERROR Failed to send bulk request from batch 2258 of 22 records (io.aiven.kafka.connect.opensearch.BulkProcessor:378)
ERROR Failed to bulk processing after total of 1 attempt(s) (io.aiven.kafka.connect.opensearch.RetryUtil:136)
ERROR WorkerSinkTask{id=es-sink-1-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:398)
INFO [Consumer clientId=es-sink-1-0, groupId=es-sink-1] Seeking to offset 295133 for partition log_tb-0 (org.apache.kafka.clients.consumer.KafkaConsumer:1587)
ERROR WorkerSinkTask{id=es-sink-1-0} Commit of offsets threw an unexpected exception for sequence number 2485: null (org.apache.kafka.connect.runtime.WorkerSinkTask:267)
ERROR WorkerSinkTask{id=es-sink-1-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Failed to bulk processing after total of 1 attempt(s) (org.apache.kafka.connect.runtime.WorkerSinkTask:612)
WARN WorkerSinkTask{id=es-sink-1-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask:395)
ERROR WorkerSinkTask{id=es-sink-1-0} Commit of offsets threw an unexpected exception for sequence number 2486: null (org.apache.kafka.connect.runtime.WorkerSinkTask:267)
ERROR WorkerSinkTask{id=es-sink-1-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:187)
INFO Stopping OpensearchSinkTask. (io.aiven.kafka.connect.opensearch.OpensearchSinkTask:189)
WARN Failed to flush during stop (io.aiven.kafka.connect.opensearch.OpensearchClient:166)
INFO [Consumer clientId=es-sink-1-0, groupId=es-sink-1] Revoke previously assigned partitions log_tb-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:307)
INFO [Consumer clientId=es-sink-1-0, groupId=es-sink-1] Member es-sink-1-0-f73be1a7-644a-4b4a-8890-063ce48e67f5 sending LeaveGroup request to coordinator <ip>:9092 (id: 2147483647 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1029)
INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics:668)
INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics:672)
INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics:678)
INFO App info kafka.consumer for es-sink-1-0 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)

my sink connector configuration.

name=es-sink-1
connector.class=io.aiven.kafka.connect.opensearch.OpensearchSinkConnector
task.max=1
batch.size=100
max.buffered.records=1000
max.retries=10
retry.backoff.ms=1000
max.in.flight.requests=3
flush.timeout.ms=120000
linger.ms=600000
topic.key.ignore=false
topics=log_tb
connection.url=<amazon opensearch url>
connection.username=<user>
connection.password=<password>
type.name=kafka-connect
read.timeout.ms=20000
transforms=TimestampConverter,TimestampRouter,CreateKey,Extract
transforms.TimestampConverter.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.TimestampConverter.field=rdate
transforms.TimestampConverter.format=yyyy-MM-dd HH:mm:ss
transforms.TimestampConverter.target.type=Timestamp
transforms.TimestampRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.TimestampRouter.topic.format=log-${timestamp}
transforms.TimestampRouter.timestamp.format=yyyy-MM-dd
transforms.CreateKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.CreateKey.fields=idx
transforms.Extract.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.Extract.field=idx

It is work after i start my kafka-connect service. But 1~2 days later, it is always occurred that errors, then unregistered sink connector. I wonder there is some bug with opensearch 1.2.

Can someone help me? Thank you very much in advance.

parsingk avatar Jul 11 '22 04:07 parsingk

the reason might be #97, try setting max.in.flight.requests to 1

Danny02 avatar Aug 16 '22 14:08 Danny02

@parsingk Do you have a stacktrace for this error, or can you reproduce it? There's not sufficient information in this error report to determine what's gone wrong.

gharris1727 avatar Apr 05 '23 21:04 gharris1727