
Limit Database Query to Configurable Number of Rows

Open philipschm1tt opened this issue 3 years ago • 13 comments

Idea: Add a configurable limit on the number of rows returned by the database query.

Context: We use the JDBC source connector on a large database table with a million new rows per day, in mode timestamp+incrementing. We have had outages where the JDBC source connector fell multiple days behind and then had to be restarted. When trying to catch up, the database query became too expensive: it took so long that the connector could not catch up.

Our workaround: we set timestamp.delay.interval.ms to several days so that the query returns fewer rows and the connector processes them quickly. Then we reduce timestamp.delay.interval.ms step by step to let the connector catch up.

Ideally, we would like to limit the connector to querying only a certain number of rows, for example 100k at a time. Then the queries would stay fast even when the connector lags far behind the database table, and we would not need to adjust timestamp.delay.interval.ms.

Since the connector adds the WHERE...ORDER BY... itself, we cannot directly add the limit in the query.
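For illustration, the statement the connector builds in timestamp+incrementing mode looks roughly like the sketch below (SOMETABLE, TIMESTAMP, and INCREMENTING are placeholder names), and there is no hook for appending a row limit to it:

-- the connector binds the four ? parameters from its stored offset and the current time
SELECT * FROM SOMETABLE
WHERE TIMESTAMP < ? AND ((TIMESTAMP = ? AND INCREMENTING > ?) OR TIMESTAMP > ?)
ORDER BY TIMESTAMP, INCREMENTING ASC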

What do you think?

philipschm1tt avatar Nov 25 '20 16:11 philipschm1tt

If you add '--' to the end of your query, you can comment out the WHERE and ORDER BY which the connector appends, and then include them in the query yourself. That way you get complete control over the whole query and can add the LIMIT wherever you want. In our case, we were able to add a TOP and move the WHERE inside a subquery.
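A rough sketch of that idea with hypothetical SQL Server style names (the trailing -- swallows whatever the connector appends on the same line, and the connector still binds the ? parameters, which now land on the hand-written condition):

SELECT * FROM
  (SELECT TOP 100000 TS_COL, INC_COL, SOME_VALUE
   FROM SOMETABLE
   -- re-create the connector's timestamp+incrementing condition by hand
   WHERE TS_COL < ? AND ((TS_COL = ? AND INC_COL > ?) OR TS_COL > ?)
   ORDER BY TS_COL, INC_COL ASC) t
--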

cannontrodder avatar Jan 17 '21 00:01 cannontrodder

Thanks, @cannontrodder – I did not think of that :)

It might still be a nice feature.

philipschm1tt avatar Jan 17 '21 08:01 philipschm1tt

Thanks, @cannontrodder – I did not think of that :)

It might still be a nice feature.

I'd actually like it if we could provide/implement the full query for the connector instead of having it append SQL snippets to our queries. My suggestion is technically a SQL injection hack!!!

cannontrodder avatar Jan 18 '21 23:01 cannontrodder

The only problem with that: how are you managing the offset? You will miss records if you are not managing limit and offset in this scenario, correct? Or is that not a concern?

cmrenus avatar Feb 02 '21 17:02 cmrenus

I think it is a much-needed feature. It could be implemented by adding a property like "limit_clause" (with "top", "limit", "fetch first", etc. as possible values), and the JDBC connector would then place the limit/top clause at the appropriate position in the query.
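As a rough sketch of what such a property might produce (hypothetical behavior, not something the connector does today), the generated query would end up with a dialect-specific clause, for example:

-- MySQL / PostgreSQL style: LIMIT appended after the ORDER BY
SELECT * FROM SOMETABLE WHERE TS_COL > ? AND TS_COL < ? ORDER BY TS_COL ASC LIMIT 10000
-- Oracle 12c+ / DB2 / standard SQL: FETCH FIRST appended after the ORDER BY
SELECT * FROM SOMETABLE WHERE TS_COL > ? AND TS_COL < ? ORDER BY TS_COL ASC FETCH FIRST 10000 ROWS ONLY
-- SQL Server: TOP has to go right after SELECT rather than at the end
SELECT TOP 10000 * FROM SOMETABLE WHERE TS_COL > ? AND TS_COL < ? ORDER BY TS_COL ASC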

ussama-rafique avatar Feb 25 '21 17:02 ussama-rafique

Hi @maeh2k, @cannontrodder, @cmrenus, @ussama-rafique, @rhauch

We are having the same problem, in our case against an Oracle database, and the observed behavior is pretty much the same.

@maeh2k, your proposal is actually the workaround we are applying. But we have numerous jobs, and every config change causes a rebalance, which makes things a lot harder to manage...

As @cmrenus indicated, I also think that hacking the SQL that way could lead to offset confusion. Indeed, a similar solution was proposed for the same problem in this SO thread, but I believe it will fail for the same reason.

@rhauch, this seems to be a common and needed feature. What do you think? Do you have any suggestions?

If this somehow gets implemented, the only possible problem that comes to my mind is: how should Connect handle the situation where the actual number of records for a single incrementing criteria value is greater than the configured maximum number of rows?

To be more explicit, assume that the timestamp incrementing criteria is used and there are 100 records in the DB with the exact same timestamp value. If the configured maximum number of rows is 10, how should Connect advance the offset after polling 10 records? Simply advancing the offset would miss the remaining 90 records. But in order to consume them in the next poll, another offset column would be required, since the records cannot be told apart by the timestamp offset column alone. Or should this situation be documented as a known/expected limitation?

Best regards

simpleusr avatar Mar 01 '21 11:03 simpleusr

@simpleusr we have recently switched to @cannontrodder's "SQL injection workaround".

I have not checked the code but I would assume that this still will not miss records if you inject the limit correctly (in Oracle with a nested SELECT).

If the connector correctly stores the new offset based on the latest database row it processed, that offset would still have the same timestamp as the 100 records, but it would point to the incrementing id of the 10th record. The next query would then use that timestamp and offset and still return the remaining 90 records.
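A concrete sketch with hypothetical numbers (100 rows sharing timestamp 1234 with incrementing ids 1..100, and a limit of 10): the next poll would effectively evaluate

-- stored offset after the first limited poll: timestamp = 1234, incrementing id = 10
SELECT * FROM SOMETABLE
WHERE TIMESTAMP < ?  -- bound to the current time
  AND ((TIMESTAMP = 1234 AND INCREMENTING > 10) OR TIMESTAMP > 1234)
ORDER BY TIMESTAMP, INCREMENTING ASC

so rows 11..100 still match the (TIMESTAMP = 1234 AND INCREMENTING > 10) branch and are picked up next.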

philipschm1tt avatar Mar 01 '21 11:03 philipschm1tt

@maeh2k

Could you please provide a concrete example, since I still could not get it (maybe you can share the actual query, obfuscating the table and column names and simplifying if possible)?

Without any code change, I think you implemented something like the proposed SO answer I mentioned above.

If so, how could you pass the offset to the inner SELECT?

I could not understand the second part either.

But it would point to the incrementing id of the 10th record. The next query would then use that timestamp and offset and still return the remaining 90 records

When we are using timestamp mode and all 100 records have the exact same timestamp x, the offset in Connect will be set to x after consuming those 10, right? How would the next query pick up the 90 remaining records with timestamp value x? I assume the next query would be wrapped like "#query from task config# WHERE timestamp_column > x", and this would miss them?

Regards

simpleusr avatar Mar 01 '21 12:03 simpleusr

For Oracle, the query would look like this:

"SELECT * FROM
              (SELECT * FROM
                             (SELECT TIMESTAMP, INCREMENTING, SOME_VALUE FROM SOMETABLE)
              WHERE TIMESTAMP < ? AND ((TIMESTAMP = ? AND INCREMENTING > ?) OR CREATED_TIMESTAMP > ?)
              ORDER BY TIMESTAMP ASC, INCREMENTING ASC)
WHERE ROWNUM < 10000 --"

That is, you take the WHERE condition from the timestamp+incrementing configuration and wrap it in another SELECT to limit the number of rows in the Oracle style. The connector still supplies the query parameters. The -- at the end comments out the original WHERE condition that the connector appends.
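For completeness, once Connect appends its clause (built from the connector's timestampIncrementingWhereClause logic), the last line of the statement it prepares ends up roughly as follows, with the appended part neutralized by the comment while the four parameter values are bound to the ? placeholders in the hand-written condition:

WHERE ROWNUM < 10000 -- WHERE TIMESTAMP < ? AND ((TIMESTAMP = ? AND INCREMENTING > ?) OR TIMESTAMP > ?) ORDER BY TIMESTAMP,INCREMENTING ASC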

philipschm1tt avatar Mar 02 '21 09:03 philipschm1tt

Hi @maeh2k

Many thanks for the collaboration. Stupid me, I was missing that "--" at the end, which comments out the WHERE clause appended by Connect...

To put it another way: per my understanding, what you achieve is plugging the timestampIncrementingWhereClause logic:

 protected void timestampIncrementingWhereClause(ExpressionBuilder builder) {
    // This version combines two possible conditions. The first checks timestamp == last
    // timestamp and incrementing > last incrementing. The timestamp alone would include
    // duplicates, but adding the incrementing condition ensures no duplicates, e.g. you would
    // get only the row with id = 23:
    //  timestamp 1234, id 22 <- last
    //  timestamp 1234, id 23
    // The second check only uses the timestamp >= last timestamp. This covers everything new,
    // even if it is an update of the existing row. If we previously had:
    //  timestamp 1234, id 22 <- last
    // and then these rows were written:
    //  timestamp 1235, id 22
    //  timestamp 1236, id 23
    // We should capture both id = 22 (an update) and id = 23 (a new row)
    builder.append(" WHERE ");
    coalesceTimestampColumns(builder);
    builder.append(" < ? AND ((");
    coalesceTimestampColumns(builder);
    builder.append(" = ? AND ");
    builder.append(incrementingColumn);
    builder.append(" > ?");
    builder.append(") OR ");
    coalesceTimestampColumns(builder);
    builder.append(" > ?)");
    builder.append(" ORDER BY ");
    coalesceTimestampColumns(builder);
    builder.append(",");
    builder.append(incrementingColumn);
    builder.append(" ASC");
  }

into your row-limited query, without breaking Connect's query semantics, and this I believe is a very clever workaround.

In my case, I am using mode timestamp (as opposed to your timestamp+incrementing).

So I had to wrap my query as below:

SELECT * FROM
  (SELECT * FROM
    (SELECT TIMESTAMP, SOME_VALUE FROM SOMETABLE)
   WHERE TIMESTAMP > ? AND TIMESTAMP < ? ORDER BY TIMESTAMP ASC)
WHERE ROWNUM < 10000 --

to plug in the timestampWhereClause logic :

 protected void timestampWhereClause(ExpressionBuilder builder) {
    builder.append(" WHERE ");
    coalesceTimestampColumns(builder);
    builder.append(" > ? AND ");
    coalesceTimestampColumns(builder);
    builder.append(" < ? ORDER BY ");
    coalesceTimestampColumns(builder);
    builder.append(" ASC");
  }

And it seems to work as expected in test environment. Many thanks again...

But unfortunately, skipping records may still occur in my case if the number of rows with the exact same timestamp value is greater than the provided ROWNUM value.

You actually have two offset columns, timestamp and incrementing. In my case, unfortunately, there is no auto-increment-like column to use as the incrementing column, just the timestamp column. The timestamp + incrementing combination can uniquely identify a row in your table, but not in mine, since I only have the single column.

But anyway, given the situation, this seems to be the best option. So many thanks again :)

Lastly, I would like your thoughts on query timeouts. I opened an issue but have not received any comments/feedback so far. What do you think about that? Did you make any customizations to apply a timeout?

Best regards

simpleusr avatar Mar 02 '21 12:03 simpleusr

So why isn't batch.max.rows simply added as a LIMIT clause to the generated query? (Up until I hit this error yesterday, that was my assumption.)

java.sql.SQLTransientConnectionException: (conn=8590895) Error writing file '/rdsdbdata/tmp/MYy37HKO' (Errcode: 28 - No space left on device)

Apparently, if you query a very large table, even if you only want 10k rows at a time, it is going to kill your server because it attempts to write all the data to temp space.

crutis avatar Jan 13 '22 17:01 crutis

I don't have an Oracle setup, but how about using the "query.suffix" configuration parameter?

https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/source_config_options.html.

The documentation claims that this is the "suffix to append at the end of the generated query". Could this be an option?

For example: "query.suffix": " FETCH FIRST 10 ROWS ONLY".

luxpermanet avatar Feb 04 '22 10:02 luxpermanet

@luxpermanet it can work, but sometimes the query you need to use to pull from the table requires some nesting of subqueries, and in order to force a particular execution plan, the LIMIT or TOP directive needs to sit in a subquery, with the Connect-applied WHERE clause outside it. My workaround above helps in that case. What would be better is simply being able to completely replace the query Connect uses, once there is an offset in Connect's topics, with our own.

cannontrodder avatar Feb 04 '22 12:02 cannontrodder