[mongodb-cdc] Decimal type should assign precision and scale information
Search before asking
- [X] I had searched in the issues and found no similar issues.
What happened
{ "price": NumberDecimal("822") }
SeaTunnel Version
2.3.4
SeaTunnel Config
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
MongoDB-CDC {
hosts = "172.16.0.11:27017"
database = ["pms-v3"]
collection = ["pms-v3.demand_item_row"]
username = mongouser
password = "XXX"
schema = {
fields {
"_id" : string,
"ras" : string,
"price": Decimal128,
"created" : Timestamp
}
}
}
}
sink {
Console {
parallelism = 1
}
}
Running Command
docker
Error Exception
row-streaming | 2024-04-16 06:39:22,208 ERROR [o.a.s.c.s.SeaTunnel ] [main] -
row-streaming | ===============================================================================
row-streaming |
row-streaming |
row-streaming |
row-streaming | Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
row-streaming | at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
row-streaming | at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
row-streaming | at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
row-streaming | Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'MongoDB-CDC'.
row-streaming | at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:100)
row-streaming | at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:320)
row-streaming | at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:181)
row-streaming | at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:88)
row-streaming | at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:161)
row-streaming | at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:146)
row-streaming | ... 2 more
row-streaming | Caused by: java.lang.RuntimeException: Decimal type should assign precision and scale information
row-streaming | at org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil.parseDecimalType(SeaTunnelDataTypeConvertorUtil.java:228)
row-streaming | at org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil.parseComplexDataType(SeaTunnelDataTypeConvertorUtil.java:123)
row-streaming | at org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(SeaTunnelDataTypeConvertorUtil.java:52)
row-streaming | at org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser$FieldParser.parse(ReadonlyConfigParser.java:97)
row-streaming | at org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser$FieldParser.parse(ReadonlyConfigParser.java:84)
row-streaming | at org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser.parse(ReadonlyConfigParser.java:64)
row-streaming | at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.buildWithConfig(CatalogTableUtil.java:198)
row-streaming | at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:108)
row-streaming | at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:96)
row-streaming | at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.MongodbIncrementalSourceFactory.lambda$createSource$0(MongodbIncrementalSourceFactory.java:77)
row-streaming | at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:112)
row-streaming | at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:73)
row-streaming | ... 7 more
Zeta or Flink or Spark Version
No response
Java or Scala Version
No response
Screenshots
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
I'm willing to fix it, please assign it to me. thanks. @bulolo @Carl-Zhou-CN I think it's reasonable for me to finish it in 10 days.
@bulolo how about set price type to decimal(30, 8) refer this https://github.com/apache/seatunnel/blob/79bb70101a5056de7cd44197ce6a613bf5ad3a01/docs/en/connector-v2/source/FakeSource.md?plain=1#L165
@bulolo how about set price type to
decimal(30, 8)refer thishttps://github.com/apache/seatunnel/blob/79bb70101a5056de7cd44197ce6a613bf5ad3a01/docs/en/connector-v2/source/FakeSource.md?plain=1#L165
source {
MongoDB-CDC {
hosts = "mongo0:27017"
database = ["inventory"]
collection = ["inventory.products"]
username = stuser
password = stpw
schema = {
fields {
"_id" : string,
"name" : string,
"description" : string,
"weight" : decimal(30, 8), ------not support
}
}
}
}
mongocdc did not support it
I'm willing to fix it, please assign it to me. thanks. @bulolo @Carl-Zhou-CN I think it's reasonable for me to finish it in 10 days.
there is another question,
mongodb save 10 as numberInt ,and save 10.1 as double,when mongocdc ,type will not work with numberInt and double
for example,save "price" { "price":10----> NumberInt type }
{ "price":10.1----> double type } when cdc,a type error happlen ,because we had to set a type in schema.fields,if we set double, when meet int data ,err happlen,when set int,double data error happlen
@bulolo how about set price type to
decimal(30, 8)refer thishttps://github.com/apache/seatunnel/blob/79bb70101a5056de7cd44197ce6a613bf5ad3a01/docs/en/connector-v2/source/FakeSource.md?plain=1#L165
source { MongoDB-CDC { hosts = "mongo0:27017" database = ["inventory"] collection = ["inventory.products"] username = stuser password = stpw schema = { fields { "_id" : string, "name" : string, "description" : string, "weight" : decimal(30, 8), ------not support } } } }mongocdc did not support it
"decimal(30, 8)", the decimal type must be enclosed in ".
how-to-declare-type
@bulolo how about set price type to
decimal(30, 8)refer this https://github.com/apache/seatunnel/blob/79bb70101a5056de7cd44197ce6a613bf5ad3a01/docs/en/connector-v2/source/FakeSource.md?plain=1#L165source { MongoDB-CDC { hosts = "mongo0:27017" database = ["inventory"] collection = ["inventory.products"] username = stuser password = stpw schema = { fields { "_id" : string, "name" : string, "description" : string, "weight" : decimal(30, 8), ------not support } } } }mongocdc did not support it
"decimal(30, 8)", the decimal type must be enclosed in". how-to-declare-type
@bulolo Add a use case https://github.com/apache/seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf
@Carl-Zhou-CN thanks,bad example for mongo cdc docs,wrong type use case https://seatunnel.apache.org/docs/2.3.4/connector-v2/source/MongoDB-CDC
is there same type define for "mongo" and "mongo cdc"?
your use case is mongo not mongocdc
mongo cdc docs write the different type define way?
schema = {
fields {
"_id" : string,
"name" : string,
"description" : string
}
}
i will try
"weight" : "decimal(30, 8)"
@Carl-Zhou-CN thanks,bad example for mongo cdc docs,wrong type use case https://seatunnel.apache.org/docs/2.3.4/connector-v2/source/MongoDB-CDC
is there same type define for "mongo" and "mongo cdc"?
your use case is mongo not mongocdc
mongo cdc docs write the different type define way?
schema = { fields { "_id" : string, "name" : string, "description" : string } }i will try
"weight" : "decimal(30, 8)"
@bulolo Both should be similar for schemas