Unable to ingest Protobuf Messages into Pinot table with Realtime connector
I am trying to ingest Kafka messages serialized in Protobuf format into a Pinot table using the realtime connector, but I am getting errors when querying the table. Here are the steps I followed.
Proto file ->
```proto
syntax = "proto3";

message snack {
  string name = 1;
  string timestamp = 2;
}
```
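For reference, proto3 encodes both of these string fields as length-delimited records, so the serialized bytes of a `snack` message are easy to sanity-check by hand. A minimal pure-Python sketch (no protobuf library needed; `encode_snack` is an illustrative helper, not part of any library):

```python
# Hand-encode a `snack` message per the proto3 wire format.
# Strings use wire type 2 (length-delimited): the tag byte is
# (field_number << 3) | 2, followed by a varint length and the UTF-8 bytes.

def encode_string_field(field_no: int, value: str) -> bytes:
    data = value.encode("utf-8")
    assert len(data) < 128  # a single-byte varint length is enough here
    return bytes([(field_no << 3) | 2, len(data)]) + data

def encode_snack(name: str, timestamp: str) -> bytes:
    # name = field 1, timestamp = field 2, matching the .proto above
    return encode_string_field(1, name) + encode_string_field(2, timestamp)

payload = encode_snack("test", "1234")
print(payload.hex())  # 0a0474657374120431323334
```

This is the exact byte sequence a conforming Protobuf serializer produces for `{"name":"test","timestamp":"1234"}`.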
Descriptor Generation ->
```sh
protoc --include_imports --descriptor_set_out=output.desc schema.proto
```
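One way to confirm that `output.desc` actually contains the expected message (and the exact name and casing to use for `protoClassName`) is to list the message names in the descriptor set. A pure-Python sketch that walks the `FileDescriptorSet` wire format directly, assuming no protobuf library is installed (field numbers taken from `descriptor.proto`):

```python
# Minimal walker for the protobuf wire format, used to list the top-level
# message names inside a compiled FileDescriptorSet (e.g. output.desc).
# Field numbers from descriptor.proto: FileDescriptorSet.file = 1,
# FileDescriptorProto.message_type = 4, DescriptorProto.name = 1.

def read_varint(buf: bytes, i: int):
    """Decode a base-128 varint starting at index i; return (value, next_i)."""
    result = shift = 0
    while True:
        b = buf[i]
        i += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result, i
        shift += 7

def iter_fields(buf: bytes):
    """Yield (field_number, payload) pairs for one serialized message."""
    i = 0
    while i < len(buf):
        key, i = read_varint(buf, i)
        field_no, wire_type = key >> 3, key & 0x07
        if wire_type == 2:        # length-delimited: strings, sub-messages
            length, i = read_varint(buf, i)
            yield field_no, buf[i:i + length]
            i += length
        elif wire_type == 0:      # varint: ints, enums, bools
            value, i = read_varint(buf, i)
            yield field_no, value
        elif wire_type == 5:      # fixed32: skip
            i += 4
        elif wire_type == 1:      # fixed64: skip
            i += 8
        else:
            return                # unknown wire type; stop scanning

def message_names(descriptor_set: bytes):
    """Return the top-level message names declared in a FileDescriptorSet."""
    names = []
    for field_no, file_proto in iter_fields(descriptor_set):
        if field_no != 1:         # FileDescriptorSet.file
            continue
        for f, msg in iter_fields(file_proto):
            if f != 4:            # FileDescriptorProto.message_type
                continue
            for g, value in iter_fields(msg):
                if g == 1:        # DescriptorProto.name
                    names.append(value.decode("utf-8"))
    return names
```

Running `message_names(open("/tmp/output.desc", "rb").read())` on the descriptor generated from the proto file above should report `snack`.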
After generating the descriptor file, I copied it to the pinot-controller, pinot-server, and pinot-broker containers under /tmp.
Pinot Connector configuration ->
```json
{
  "tableName": "transcriptprotobufdescnewprotofile",
  "tableType": "REALTIME",
  "tenants": {},
  "segmentsConfig": {
    "replicasPerPartition": "1",
    "schemaName": "transcriptprotobufdescnewprotofile",
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "lowlevel",
      "stream.kafka.topic.name": "transcriptprotobufdescnewprotofile",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder",
      "stream.kafka.decoder.prop.descriptorFile": "file:///tmp/output.desc",
      "stream.kafka.decoder.prop.protoClassName": "Snack",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "kafka:9092",
      "realtime.segment.flush.threshold.rows": "0",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "50M",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }
  },
  "metadata": {
    "customConfigs": {}
  }
}
```
Pinot Schema ->
```json
{
  "schemaName": "transcriptprotobufdescnewprotofile",
  "dimensionFieldSpecs": [
    {
      "name": "name",
      "dataType": "STRING"
    }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestamp",
      "dataType": "STRING",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```
Messages are being produced to the Kafka topic with the following command:

```sh
kafka-protobuf-console-producer --bootstrap-server kafka:9092 --topic transcriptprotobufdescnewprotofile --property schema.registry.url=http://localhost:8088 --property value.schema='syntax = "proto3"; message snack { string name = 1; string timestamp = 2;}'
```
Actual Messages ->
```json
{"name":"test","timestamp":"1234"}
{"name":"test","timestamp":"123334"}
```
Now, in the query console of the Pinot dashboard, I see the following error for the newly created table:

Query failed with exceptions. Please toggle the switch to view details.

```
Error Code: 305
null:
1 segments unavailable: [transcriptprotobufdescnewprotofile__0__0__20240613T0645Z]
```
Upon checking the pinot-controller container logs, I see the following errors:

```
2024-06-13 12:15:19 2024/06/13 06:45:19.408 ERROR [MessageGenerationPhase] [HelixController-pipeline-default-PinotCluster-(5b9175c5_DEFAULT)] Event 5b9175c5_DEFAULT : Unable to find a next state for resource: transcriptprotobufdescnewprotofile_REALTIME partition: transcriptprotobufdescnewprotofile__0__0__20240613T0645Z from stateModelDefinitionclass org.apache.helix.model.StateModelDefinition from:ERROR to:CONSUMING
```
The Pinot Swagger API shows an error/unhealthy status for this new table:
```
"errorMessage": "Caught exception while adding CONSUMING segment",
"stackTrace": "org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 5 attempts\n\tat org.apache.pinot.spi.utils.retry.BaseRetryPolicy.attempt(BaseRetryPolicy.java:65)\n\tat org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.<init>(RealtimeSegmentDataManager.java:1546)\n\tat org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.doAddConsumingSegment(RealtimeTableDataManager.java:494)\n\tat org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addConsumingSegment(RealtimeTableDataManager.java:439)\n\tat org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addConsumingSegment(HelixInstanceDataManager.java:282)\n\tat org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:81)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:350)\n\tat org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:278)\n\tat org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97)\n\tat org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n"
}
}
}
}
],
"serverDebugInfos": [],
"brokerDebugInfos": [],
"tableSize": {
  "reportedSize": "-1 bytes",
  "estimatedSize": "-1 bytes"
},
"ingestionStatus": {
  "ingestionState": "UNHEALTHY",
  "errorMessage": "Did not get any response from servers for segment: transcriptprotobufdescnewprotofile__0__0__20240613T0645Z"
}
```
What am I missing?
cc @swaminathanmanish @KKcorps
@rahulgulati89 Were you able to solve this?
@rahulgulati89
For me, ingesting Protobuf with a descriptor file worked with:

```json
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder",
```

instead of:

```json
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.protobuf.KafkaConfluentSchemaRegistryProtoBufMessageDecoder",
```
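The two decoders expect differently framed payloads: the Confluent schema-registry decoder assumes each Kafka value starts with Confluent's wire framing (magic byte, 4-byte schema id, message-index list) and needs a reachable schema registry, while `ProtoBufMessageDecoder` decodes the value as raw Protobuf bytes using only the descriptor file. A minimal sketch of that framing, based on Confluent's documented wire format (`strip_confluent_header` is an illustrative helper, not a Pinot or Confluent API):

```python
import struct

def strip_confluent_header(framed: bytes) -> bytes:
    """Remove Confluent schema-registry framing, returning raw Protobuf bytes.

    Layout: 1 magic byte (0x00), a 4-byte big-endian schema id, then (for
    Protobuf payloads only) a varint-encoded list of message indexes;
    for the first top-level message this list is the single byte 0x00.
    """
    if framed[0] != 0x00:
        raise ValueError("payload is not Confluent-framed")
    schema_id = struct.unpack(">I", framed[1:5])[0]  # id assigned by the registry
    if framed[5] != 0x00:
        raise ValueError("only the single-message index list [0] is handled here")
    return framed[6:]

# A raw snack message {"name": "test"} framed as if registered with schema id 1:
raw = b"\x0a\x04test"
framed = b"\x00" + (1).to_bytes(4, "big") + b"\x00" + raw
assert strip_confluent_header(framed) == raw
```

So the decoder choice has to match how the topic was produced: values written through the schema-registry serializers carry this header, while raw-Protobuf producers do not.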