[mongodb-cdc] Decimal type should assign precision and scale information

Open bulolo opened this issue 1 year ago • 8 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

What happened

{ "price": NumberDecimal("822") }

SeaTunnel Version

2.3.4

SeaTunnel Config

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  MongoDB-CDC {
    hosts = "172.16.0.11:27017"
    database = ["pms-v3"]
    collection = ["pms-v3.demand_item_row"]
    username = mongouser
    password = "XXX"
    schema = {
      fields {
        "_id" : string,
        "ras" : string,
        "price": Decimal128,
        "created" : Timestamp
      }
    }
  }
}

sink {
  Console {
    parallelism = 1
  }
}

Running Command

docker

Error Exception

row-streaming  | 2024-04-16 06:39:22,208 ERROR [o.a.s.c.s.SeaTunnel           ] [main] - 
row-streaming  | ===============================================================================
row-streaming  | 
row-streaming  | 
row-streaming  | 
row-streaming  | Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
row-streaming  |        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:199)
row-streaming  |        at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
row-streaming  |        at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:34)
row-streaming  | Caused by: org.apache.seatunnel.api.table.factory.FactoryException: ErrorCode:[API-06], ErrorDescription:[Factory initialize failed] - Unable to create a source for identifier 'MongoDB-CDC'.
row-streaming  |        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:100)
row-streaming  |        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parseSource(MultipleTableJobConfigParser.java:320)
row-streaming  |        at org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser.parse(MultipleTableJobConfigParser.java:181)
row-streaming  |        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.getLogicalDag(ClientJobExecutionEnvironment.java:88)
row-streaming  |        at org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment.execute(ClientJobExecutionEnvironment.java:161)
row-streaming  |        at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:146)
row-streaming  |        ... 2 more
row-streaming  | Caused by: java.lang.RuntimeException: Decimal type should assign precision and scale information
row-streaming  |        at org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil.parseDecimalType(SeaTunnelDataTypeConvertorUtil.java:228)
row-streaming  |        at org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil.parseComplexDataType(SeaTunnelDataTypeConvertorUtil.java:123)
row-streaming  |        at org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(SeaTunnelDataTypeConvertorUtil.java:52)
row-streaming  |        at org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser$FieldParser.parse(ReadonlyConfigParser.java:97)
row-streaming  |        at org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser$FieldParser.parse(ReadonlyConfigParser.java:84)
row-streaming  |        at org.apache.seatunnel.api.table.catalog.schema.ReadonlyConfigParser.parse(ReadonlyConfigParser.java:64)
row-streaming  |        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.buildWithConfig(CatalogTableUtil.java:198)
row-streaming  |        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:108)
row-streaming  |        at org.apache.seatunnel.api.table.catalog.CatalogTableUtil.getCatalogTables(CatalogTableUtil.java:96)
row-streaming  |        at org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.MongodbIncrementalSourceFactory.lambda$createSource$0(MongodbIncrementalSourceFactory.java:77)
row-streaming  |        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:112)
row-streaming  |        at org.apache.seatunnel.api.table.factory.FactoryUtil.createAndPrepareSource(FactoryUtil.java:73)
row-streaming  |        ... 7 more

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

bulolo avatar Apr 16 '24 06:04 bulolo

I'm willing to fix this; please assign it to me. Thanks. @bulolo @Carl-Zhou-CN I think I can reasonably finish it within 10 days.

geek-dragon avatar Apr 16 '24 14:04 geek-dragon

@bulolo How about setting the price type to decimal(30, 8)? See https://github.com/apache/seatunnel/blob/79bb70101a5056de7cd44197ce6a613bf5ad3a01/docs/en/connector-v2/source/FakeSource.md?plain=1#L165

liunaijie avatar Apr 17 '24 08:04 liunaijie

source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory"]
    collection = ["inventory.products"]
    username = stuser
    password = stpw
    schema = {
      fields {
        "_id" : string,
        "name" : string,
        "description" : string,
        "weight" : decimal(30, 8),  # not supported
      }
    }
  }
}

MongoDB-CDC does not support it.

bulolo avatar Apr 17 '24 09:04 bulolo

There is another problem.

MongoDB stores 10 as NumberInt and 10.1 as double, and MongoDB-CDC cannot handle a field that mixes NumberInt and double values.

For example, "price" may be stored as { "price": 10 } (NumberInt) in one document and as { "price": 10.1 } (double) in another.

During CDC a type error occurs, because schema.fields requires a single type per field: if we declare double, int data fails; if we declare int, double data fails.

bulolo avatar Apr 17 '24 09:04 bulolo

Use "decimal(30, 8)": the decimal type must be enclosed in double quotes ("). See how-to-declare-type.
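
A minimal sketch of that advice applied to the earlier config, assuming MongoDB-CDC accepts the same quoted type syntax as other connectors (this is not confirmed in the thread). The hosts, database, collection, and credentials are the placeholder values from the earlier example, and the precision and scale (30, 8) are only the values suggested above, not a recommendation:

```hocon
source {
  MongoDB-CDC {
    hosts = "mongo0:27017"
    database = ["inventory"]
    collection = ["inventory.products"]
    username = stuser
    password = stpw
    schema = {
      fields {
        "_id" : string,
        "name" : string,
        "description" : string,
        # Quoted so the comma inside the parentheses stays part of the type
        # string instead of being read as a field separator.
        "weight" : "decimal(30, 8)"
      }
    }
  }
}
```

The quoting matters because HOCON does not allow a comma inside an unquoted string, so an unquoted decimal(30, 8) is split at the comma before the type parser ever sees it.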

CheneyYin avatar Apr 25 '24 17:04 CheneyYin

@bulolo Here is an example for reference: https://github.com/apache/seatunnel/blob/dev/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/resources/mongodb_source_to_assert.conf

Carl-Zhou-CN avatar Apr 26 '24 01:04 Carl-Zhou-CN

@Carl-Zhou-CN Thanks. The MongoDB-CDC docs have a bad example with the wrong type usage: https://seatunnel.apache.org/docs/2.3.4/connector-v2/source/MongoDB-CDC

Are types defined the same way for "mongo" and "mongo cdc"?

Your example is for mongo, not mongocdc.

Do the mongo cdc docs define types in a different way?

schema = {
      fields {
        "_id" : string,
        "name" : string,
        "description" : string
      }
    }

I will try:

 "weight" : "decimal(30, 8)"

bulolo avatar Apr 26 '24 02:04 bulolo

@bulolo Schemas should work similarly for both.

Carl-Zhou-CN avatar Apr 26 '24 08:04 Carl-Zhou-CN