kafka-connect-jdbc

How to use batch.size?

Open dipdeb opened this issue 8 years ago • 35 comments

Hi, I want to read only 50 records per batch through the JDBC sink, so I've set batch.size in the JDBC sink config file:

name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
batch.size=50
topics=postgres_users
connection.url=jdbc:postgresql://localhost:34771/postgres?user=foo&password=bar
file=test.sink.txt
auto.create=true

But batch.size has no effect: records get inserted into the destination database as soon as new records are inserted into the source database.

How can I insert records in batches of 50?

dipdeb avatar Sep 25 '17 10:09 dipdeb

batch.size is best effort for each set of records delivered to the sink connector:

Specifies how many records to attempt to batch together for insertion into the destination table, when possible.

ewencp avatar Sep 25 '17 16:09 ewencp

@ewencp sorry, I didn't quite understand your reply.

dipdeb avatar Sep 26 '17 07:09 dipdeb

I'm also struggling to increase the sink batch size beyond 500. It seems batch.size is not meant for this; as @ewencp mentioned, it only applies to the set of records delivered to the sink connector. I'd highly appreciate any clue about customizing the connector's batch size.

sashati avatar Nov 13 '17 08:11 sashati

@saeedsh instead of batching on the sink side, I guess it can be done on the source connector side, or you can try increasing poll.interval.ms in the source config.
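
For example, something like this on the source side (a sketch with illustrative values; poll.interval.ms and batch.max.rows are JDBC source connector properties):

# JDBC source connector (illustrative values)
poll.interval.ms=10000
batch.max.rows=500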

dipdeb avatar Dec 01 '17 06:12 dipdeb

Thanks @dipdeb. But my problem is when I already have data in Kafka and need to sink it. For example, when I have a million records in Kafka and run the JDBC sink connector, it sends them to the DB in batches of 500 each, which takes quite some time. I don't know how to increase the number of records that go to the DB per batch.

sashati avatar Dec 01 '17 07:12 sashati

The Connect worker consumes the messages from the topics, and the consumer's max.poll.records specifies the maximum number of records that will be returned by a single poll. The connector's batch.size can really never be larger than this value, since that's the maximum number of records that will be processed at one time. Try changing that consumer property for your connector.
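
For example, something like this (a sketch with illustrative values; consumer-prefixed properties in the worker config apply to all sink connectors on that worker):

# connect-distributed.properties (worker level)
consumer.max.poll.records=2000

# sink connector config
batch.size=2000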

rhauch avatar Dec 04 '17 17:12 rhauch

I am hitting this issue as well. Even with max.poll.records set as high as 4000, I can't get more than about 80-100 rows per batch.

I have 8 partitions across 3 brokers. The max.poll.records is set to 4000, the tasks.max is set to 4 ( also tried 8 ) and batch.size is set to 1000 ( also tried 500 ).

I turned on DEBUG to see if the logs turned up anything and here is an example:

[2018-03-13 15:09:59,807] DEBUG [Consumer clientId=consumer-23, groupId=connect-oracle-jdbc-sink-steam-1] Fetch READ_UNCOMMITTED at offset 2073259 for partition dmg.inode.steam.1-7 returned fetch data (error=NONE, highWaterMark=2073327, lastStableOffset = -1, logStartOffset = 673821, abortedTransactions = null, recordsSizeInBytes=22525) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-03-13 15:09:59,808] DEBUG [Consumer clientId=consumer-23, groupId=connect-oracle-jdbc-sink-steam-1] Added READ_UNCOMMITTED fetch request for partition dmg.inode.steam.1-7 at offset 2073327 to node kafka-dmg-test-01.pixar.com:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)
[2018-03-13 15:09:59,808] DEBUG [Consumer clientId=consumer-23, groupId=connect-oracle-jdbc-sink-steam-1] Sending READ_UNCOMMITTED fetch for partitions [dmg.inode.steam.1-7] to broker kafka-dmg-test-01.pixar.com:9092 (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.Fetcher)

Any help would be appreciated. These small batch sizes are causing the destination DB (Oracle) to commit too frequently, which is causing waits and IO contention.

atman9 avatar Mar 13 '18 22:03 atman9

I was able to get greater throughput by adjusting the following properties in the connect-distributed.properties:

# based on 500*180byte message size
consumer.fetch.min.bytes=900000
consumer.fetch.wait.max.ms=1000

atman9 avatar Mar 15 '18 21:03 atman9

Did the above work for anyone else? I still am stuck at 500 records per batch

emasatsugu avatar Mar 13 '19 21:03 emasatsugu

No. It seems 500 is fixed; I couldn't increase it either.

sashati avatar Mar 13 '19 21:03 sashati

Interesting discussion. I'd like to add a question: what if batch.size is set to 500 and the topic has 499 records, and it takes an hour for the 500th record to be inserted into the topic? Will the sink connector halt completely for that hour? Is there a maximum wait limit property for JDBC sinks?

imranrajjad avatar May 09 '19 07:05 imranrajjad

I have the same problem. I changed batch.size, max.poll.records, consumer.fetch.min.bytes and consumer.fetch.wait.max.ms, but it still looks like only 500 rows are written per batch; nothing changed. Is there any solution for this? My stack:

connect = confluent-kafka-connect-jdbc/5.2.2-1
Target DB = PostgreSQL
Kafka = 2.1.0

jukops avatar Jul 15 '19 07:07 jukops

I am having the same issue, and looking at the code the buffer is flushed once per SinkTask.put method call, so the limiting factor is the number of messages consumed from kafka at a time.
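
Roughly this shape (a simplified sketch, not the actual connector source; the buffer and insertBatch helper here are hypothetical):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Simplified sketch: the batch can never exceed one poll's worth of records
// because the buffer is flushed inside every put() call.
public class FlushPerPutSinkTask extends SinkTask {

    private final List<SinkRecord> buffer = new ArrayList<>();

    @Override
    public void start(Map<String, String> props) {
        // a real sink would open its JDBC connection here (omitted in this sketch)
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // 'records' holds at most max.poll.records entries from one consumer poll
        buffer.addAll(records);
        insertBatch(buffer); // hypothetical helper that issues the batched INSERTs
        buffer.clear();      // buffer is emptied every put(), so batches stay small
    }

    private void insertBatch(List<SinkRecord> batch) {
        // a real implementation would build and execute a JDBC batch statement here
    }

    @Override
    public void stop() {
        // a real sink would close its JDBC connection here
    }

    @Override
    public String version() {
        return "sketch";
    }
}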

cameronbraid avatar Jul 23 '19 09:07 cameronbraid

@sashati have you got any solution?

surajshinde427 avatar Oct 25 '19 10:10 surajshinde427

@jukops Hello. I have the same problem, but my records are saved one at a time, not in batches (https://stackoverflow.com/questions/59049762/kafka-jdbc-sink-connector-insert-values-in-batches). I tried using these options:

batch.size=500

# based on 500*3000 byte message size
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
consumer.max.poll.records=4000

These options did not give any results for me (records are still saved one at a time). How did you achieve saving in batches?

MashaFomina avatar Nov 27 '19 12:11 MashaFomina

The Connect worker consumes the messages from the topics, and the consumer's max.poll.records specifies the maximum number of records that will be returned by a single poll. The connector's batch.size can really never be larger than this value, since that's the maximum number of records that will be processed at one time. Try changing that consumer property for your connector.

@rhauch, do you know if this behavior is still correct? If so, would it be possible to update the docs to mention that 'batch.size' is essentially tightly coupled to 'max.poll.records' ?

b96m avatar Apr 15 '20 22:04 b96m

@MashaFomina Did you ever find a workaround here? As it stands, we're stuck using a custom consumer to perform this sink job. I still find it hard to believe this would only batch by creating 1000s of individual insert statements as this completely neuters what databases are amazingly efficient at. I don't have much Postgres experience, but for MySQL this would wreak havoc on the binlog and replication.

I suppose Kafka would not be aware of the exact offset in the batch that fails the insert but there will always be unaccounted for issues with garbage in. Or maybe some of the database flavors this connector supports have compatibility issues here or fail multiple inserts differently? The only thing I know for certain is that for MS SQL, MySql and Oracle this is a huge performance deal breaker.

unfrgivn avatar Apr 19 '20 18:04 unfrgivn

@unfrgivn I did not find a proper solution. I finally understood that records really are polled from the topic in batches, but they are inserted into the database in a transaction that contains a batch of single-record inserts. I investigated the write speed to the database. You can try writing your own consumer to insert records into the database.

MashaFomina avatar Apr 19 '20 20:04 MashaFomina

@MashaFomina yea I rewrote this connector with similar config as a consumer. Just asking because I'd love to use the Connect API the same way I do for my other source/sinks, but the performance is almost 30x worse with 1 insert per row, plus the massive increase in disk I/O for MySQL binlog transactions (assuming the most common setup using innodb with autocommit on).

unfrgivn avatar Apr 19 '20 22:04 unfrgivn

@jukops Hello. I have the same problem, but my records are saved one at a time, not in batches (https://stackoverflow.com/questions/59049762/kafka-jdbc-sink-connector-insert-values-in-batches). I tried using these options:

batch.size=500

# based on 500*3000 byte message size
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
consumer.max.poll.records=4000

These options did not give any results for me (records are still saved one at a time). How did you achieve saving in batches?

Are you setting these properties from the worker side or from the connector (JSON) side? One thing that I don't think is documented well is that on the connector (JSON) side you need to use the prefix consumer.override. to set consumer properties, not just consumer., and all of this AFTER setting connector.client.config.override.policy=All (or similar) on the worker side.

Regardless, I would be quite surprised if your records were actually being inserted one at a time. If you can't get over 500, make sure to follow the above. You can turn the root log level to DEBUG and search for "Flushing" as it will tell you how many records it is flushing at a time.
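
For example (a sketch; the connector name, topic and connection URL below are placeholders):

# worker side (connect-distributed.properties)
connector.client.config.override.policy=All

# connector side (JSON), using the consumer.override. prefix
{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "postgres_users",
    "connection.url": "jdbc:postgresql://localhost:5432/postgres",
    "batch.size": "3000",
    "consumer.override.max.poll.records": "3000"
  }
}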

b96m avatar Apr 20 '20 19:04 b96m

for MySQL this would wreak havoc on the binlog and replication

@unfrgivn I'd love to see the tweaks you made to the sink connector to support batch inserts. My team is running into this exact issue, and we seem to have no choice but to fork the repo and write our own custom implementation of the JDBCSinkTask

dconger avatar Jun 05 '20 15:06 dconger

@dconger we are using a nodejs implementation of a Kafka consumer to replicate the sink functionality so it's not a fork nor even a Kafka Connector

unfrgivn avatar Jun 05 '20 16:06 unfrgivn

@ewencp Does this mean that we should create a custom sink connector with an adjusted poll method that works on bigger batches? Just to clarify whether there is no solution for this yet, or whether it's on the roadmap for the near future.

ugurcancetin avatar Jun 30 '20 17:06 ugurcancetin

Is there any solution?

cobolbaby avatar Jul 28 '20 04:07 cobolbaby

So, do we have any authoritative Kafka Connect source and sink benchmark reports, such as MySQL to Kafka, so we can see what records-per-second throughput is normal?

AndreaJulianos avatar Jul 31 '20 07:07 AndreaJulianos

I've no experience with JDBC connectors, but assuming that this is generic configuration for all kinds of connectors, and based on this stackoverflow answer, did you try adding consumer.max.poll.records property to the worker configuration?

I was struggling with this while working with S3 sink connectors, where it was always configuring the max.poll.records to 500 by default. The property above fixed the issue for me.

sergeykranga avatar Aug 04 '20 13:08 sergeykranga

I've no experience with JDBC connectors, but assuming that this is generic configuration for all kinds of connectors, and based on this stackoverflow answer, did you try adding consumer.max.poll.records property to the worker configuration?

I was struggling with this while working with S3 sink connectors, where it was always configuring the max.poll.records to 500 by default. The property above fixed the issue for me.

Sorry, I'm also very interested in this topic, but I have a question: is this consumer.max.poll.records a Kafka worker configuration, or would it be defined at each connector level?

I'm asking because I noticed that my connectors wait for days (I'm in a development setup and don't have a massive load on my system) for data to be pushed from Kafka to the sink systems (both JDBC and S3). I was expecting a delay of a couple of minutes (5, 10, 20?), but not days, so I'm a bit lost: how can I force the data to be sinked without making the flush size 1?

cstmgl avatar Jan 11 '21 09:01 cstmgl

I was able to increase the batch size. As indicated above, Kafka Connect needs connector.client.config.override.policy=All enabled, and the connector needs the settings batch.size: 4000 and consumer.override.max.poll.records: 4000.

I was expecting a performance increase, but it stayed around 4k messages per second. When I increased the number of connector tasks though, I was able to get over 20k messages per second.

marcelrend avatar Apr 26 '21 13:04 marcelrend

@MrMarshall what was your setup like? # of topic partitions? # of tasks for the connector?

ykcai avatar May 12 '21 02:05 ykcai

@MrMarshall what was your setup like? # of topic partitions? # of tasks for the connector?

@ykcai I ran 3 brokers on the same machine, and the topic had 6 partitions, I believe. I ran multiple tests with a varying number of connector tasks and batch sizes. I checked the number of records in Postgres every 5 seconds and plotted the added records in the graph below. Each line shows numberoftasks_batchsize, e.g. the brown line has 10 tasks and a 10k batch size. I didn't even include running only 1 task because it took about 3x longer and that messed up the graph.

My conclusion was that batch size does not significantly affect the performance, but the number of tasks does. When using 6 partitions it doesn't help much to use 5 tasks because one of the tasks will need to take care of 2 partitions, so in that case it's better to use 2, 3 or 6 tasks.

[screenshot: performance_test graph (Excel)]

marcelrend avatar May 12 '21 06:05 marcelrend