seatunnel
[Feature][KafkaSource] Add a customizable row separator.
Purpose of this pull request
Add a customizable row separator (row_delimiter) to the Kafka source. If a single message contains multiple rows of data, you can set the row delimiter used to split the message into rows. Example config:
env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  execution.planner = blink
  job.name = "kafka_hive_row_delimiter_test"
  execution.checkpoint.interval = 60000
}
source {
  Kafka {
    result_table_name = "kafka_table"
    schema = {
      fields {
        c1 = "string"
        c2 = "string"
        c3 = "string"
      }
    }
    format = text
    field_delimiter = ","
    topic = "test_topic_row_delimiter"
    bootstrap.servers = "kafkacluster:9092"
    kafka.max.poll.records = 500
    row_delimiter = "\\n"
  }
}
transform {
  sql {
    sql = "select c1,c2,c3,CAST(DATE_FORMAT(CAST(NOW() AS VARCHAR),'yyyyMMdd') as VARCHAR) as dt from kafka_table"
  }
}
sink {
  Console{}
  Hive {
    table_name = "db_test1.tmp_test03"
    metastore_uri = "thrift://hive:9083"
    partition_dir_expression = "${v0}"
  }
}
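To illustrate the intended behaviour, here is a minimal Python sketch of how one message value could be split into rows and then into fields. It is not the connector's actual code: the function name split_message is hypothetical, the delimiter is assumed to be a literal newline, and skipping empty rows is an assumption. How the connector interprets the escaped "\\n" from the config is not shown in this PR.

```python
from typing import List


def split_message(
    message: str,
    row_delimiter: str = "\n",
    field_delimiter: str = ",",
) -> List[List[str]]:
    """Split one message value into rows, then split each row into fields.

    Hypothetical helper for illustration only; the parameter names mirror
    the row_delimiter and field_delimiter options in the config above.
    """
    rows = []
    for line in message.split(row_delimiter):
        if not line:
            # Assumption: empty rows (e.g. after a trailing delimiter) are skipped.
            continue
        rows.append(line.split(field_delimiter))
    return rows


if __name__ == "__main__":
    # One Kafka message value carrying two rows of three fields (c1, c2, c3).
    for c1, c2, c3 in split_message("a1,b1,c1\na2,b2,c2\n"):
        print(c1, c2, c3)
```

With the config above, each resulting row would map to the c1, c2, c3 fields of kafka_table.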
Test screenshot
Check list
- [ ] Code changes are covered with tests, or do not need tests for the following reason:
- [ ] If any new Jar binary package is added in your PR, please add a License Notice according to the New License Guide
- [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
- [ ] If you are contributing the connector code, please check that the following files are updated:
- Update the change log in the connector document. For more details, refer to connector-v2
- Update plugin-mapping.properties and add new connector information in it
- Update the pom file of seatunnel-dist
- [ ] Update the release-note.
please approve ci check.
@Hisoka-X @TyrantLucifer please approve ci check.
@Hisoka-X @TyrantLucifer @hailin0 PTAL.
please approve ci, thanks.
please approve ci.
@Hisoka-X @EricJoy2048 @TyrantLucifer please approve ci.
Sorry for the late response. Let me check now!