seatunnel
[Feature][Transform-V2][SQL] Support case when clause for SQL Transform plugin (#5013)
Purpose of this pull request
Check list
- [x] Code changed are covered with tests, or it does not need tests for reason:
- [x] If any new Jar binary package adding in your PR, please add License Notice according New License Guide
- [x] If necessary, please update the documentation to describe the new feature. https://github.com/apache/seatunnel/tree/dev/docs
- [x] If you are contributing the connector code, please check that the following files are updated:
- Update change log that in connector document. For more details you can refer to connector-v2
- Update plugin-mapping.properties and add new connector information in it
- Update the pom file of seatunnel-dist
- [x] Update the release-note.
Please add e2e for this update.
OK, added. apache/seatunnel-website#253
I am sorry, I didn't express myself clearly. You need to update the document https://github.com/apache/seatunnel/blob/dev/docs/en/transform-v2/sql-functions.md
Hi @rewerma, can you help review this PR?
There's a bug: if the column is bigint, the case when cannot be matched.
@javalover123 There's a bug:
Besides int, tinyint, smallint, and bigint cannot be matched normally.
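One plausible cause of this kind of mismatch (an assumption, not confirmed in this thread) is comparing boxed numbers of different classes with `equals`: in Java, a `Long` is never `equals` to an `Integer`, even when both hold 6. The sketch below uses a hypothetical helper, `NumericMatch.matches`, which is not part of the PR, to compare CASE operands by numeric value instead. It assumes finite values; `NaN` and infinities are not handled.

```java
import java.math.BigDecimal;

/** Hypothetical helper (not from the PR): compares CASE operands by numeric value. */
final class NumericMatch {
    private NumericMatch() {}

    /** Returns true when both operands are equal; Number operands are compared by value. */
    static boolean matches(Object left, Object right) {
        if (left instanceof Number && right instanceof Number) {
            // Go through BigDecimal so Long 6, Integer 6 and Short 6 compare as equal.
            // Assumption: values are finite (NaN/Infinity would throw here).
            BigDecimal l = new BigDecimal(left.toString());
            BigDecimal r = new BigDecimal(right.toString());
            return l.compareTo(r) == 0;
        }
        return left == null ? right == null : left.equals(right);
    }

    public static void main(String[] args) {
        Object columnValue = Long.valueOf(6);  // value of a bigint column
        Object literal = Integer.valueOf(6);   // assumption: the SQL literal 6 is parsed as an Integer

        System.out.println(columnValue.equals(literal));    // false: different boxed classes
        System.out.println(matches(columnValue, literal));  // true: same numeric value
    }
}
```

Comparing by value costs a little more than `equals`, but it keeps `case age when 6 ...` independent of which integer width the source column happens to use.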
@wu-a-ge Updated, but I haven't run the e2e tests yet. https://github.com/apache/seatunnel/commit/5f5f623b87fbcde0f3aac983d9c7f04266eb7c1a
Another bug. My job config:
env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 5000
  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  FakeSource {
    result_table_name = "fake"
    parallelism = 1
    schema = {
      fields {
        name = "string"
        age = "bigint"
      }
    }
    rows = [
      {fields = ["javalover123",6], kind = INSERT}
    ]
  }
}
transform {
  sql {
    source_table_name="fake"
    result_table_name="result_0"
    query="select name,case age when 6 then 1 else age end as age from fake"
  }
}
sink {
  jdbc {
    url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    source_table_name="result_0"
    database="seatunnel"
    table="test"
    generate_sink_sql="true"
  }
}
Exception:
2023-07-30 23:38:38,172 WARN org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel-709778] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@64ec1e6
java.lang.RuntimeException: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:209) ~[classes/:?]
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:59) ~[classes/:?]
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[classes/:?]
at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[classes/:?]
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:71) ~[classes/:?]
at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:51) ~[classes/:?]
at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:52) ~[classes/:?]
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[classes/:?]
at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165) ~[classes/:?]
at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[classes/:?]
at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:613) [classes/:?]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_281]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_281]
at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_281]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_281]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_281]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_281]
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter.toExternal(AbstractJdbcRowConverter.java:145) ~[classes/:?]
at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.addToBatch(SimpleBatchStatementExecutor.java:45) ~[classes/:?]
at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.SimpleBatchStatementExecutor.addToBatch(SimpleBatchStatementExecutor.java:31) ~[classes/:?]
at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:51) ~[classes/:?]
at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:204) ~[classes/:?]
at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:169) ~[classes/:?]
at org.apache.seatunnel.connectors.seatunnel.jdbc.sink.JdbcSinkWriter.prepareCommit(JdbcSinkWriter.java:88) ~[classes/:?]
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:159) ~[classes/:?]
... 16 more
@javalover123
Thanks, can you give some help/advice? @wu-a-ge
Thanks for your contribution. My current workaround is to directly adjust the toExternal method in the AbstractJdbcRowConverter class: every cast is replaced with the parse method of the corresponding wrapper type. However, I personally believe it is still necessary to determine the actual data type in order to handle conversions between different types properly.
Update, please check, thanks. @wu-a-ge
As an alternative, the Number.*Value() methods should perform better when running on JDK 9 or above.
case TINYINT:
    statement.setByte(
            statementIndex, ((Number) row.getField(fieldIndex)).byteValue());
    break;
case SMALLINT:
    statement.setShort(
            statementIndex, ((Number) row.getField(fieldIndex)).shortValue());
    break;
case INT:
    statement.setInt(
            statementIndex, ((Number) row.getField(fieldIndex)).intValue());
    break;
case BIGINT:
    statement.setLong(
            statementIndex, ((Number) row.getField(fieldIndex)).longValue());
    break;
case FLOAT:
    statement.setFloat(
            statementIndex, ((Number) row.getField(fieldIndex)).floatValue());
    break;
case DOUBLE:
    statement.setDouble(
            statementIndex, ((Number) row.getField(fieldIndex)).doubleValue());
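The difference between a direct cast and the `Number`-based conversion in the snippet above can be reproduced outside SeaTunnel. The following is a minimal sketch; the class `NumberWidening` and its methods are hypothetical names used only for illustration, and the `Integer` field stands in for the value produced by `then 1` in the reported query.

```java
/** Hypothetical demo class (not from the PR): direct cast vs. Number-based conversion. */
final class NumberWidening {
    private NumberWidening() {}

    /** Direct cast: succeeds only when the runtime class is exactly Long. */
    static long castToLong(Object field) {
        return (Long) field;
    }

    /** Number-based conversion: accepts any boxed numeric type. */
    static long toLong(Object field) {
        return ((Number) field).longValue();
    }

    public static void main(String[] args) {
        Object field = Integer.valueOf(1); // e.g. the result of "then 1" for a bigint column

        try {
            castToLong(field);
        } catch (ClassCastException e) {
            // Same failure mode as in the reported stack trace.
            System.out.println("direct cast failed: " + e.getClass().getSimpleName());
        }

        System.out.println(toLong(field)); // 1
    }
}
```

One trade-off worth keeping in mind: the `*Value()` methods narrow silently. For example, `((Number) 300).byteValue()` yields 44 rather than raising an error, so a range check may still be needed when the runtime type is wider than the target column.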
Found a bug:
caseExpression.getElseExpression() may be null, in which case a NullPointerException is thrown.
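In standard SQL, a CASE expression without an ELSE branch evaluates to NULL when no WHEN branch matches, so a missing else expression needs a guard rather than an unconditional evaluation. The sketch below illustrates that guard with plain `java.util.function` types; `CaseWithOptionalElse` and its parameters are hypothetical and do not reflect the actual SQL Transform code or the parser's classes.

```java
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical sketch (not the plugin's code): CASE evaluation with an optional ELSE. */
final class CaseWithOptionalElse {
    private CaseWithOptionalElse() {}

    /**
     * Evaluates "CASE WHEN when THEN then [ELSE elseExpression] END" for one row.
     * elseExpression is null when the query has no ELSE branch.
     */
    static Object evaluate(
            Object[] row,
            Predicate<Object[]> when,
            Function<Object[], Object> then,
            Function<Object[], Object> elseExpression) {
        if (when.test(row)) {
            return then.apply(row);
        }
        // Guard against the missing ELSE: SQL semantics say the result is NULL.
        return elseExpression == null ? null : elseExpression.apply(row);
    }

    public static void main(String[] args) {
        Predicate<Object[]> ageIsSix = r -> ((Number) r[0]).longValue() == 6;
        Function<Object[], Object> one = r -> 1;

        System.out.println(evaluate(new Object[] {6L}, ageIsSix, one, null));      // 1
        System.out.println(evaluate(new Object[] {5L}, ageIsSix, one, null));      // null
        System.out.println(evaluate(new Object[] {5L}, ageIsSix, one, r -> r[0])); // 5
    }
}
```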
@javalover123
Update, please check, thanks. @wu-a-ge
Hello, one scenario is not supported:
case when num3>2 and num='abc' then 7 end as tinyint1
Here the when condition is a complex logical expression.
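To make the requested behaviour concrete, the compound condition in that example can be modelled as two predicates joined with AND. This is only an illustrative sketch: the class `CompoundWhen`, the row layout (index 0 for `num3`, index 1 for `num`) and the null handling are assumptions, not the plugin's implementation.

```java
import java.util.function.Predicate;

/** Hypothetical sketch (not the plugin's code): a WHEN condition built from two predicates. */
final class CompoundWhen {
    private CompoundWhen() {}

    // Assumed row layout: row[0] = num3 (numeric), row[1] = num (string).
    // A null operand makes the comparison false, mirroring SQL's "unknown is not a match".
    static final Predicate<Object[]> NUM3_GT_2 =
            row -> row[0] != null && ((Number) row[0]).longValue() > 2;
    static final Predicate<Object[]> NUM_EQ_ABC = row -> "abc".equals(row[1]);

    /** Evaluates: case when num3>2 and num='abc' then 7 end as tinyint1 */
    static Integer tinyint1(Object[] row) {
        if (NUM3_GT_2.and(NUM_EQ_ABC).test(row)) {
            return 7;
        }
        return null; // no ELSE branch, so the result is NULL
    }

    public static void main(String[] args) {
        System.out.println(tinyint1(new Object[] {3L, "abc"})); // 7
        System.out.println(tinyint1(new Object[] {3L, "xyz"})); // null
        System.out.println(tinyint1(new Object[] {1, "abc"}));  // null
    }
}
```

`Predicate.and` short-circuits, so the second comparison is skipped when the first one fails; `Predicate.or` and `negate` cover the remaining logical operators in the same way.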
Update, please check, thanks. @wu-a-ge
@rewerma PTAL.
Update, please check, thanks. @rewerma
It seems that different data types may have bugs that the current test cases do not cover. Can you add more e2e cases to cover them? Hi @wu-a-ge, do you have any suggestions?
I don't know which test cases are missing. My personal advice is to add tests for more types, for example string, int, and double, to make sure they pass:
SELECT
    column_name,
    CASE
        WHEN column_name IN (value_1, value_2, ...) THEN result_1
        ...
        ELSE test_in_int,
    CASE
        WHEN column_name IN ('value_1','value_2', ...) THEN result_1
        ...
        ELSE test_in_string
    CASE
        WHEN (column_name IN ('value_1','value_2', ...) or column_name!='value3' or column_int!=10) THEN result_1
        ...
        ELSE test_in_or_string,
    CASE
        WHEN column_name IN (value_1, value_2 ) THEN value_2
        ELSE test_in_return
    END AS output_column
FROM table_name;
@EricJoy2048 @zhilinli123 I may add them this week.
Hi, are there any updates on this PR?
Sorry, I've modified one test, but I don't have any more time to work on this recently.
I don't see any movement in this PR. Can you assign it to me temporarily? I will find time to finish it in the near future. @EricJoy2048 cc @javalover123
@zhilinli123 Sorry, I can't see the assign button. @EricJoy2048, please try.
OK, I will close this PR.