
Unable to convert java.math.BigDecimal to Neo4j Value

Open Tin-Nguyen opened this issue 4 years ago • 3 comments


Please help me resolve this issue if you can.

I configured the Neo4j Sink Connector to write data from a Kafka topic to Neo4j. However, the connector recently stopped working with the following error:

Caused by: org.apache.kafka.connect.errors.DataException: Exception thrown while processing field 'target'
        at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:259)
        at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convert(AbstractConverter.java:78)
        at streams.kafka.connect.sink.Neo4jService.write(Neo4jService.kt:94)
        at streams.service.StreamsSinkService.writeWithCypherTemplate(StreamsSinkService.kt:36)
        at streams.service.StreamsSinkService.writeForTopic(StreamsSinkService.kt:42)
        at streams.kafka.connect.sink.Neo4jService$writeData$2$invokeSuspend$$inlined$flatMap$lambda$1.invokeSuspend(Neo4jService.kt:121)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:32)
        at kotlinx.coroutines.DispatchedTask$DefaultImpls.run(Dispatched.kt:235)
        at kotlinx.coroutines.DispatchedContinuation.run(Dispatched.kt:81)
        at kotlinx.coroutines.scheduling.Task.run(Tasks.kt:94)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:586)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.access$runSafely(CoroutineScheduler.kt:60)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:732)
Caused by: org.neo4j.driver.v1.exceptions.ClientException: Unable to convert java.math.BigDecimal to Neo4j Value.
        at org.neo4j.driver.v1.Values.value(Values.java:130)
        at streams.kafka.connect.sink.ValueConverter.setValue(ValueConverter.kt:23)
        at streams.kafka.connect.sink.ValueConverter.setDecimalField(ValueConverter.kt:112)
        at streams.kafka.connect.sink.ValueConverter.setDecimalField(ValueConverter.kt:17)
        at com.github.jcustenborder.kafka.connect.utils.data.AbstractConverter.convertStruct(AbstractConverter.java:223)
        ... 12 more

I'm wondering how I can figure out the root cause and troubleshoot this issue, since the log is the only thing I have.
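For context, the last Caused by in the stack trace pinpoints the problem: Neo4j has no decimal property type, so the 1.x driver's Values.value() throws a ClientException when handed a java.math.BigDecimal, which is exactly what the Avro converter produces for fields declared with the org.apache.kafka.connect.data.Decimal logical type. Below is a minimal Kotlin sketch of the kind of pre-conversion that avoids the error; toNeo4jSafe is a hypothetical helper, not part of the neo4j-streams codebase.

import java.math.BigDecimal

// Hypothetical helper (not part of neo4j-streams): convert BigDecimal
// properties to a supported type before they reach
// org.neo4j.driver.v1.Values.value(), which has no mapping for BigDecimal.
fun toNeo4jSafe(value: Any?): Any? = when (value) {
    is BigDecimal -> value.toDouble() // lossy; value.toPlainString() keeps exact precision as a string
    else -> value
}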

The Avro schema below was generated automatically by the Kafka Connect JDBC connector:

{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "io.confluent.connect.avro",
  "fields": [
    {
      "name": "goal_id",
      "type": "string"
    },
    {
      "name": "publisher",
      "type": "string"
    },
    {
      "name": "lineitem_id",
      "type": "string"
    },
    {
      "name": "position_id",
      "type": {
        "type": "int",
        "connect.type": "int16"
      }
    },
    {
      "name": "indicator_name",
      "type": "string"
    },
    {
      "name": "target",
      "type": {
        "type": "bytes",
        "scale": 6,
        "precision": 64,
        "connect.version": 1,
        "connect.parameters": {
          "scale": "6"
        },
        "connect.name": "org.apache.kafka.connect.data.Decimal",
        "logicalType": "decimal"
      }
    },
    {
      "name": "result_val",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 6,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "6"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    },
    {
      "name": "tracking_value",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    },
    {
      "name": "tracking_pct",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    },
    {
      "name": "daily_target",
      "type": [
        "null",
        {
          "type": "bytes",
          "scale": 2,
          "precision": 64,
          "connect.version": 1,
          "connect.parameters": {
            "scale": "2"
          },
          "connect.name": "org.apache.kafka.connect.data.Decimal",
          "logicalType": "decimal"
        }
      ],
      "default": null
    },
    {
      "name": "is_disabled",
      "type": "boolean"
    },
    {
      "name": "metric_type",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "pg_field",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "refresh_sql_statement",
      "type": "string"
    },
    {
      "name": "last_updated",
      "type": {
        "type": "long",
        "connect.version": 1,
        "connect.name": "org.apache.kafka.connect.data.Timestamp",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "hash",
      "type": "string"
    }
  ]
}
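Every field above declared with the org.apache.kafka.connect.data.Decimal logical type (target, result_val, tracking_value, tracking_pct, daily_target) is deserialized by the Avro converter as java.math.BigDecimal, which is what the driver then rejects. If you control the source side, one way to avoid emitting Decimal values at all is the Confluent JDBC source connector's numeric.mapping option, which maps NUMERIC/DECIMAL columns to primitive Connect types where they fit. A sketch, assuming a PostgreSQL source; the connector name, connection URL, mode, and topic prefix are placeholders:

{
  "name": "jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://db-host:5432/mydb",
    "mode": "bulk",
    "numeric.mapping": "best_fit",
    "topic.prefix": "pg-"
  }
}

Note that best_fit still falls back to Decimal when a column's declared precision and scale cannot be represented exactly by a primitive type; newer connector versions also offer best_fit_eager_double, which forces such columns to FLOAT64 at the cost of precision.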

Expected Behavior (Mandatory)

The Sink Connector writes each record from the Kafka topic into Neo4j, including the decimal fields.

Actual Behavior (Mandatory)

The Sink Connector fails with org.neo4j.driver.v1.exceptions.ClientException: Unable to convert java.math.BigDecimal to Neo4j Value (full stack trace above) and stops processing the topic.


Specifications (Mandatory)

Versions

  • OS: AWS EC2
  • Neo4j: 3.5.14
  • Neo4j-Streams: 3.5.5

Tin-Nguyen avatar Apr 15 '20 19:04 Tin-Nguyen

I have the same problem. Any solution?

cuongndc avatar Oct 07 '20 14:10 cuongndc

@Tin-Nguyen @103cuong Could you please try with the latest release for Neo4j 3.5.x? You can find it here.

mroiter-larus avatar Oct 14 '20 14:10 mroiter-larus

However, if your use case is sink-only, the Kafka Connect Neo4j Sink plugin is, in general, strongly recommended over the Neo4j Streams plugin.
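For reference, here is a minimal configuration sketch for that plugin; the URI, credentials, topic name, and Cypher template are illustrative placeholders, not taken from this issue:

{
  "name": "neo4j-sink",
  "config": {
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "topics": "my-topic",
    "neo4j.server.uri": "bolt://localhost:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.topic.cypher.my-topic": "MERGE (g:Goal {id: event.goal_id}) SET g.target = event.target"
  }
}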

mroiter-larus avatar Oct 14 '20 14:10 mroiter-larus