[Feature][Transform-V2][SQL] Support case when clause for SQL Transform plugin (#5013)

Open · javalover123 opened this pull request

Purpose of this pull request

Check list

  • [x] Code changes are covered by tests, or they do not need tests for the following reason:
  • [x] If your PR adds any new JAR binary package, please add a License Notice according to the New License Guide
  • [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
  • [x] If you are contributing the connector code, please check that the following files are updated:
    1. Update the change log in the connector document. For more details, refer to connector-v2.
    2. Update plugin-mapping.properties and add the new connector information to it
    3. Update the pom file of seatunnel-dist
  • [x] Update the release-note.

javalover123, Jul 04 '23

Please add e2e tests for this update.

EricJoy2048, Jul 11 '23

OK, added. apache/seatunnel-website#253

I'm sorry, I didn't express myself clearly. You need to update the document https://github.com/apache/seatunnel/blob/dev/docs/en/transform-v2/sql-functions.md

EricJoy2048, Jul 13 '23

Hi @rewerma, can you help review this PR?

EricJoy2048, Jul 13 '23

There's a bug: if the column is bigint, the CASE WHEN cannot be matched.

wu-a-ge, Jul 27 '23

@javalover123 There's a bug.

wu-a-ge, Jul 27 '23

Apart from int, none of tinyint, smallint, or bigint match correctly.
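One plausible cause of this symptom, not confirmed in the thread, is that the column value arrives as a boxed Long (or Short, Byte) while the WHEN literal is parsed as an Integer, and Object.equals() returns false whenever the two boxed classes differ. The sketch below shows that behaviour next to a value-based comparison; CaseValueMatcher and its methods are hypothetical names, not SeaTunnel APIs.

```java
import java.util.Objects;

// Hypothetical helper, not part of SeaTunnel: illustrates why a bigint column
// value can fail to match an int literal in "CASE age WHEN 6 ...".
final class CaseValueMatcher {

    private CaseValueMatcher() {}

    // Naive match: Object.equals() also requires the same runtime class,
    // so Long.valueOf(6L).equals(Integer.valueOf(6)) is false.
    static boolean naiveMatches(Object columnValue, Object literal) {
        return Objects.equals(columnValue, literal);
    }

    // Value-based match for numbers; everything else falls back to equals().
    static boolean matches(Object columnValue, Object literal) {
        if (columnValue instanceof Number && literal instanceof Number) {
            Number a = (Number) columnValue;
            Number b = (Number) literal;
            if (isIntegral(a) && isIntegral(b)) {
                return a.longValue() == b.longValue();
            }
            // At least one side is not an integral wrapper: compare as double
            // (this may lose precision for very large values).
            return Double.compare(a.doubleValue(), b.doubleValue()) == 0;
        }
        return Objects.equals(columnValue, literal);
    }

    private static boolean isIntegral(Number n) {
        return n instanceof Byte || n instanceof Short
                || n instanceof Integer || n instanceof Long;
    }
}
```

A production fix might compare through BigDecimal instead of double to stay exact for every numeric type.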

wu-a-ge, Jul 27 '23

@wu-a-ge Updated, but I haven't run the e2e tests yet. https://github.com/apache/seatunnel/commit/5f5f623b87fbcde0f3aac983d9c7f04266eb7c1a

javalover123, Jul 27 '23

Another bug. My job config:

env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 5000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
  FakeSource {
    result_table_name = "fake"
    parallelism = 1
    schema = {
      fields {
        name = "string"
        age = "bigint"
      }
    }
    rows = [
      {fields = ["javalover123",6], kind = INSERT}
    ]
  }
}

transform {
  sql {
    source_table_name="fake"
    result_table_name="result_0"
    query="select name,case age when 6 then 1 else age end as age from fake"
  }
}

sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    source_table_name="result_0"
    database="seatunnel"
    table="test"
    generate_sink_sql="true"
  }
}

Exception:

2023-07-30 23:38:38,172 WARN  org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-709778] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@64ec1e6
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:209) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:59) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:71) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:51) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:52) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[classes/:?]
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613) [classes/:?]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_281]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_281]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_281]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_281]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_281]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_281]
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter.toExternal(AbstractJdbcRowConverter.java:145) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.addToBatch(SimpleBatchStatementExecutor.java:45) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.addToBatch(SimpleBatchStatementExecutor.java:31) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:51) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:204) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:169) ~[classes/:?]
	at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.prepareCommit(JdbcSinkWriter.java:88) ~[classes/:?]
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:159) ~[classes/:?]
	... 16 more
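The stack trace points at a direct cast in AbstractJdbcRowConverter.toExternal. Assuming the CASE branch `then 1` yields a boxed Integer while the `else age` branch yields the original Long, and the output column is declared bigint so the sink expects a Long, the failure can be reproduced in isolation. BigintCastDemo and its methods are hypothetical, written only for this illustration.

```java
// Hypothetical reproduction, not SeaTunnel code: a field typed BIGINT in the
// schema but holding an Integer at runtime.
final class BigintCastDemo {

    private BigintCastDemo() {}

    // Assumed evaluation of "case age when 6 then 1 else age end".
    // The (Object) casts keep the ternary from unboxing both sides to long.
    static Object caseResult(long age) {
        return age == 6 ? (Object) Integer.valueOf(1) : (Object) Long.valueOf(age);
    }

    // Mirrors a BIGINT branch that does "(Long) row.getField(i)".
    static boolean directCastFails(Object field) {
        try {
            Long ignored = (Long) field;
            return false;
        } catch (ClassCastException e) {
            return true; // java.lang.Integer cannot be cast to java.lang.Long
        }
    }
}
```

Under this assumption only rows that hit the THEN branch fail, which fits a job that breaks on the single FakeSource row with age 6.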

@javalover123

wu-a-ge, Jul 30 '23

Thanks. @wu-a-ge, could you give me some help or advice?

javalover123, Jul 31 '23

Thanks for your contribution. My current workaround is to modify the toExternal method in the AbstractJdbcRowConverter class directly, so that every cast is replaced by the parse method of the corresponding wrapper type. Still, I personally believe it is necessary to check the actual data type in order to handle conversions between different types properly.
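A minimal sketch of the parse-based workaround, interpreting the description above (the actual change is not reproduced in the thread): the field is converted to its string form and parsed with the target wrapper type's parse method, so any boxed integral type is accepted. ParseBasedConversion is a hypothetical helper, not SeaTunnel code.

```java
// Hypothetical helper illustrating the parse-based workaround; not SeaTunnel code.
final class ParseBasedConversion {

    private ParseBasedConversion() {}

    // Accepts Byte, Short, Integer and Long inputs because their string forms
    // are plain integer literals.
    static long toLong(Object field) {
        return Long.parseLong(String.valueOf(field));
    }

    static short toShort(Object field) {
        return Short.parseShort(String.valueOf(field));
    }

    static double toDouble(Object field) {
        return Double.parseDouble(String.valueOf(field));
    }
}
```

This ignores the source type, which is the concern raised here: for example, toLong(1.5) throws NumberFormatException because "1.5" is not an integer literal, and every call pays for a string round trip.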

wu-a-ge, Jul 31 '23

Updated, please check. Thanks, @wu-a-ge. I took a different approach: numeric fields are now read through the Number.*Value() methods, which should perform better when running on JDK 9 or above.

                case TINYINT:
                    statement.setByte(
                            statementIndex, ((Number) row.getField(fieldIndex)).byteValue());
                    break;
                case SMALLINT:
                    statement.setShort(
                            statementIndex, ((Number) row.getField(fieldIndex)).shortValue());
                    break;
                case INT:
                    statement.setInt(
                            statementIndex, ((Number) row.getField(fieldIndex)).intValue());
                    break;
                case BIGINT:
                    statement.setLong(
                            statementIndex, ((Number) row.getField(fieldIndex)).longValue());
                    break;
                case FLOAT:
                    statement.setFloat(
                            statementIndex, ((Number) row.getField(fieldIndex)).floatValue());
                    break;
                case DOUBLE:
                    statement.setDouble(
                            statementIndex, ((Number) row.getField(fieldIndex)).doubleValue());
                    break;
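The same idea as a standalone sketch: every numeric field is read through Number and converted with the matching *Value() method, so the boxed class produced upstream no longer has to equal the declared column type. The NumericType enum and normalize() are hypothetical stand-ins for SeaTunnel's SqlType switch and the PreparedStatement setters.

```java
// Hypothetical stand-in for the numeric cases of the toExternal switch.
final class NumberNormalizer {

    enum NumericType { TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE }

    private NumberNormalizer() {}

    // Returns the boxed value the matching PreparedStatement setter would receive.
    static Object normalize(NumericType type, Object field) {
        Number n = (Number) field; // non-numeric values still fail fast here
        switch (type) {
            case TINYINT:
                return n.byteValue();
            case SMALLINT:
                return n.shortValue();
            case INT:
                return n.intValue();
            case BIGINT:
                return n.longValue();
            case FLOAT:
                return n.floatValue();
            case DOUBLE:
                return n.doubleValue();
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }
}
```

Narrowing conversions such as byteValue() silently wrap out-of-range values (300 becomes 44), so a range check would still be needed wherever that matters.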

javalover123, Aug 01 '23

Found a bug: caseExpression.getElseExpression() may be null, so a NullPointerException will be thrown. @javalover123
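In SQL, a CASE with no ELSE evaluates to NULL when no WHEN matches, so a missing ELSE has to be checked before it is evaluated. A minimal sketch of simple-CASE evaluation with that check; SimpleCaseEvaluator, WhenBranch and the Supplier-based expressions are hypothetical and are not how the SQL transform models JSqlParser expressions.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical evaluator for CASE operand WHEN v THEN r ... [ELSE e] END.
// elseExpression is null when the query has no ELSE, mirroring
// getElseExpression() returning null.
final class SimpleCaseEvaluator {

    static final class WhenBranch {
        final Object whenValue;
        final Supplier<Object> thenExpression;

        WhenBranch(Object whenValue, Supplier<Object> thenExpression) {
            this.whenValue = whenValue;
            this.thenExpression = thenExpression;
        }
    }

    private SimpleCaseEvaluator() {}

    static Object evaluate(Object operand, List<WhenBranch> branches,
                           Supplier<Object> elseExpression) {
        for (WhenBranch branch : branches) {
            // SQL semantics: a NULL operand never matches any WHEN value.
            if (operand != null && operand.equals(branch.whenValue)) {
                return branch.thenExpression.get();
            }
        }
        // No WHEN matched: without an ELSE the result is NULL, so check
        // for a missing ELSE instead of dereferencing it.
        return elseExpression == null ? null : elseExpression.get();
    }
}
```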

wu-a-ge, Aug 01 '23

Updated, please check. Thanks, @wu-a-ge.

javalover123, Aug 01 '23

Hello, one scenario is not supported:

case when num3>2 and num='abc' then 7 end as tinyint1

that is, a CASE WHEN whose condition is a compound logical expression.
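A searched CASE like this needs the whole WHEN condition evaluated as one boolean expression rather than as a single comparison. A minimal sketch of the semantics for this exact query, assuming rows modelled as Map<String, Object>, which is not how the SQL transform represents them; SearchedCaseDemo and its members are hypothetical.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical sketch of "case when num3>2 and num='abc' then 7 end as tinyint1".
final class SearchedCaseDemo {

    // WHEN num3 > 2 AND num = 'abc': both sub-conditions must hold.
    static final Predicate<Map<String, Object>> CONDITION =
            row -> greaterThan(row.get("num3"), 2) && "abc".equals(row.get("num"));

    private SearchedCaseDemo() {}

    // Reads any numeric type through Number; a NULL or non-numeric value
    // makes the comparison false here.
    private static boolean greaterThan(Object value, double bound) {
        return value instanceof Number && ((Number) value).doubleValue() > bound;
    }

    // THEN 7 with no ELSE: rows that fail the condition get NULL.
    static Object tinyint1(Map<String, Object> row) {
        return CONDITION.test(row) ? Integer.valueOf(7) : null;
    }
}
```

SQL would treat a NULL num3 as UNKNOWN rather than false, but a WHEN clause skips its branch in both cases, so the simplification does not change the result of this query.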

wu-a-ge, Aug 07 '23

Updated, please check. Thanks, @wu-a-ge.

javalover123, Aug 08 '23

@rewerma PTAL.

TyrantLucifer, Aug 11 '23

Updated, please check. Thanks, @rewerma.

javalover123, Aug 16 '23

It seems that different data types may still have bugs that the current test cases don't cover. Can you add more e2e cases to cover them? Hi @wu-a-ge, do you have any suggestions?

EricJoy2048, Sep 11 '23

I don't know exactly which test cases are missing. My personal advice is to add tests for more types to make sure they pass, for example string, int, and double:

SELECT
  column_name,
  CASE
    WHEN column_name IN (value_1, value_2, ...) THEN result_1
    ...
    ELSE test_in_int
  END,
  CASE
    WHEN column_name IN ('value_1', 'value_2', ...) THEN result_1
    ...
    ELSE test_in_string
  END,
  CASE
    WHEN (column_name IN ('value_1', 'value_2', ...) OR column_name != 'value3' OR column_int != 10) THEN result_1
    ...
    ELSE test_in_or_string
  END,
  CASE
    WHEN column_name IN (value_1, value_2) THEN value_2
    ELSE test_in_return
  END AS output_column
FROM table_name;

zhilinli123, Sep 12 '23

@EricJoy2048 @zhilinli123 I may add them this week.

javalover123, Sep 14 '23

Hi, are there any updates on this PR?

EricJoy2048, Oct 18 '23

Sorry, I've modified one test, but I haven't had time to work on this recently.

javalover123, Oct 18 '23

I don't see any progress on this PR. Can you temporarily assign it to me? I will find time to finish it in the near future. @EricJoy2048 cc @javalover123

zhilinli123, Nov 27 '23

@zhilinli123 Sorry, I can't see the assign button. @EricJoy2048, please try.

javalover123, Nov 27 '23

OK, I will close this PR.

EricJoy2048, Dec 08 '23