
The custom Kafka data source does not support parsing JSON data that contains maps or lists

Open nimuyuhan opened this issue 6 years ago • 0 comments

I used the SQL below to create the source table:

    create source table topic1(
        uuid int,
        data var
    ) with (
        type = 'kafka',
        kafka_topic = 'flow_message_test',
        "auto.offset.reset" = latest,
        kafka_broker = '10.16.98.10:9092',
        kafka_group_id = 'test_json_parser',
        value_type = 'json'
    );

The data is as follows:

    { "uuid": 10455, "data": ["{"applyid":"1590224987","base_info":"{\"city_id\":201}}"] }

The SQL passes the grammar check, but running the job throws the exception below:

    java.lang.ClassCastException: java.util.ArrayList cannot be cast to java.lang.String
        at org.apache.flink.api.common.typeutils.base.StringSerializer.copy(StringSerializer.java:33)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:96)
        at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:47)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:738)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)
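
For anyone triaging this: the trace fails in `StringSerializer.copy`, which suggests the `data` column was typed as a string while the parsed JSON value for it was an `ArrayList`. That reading is an assumption drawn from the trace, not something confirmed in the sylph source. Below is a minimal standalone sketch, with no Flink or sylph dependency, that reproduces the same cast failure and shows one possible workaround: re-serializing list and map values to a JSON string before they reach a string column. The class `JsonFieldCoercion` and its helpers are hypothetical names, and the string escaping is deliberately simplified.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in sylph or Flink.
public class JsonFieldCoercion {

    // Returns a value that is safe to store in a column declared as a string.
    // Strings pass through; lists, maps and scalars are rendered as JSON text.
    static String toStringColumn(Object value) {
        if (value == null) {
            return null;
        }
        if (value instanceof String) {
            return (String) value;
        }
        return toJson(value);
    }

    // Tiny JSON writer for already-parsed values (Map, List, String, Number, Boolean).
    static String toJson(Object value) {
        if (value == null) {
            return "null";
        }
        if (value instanceof String) {
            return quote((String) value);
        }
        if (value instanceof Map) {
            StringBuilder sb = new StringBuilder("{");
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                if (!first) {
                    sb.append(',');
                }
                first = false;
                sb.append(quote(String.valueOf(e.getKey())))
                  .append(':')
                  .append(toJson(e.getValue()));
            }
            return sb.append('}').toString();
        }
        if (value instanceof List) {
            StringBuilder sb = new StringBuilder("[");
            boolean first = true;
            for (Object item : (List<?>) value) {
                if (!first) {
                    sb.append(',');
                }
                first = false;
                sb.append(toJson(item));
            }
            return sb.append(']').toString();
        }
        // Numbers and booleans are written as-is.
        return String.valueOf(value);
    }

    // Simplified escaping: handles only backslash and double quote.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        // Stand-in for a parsed record whose "data" field is a JSON array.
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("uuid", 10455);
        record.put("data", new ArrayList<Object>(Arrays.asList("a", "b")));

        Object raw = record.get("data");
        try {
            String asString = (String) raw; // same kind of cast that fails in the trace
            System.out.println(asString);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }

        // Workaround sketch: coerce the value before it reaches the string column.
        System.out.println(toStringColumn(raw));
    }
}
```

Whether the right fix is coercion like this or support for array and map column types in the source table definition is a design decision for the maintainers.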

nimuyuhan · Mar 21 '19 09:03