neo4j-streams
neo4j-streams copied to clipboard
Unable to convert java.math.BigDecimal to Neo4j Value
Guidelines
Please help me to resolve the issue if you can.
I config-ed Neo4j Sink Connector to write the data from a Kafka topic to Neo4j. However I'm facing an issue that makes Sink Connector not working recently
Caused by: org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'target'
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:259)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:78)
at streams.kafka.connect.sink.Neo4jService.write(Neo4jService.kt:94)
at streams.service.StreamsSinkService.writeWithCypherTemplate(StreamsSinkService.kt:36)
at streams.service.StreamsSinkService.writeForTopic(StreamsSinkService.kt:42)
at streams.kafka.connect.sink.Neo4jService$writeData$2$invokeSuspend$$inlined$flatMap$lambda$1.invokeSuspend(Neo4jService.kt:121)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)
at kotlinx.coroutines.DispatchedTask$DefaultImpls.run(Dispatched.kt:235)
at kotlinx.coroutines.DispatchedContinuation.run(Dispatched.kt:81)
at kotlinx.coroutines.scheduling.Task.run(Tasks.kt:94)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:732)
Caused by: org.neo4j.driver.v1.exceptions.ClientException: Unable to convert java.math.BigDecimal to Neo4j Value.
at org.neo4j.driver.v1.Values.value(Values.java:130)
at streams.kafka.connect.sink.ValueConverter.setValue(ValueConverter.kt:23)
at streams.kafka.connect.sink.ValueConverter.setDecimalField(ValueConverter.kt:112)
at streams.kafka.connect.sink.ValueConverter.setDecimalField(ValueConverter.kt:17)
at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:223)
... 12 more
I'm wondering how I can figure out the root cause and troubleshot the issue in this case since the log is only thing I have.
The avro schema generated automatically by Kafka Connect JDBC
{
"type": "record",
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"fields": [
{
"name": "goal_id",
"type": "string"
},
{
"name": "publisher",
"type": "string"
},
{
"name": "lineitem_id",
"type": "string"
},
{
"name": "position_id",
"type": {
"type": "int",
"connect.type": "int16"
}
},
{
"name": "indicator_name",
"type": "string"
},
{
"name": "target",
"type": {
"type": "bytes",
"scale": 6,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "6"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
},
{
"name": "result_val",
"type": [
"null",
{
"type": "bytes",
"scale": 6,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "6"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
},
{
"name": "tracking_value",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
},
{
"name": "tracking_pct",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
},
{
"name": "daily_target",
"type": [
"null",
{
"type": "bytes",
"scale": 2,
"precision": 64,
"connect.version": 1,
"connect.parameters": {
"scale": "2"
},
"connect.name": "org.apache.kafka.connect.data.Decimal",
"logicalType": "decimal"
}
],
"default": null
},
{
"name": "is_disabled",
"type": "boolean"
},
{
"name": "metric_type",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "pg_field",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "refresh_sql_statement",
"type": "string"
},
{
"name": "last_updated",
"type": {
"type": "long",
"connect.version": 1,
"connect.name": "org.apache.kafka.connect.data.Timestamp",
"logicalType": "timestamp-millis"
}
},
{
"name": "hash",
"type": "string"
}
]
}
Expected Behavior (Mandatory)
Actual Behavior (Mandatory)
How to Reproduce the Problem
Simple Dataset (where it's possibile)
//Insert here a set of Cypher statements that helps us to reproduce the problem
Steps (Mandatory)
Screenshots (where it's possibile)
Specifications (Mandatory)
Currently used versions
Versions
- OS: AWS EC2
- Neo4j: 3.5.14
- Neo4j-Streams: 3.5.5
I have the same problem. Any solution?
@Tin-Nguyen @103cuong Could you please try with the latest release for Neo4j 3.5.x? You can find it here.
However, If your use case is Sink-only, Kafka Connect Neo4j Sink plugin is strongly recommended in general, over the Neo4j Streams plugin.