flink-connector-jdbc

[FLINK-37809][Connector/JDBC] sqlserver limit statement support

Open Sleepy0521 opened this issue 11 months ago • 6 comments

Fix the Flink SQL JDBC LIMIT statement so that it supports SQL Server queries.

Sleepy0521 avatar Apr 02 '25 09:04 Sleepy0521

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

boring-cyborg[bot] avatar Apr 02 '25 09:04 boring-cyborg[bot]

I am not sure this is a hotfix. Could you please raise a Jira explaining:

  • what the current issue with SQL Server is
  • why this is not being solved in the dialect. I do not think that JdbcDynamicTableSource should have any reference to a specific dialect. Would it be more appropriate to use SELECT TOP for all dialects?
  • whether the SQL Server issue was causing a unit test failure. If it was not, we need to add a unit test in this area.

I applied for a Jira account but it hasn't been approved yet.

  • The problem occurs when you use the SQL Server JDBC connector to query a SQL Server table. If you don't add a LIMIT statement or a WHERE clause, the Flink connector queries the entire table, which can put I/O pressure on the SQL Server.
  • For simple queries or tests where you only need to retrieve a few hundred records, using the LIMIT statement would be more efficient. However, the Flink SQL Server connector does not support this feature. From SqlServerDialect.java:
    
    @Override
    public String getLimitClause(long limit) {
        throw new IllegalArgumentException("SqlServerDialect does not support limit clause");
    }
  • Why this is not being solved in the dialect: JdbcDynamicTableSource directly appends the LIMIT statement at the end of the query, even though SQL Server does not support this syntax. So I added an if-else block in JdbcDynamicTableSource:
    if (limit >= 0) {
        query = String.format("%s %s", query, dialect.getLimitClause(limit));
    }
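
To make the failure mode described above concrete, here is a minimal stand-alone sketch of the append-style step. It is only an illustration under assumptions: the class CurrentLimitPathSketch, the method buildQuery and the sample query are hypothetical and are not the connector's actual code; only the exception message and the String.format call are taken from the snippets quoted above.

```java
import java.util.function.LongFunction;

// Hypothetical stand-alone sketch; none of these names exist in flink-connector-jdbc.
final class CurrentLimitPathSketch {

    /** Mirrors the append-style step quoted above; limitClause stands in for dialect.getLimitClause. */
    static String buildQuery(String query, long limit, LongFunction<String> limitClause) {
        if (limit >= 0) {
            query = String.format("%s %s", query, limitClause.apply(limit));
        }
        return query;
    }

    /** Same behaviour as the SqlServerDialect.getLimitClause snippet quoted above. */
    static String sqlServerLimitClause(long limit) {
        throw new IllegalArgumentException("SqlServerDialect does not support limit clause");
    }

    public static void main(String[] args) {
        String query = "SELECT id, name FROM orders"; // sample query, assumed for illustration

        // A dialect with a trailing LIMIT clause works with plain appending.
        System.out.println(buildQuery(query, 100, n -> "LIMIT " + n));

        // A negative limit means no limit was requested, so the query is left untouched.
        System.out.println(buildQuery(query, -1, CurrentLimitPathSketch::sqlServerLimitClause));

        // With a limit requested, the SQL Server path fails before any query is sent.
        try {
            buildQuery(query, 100, CurrentLimitPathSketch::sqlServerLimitClause);
        } catch (IllegalArgumentException e) {
            System.out.println("SQL Server path fails: " + e.getMessage());
        }
    }
}
```

The sketch suggests that appending alone cannot serve SQL Server, because T-SQL places its row limit (TOP) directly after SELECT rather than at the end of the statement.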

Sleepy0521 avatar Apr 08 '25 07:04 Sleepy0521

I agree with @davidradl: JdbcDynamicTableSource should not have any direct reference to a specific dialect, and we need to add a unit test for this.

Since we're assuming that getLimitClause will always append the clause at the end of the query, I suggest introducing a new method in JdbcDialect:

String addLimitClause(String query, long limit);

We can then provide a default implementation in AbstractDialect:

@Override
public String addLimitClause(String query, long limit) {
    // Called on the dialect itself, so no separate dialect reference is needed here.
    return String.format("%s %s", query, getLimitClause(limit));
}

The change in JdbcDynamicTableSource would then be:

if (limit >= 0) {
    query = dialect.addLimitClause(query, limit);
}

For dialects like SQL Server, this method can be overridden to inject the limit clause appropriately. For example:

// SqlServerDialect
@Override
public String addLimitClause(String query, long limit) {
    return query.replace("SELECT", String.format("SELECT TOP %s", limit));
}

Optionally, the if (limit >= 0) check in JdbcDynamicTableSource could be moved into this new method for better encapsulation, but I’ve left it out here for simplicity.
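
As a rough, self-contained sketch of how this hook could fit together, the following uses trimmed-down stand-ins rather than the real connector classes. LimitDialect, AppendingDialect and TopDialect are hypothetical names, and the default is written as an interface default method purely to keep the sketch short. The SQL Server stand-in rewrites only the leading SELECT keyword, since a plain replace would also touch SELECT keywords in subqueries. It assumes the generated query starts with SELECT and does not use DISTINCT or ALL, which T-SQL expects before TOP.

```java
/** Hypothetical, trimmed-down stand-in for JdbcDialect; the real interface has many more methods. */
interface LimitDialect {
    String getLimitClause(long limit);

    /** Default behaviour proposed above: append the limit clause to the end of the query. */
    default String addLimitClause(String query, long limit) {
        return String.format("%s %s", query, getLimitClause(limit));
    }
}

/** Stand-in for a dialect that uses a trailing LIMIT clause. */
final class AppendingDialect implements LimitDialect {
    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }
}

/** Stand-in for SqlServerDialect: injects TOP after the leading SELECT only. */
final class TopDialect implements LimitDialect {
    private static final String SELECT = "SELECT";

    @Override
    public String getLimitClause(long limit) {
        throw new IllegalArgumentException("This dialect has no trailing limit clause");
    }

    @Override
    public String addLimitClause(String query, long limit) {
        String trimmed = query.trim();
        // Case-insensitive check that the statement begins with SELECT.
        if (!trimmed.regionMatches(true, 0, SELECT, 0, SELECT.length())) {
            throw new IllegalArgumentException("Expected a SELECT statement: " + query);
        }
        // Keep everything after the first SELECT unchanged, including any subqueries.
        return "SELECT TOP (" + limit + ")" + trimmed.substring(SELECT.length());
    }
}

final class AddLimitClauseDemo {
    public static void main(String[] args) {
        String query = "SELECT id FROM t WHERE id IN (SELECT id FROM u)";
        System.out.println(new AppendingDialect().addLimitClause(query, 5));
        System.out.println(new TopDialect().addLimitClause(query, 5));
    }
}
```

With this shape, the caller stays dialect-agnostic, and the two calls in main are the kind of input/output pairs a unit test for this area could assert on.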

eskabetxe avatar Apr 10 '25 09:04 eskabetxe

Good idea. I have committed again, please check it.

Sleepy0521 avatar Apr 21 '25 00:04 Sleepy0521

@Sleepy0521 I think we really need a Jira for this, as it is not a hotfix. Did you get your Jira account approved? If not, I suggest chasing on the dev list.

davidradl avatar May 07 '25 10:05 davidradl

@eskabetxe @davidradl I created a Jira issue to discuss the problem: https://issues.apache.org/jira/browse/FLINK-37809

Sleepy0521 avatar May 16 '25 02:05 Sleepy0521

This PR is being marked as stale since it has not had any activity in the last 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out to the community, contact details can be found here: https://flink.apache.org/what-is-flink/community/

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

github-actions[bot] avatar Nov 29 '25 06:11 github-actions[bot]