Pinot stops consuming from Kinesis Data Stream
Pinot realtime ingestion from a Kinesis data stream works as expected for some time but eventually stops consuming. The server logs show a connection-refused error. Is this due to the issue mentioned in PR #9863?
2022/11/29 16:10:58.538 INFO [ControllerLeaderLocator] [testtable__0__0__20221129T1419Z] Millis since last controller cache value invalidate 18567 is less than allowed frequency 30000. Skipping invalidate.
2022/11/29 16:10:58.538 INFO [LLRealtimeSegmentDataManager_testtable__0__0__20221129T1419Z] [testtable__0__0__20221129T1419Z] Could not commit segment. Retrying after hold
2022/11/29 16:11:00.997 INFO [HttpClient] [testtable__2__0__20221129T1419Z] Sending request: http://SERVER_IP_ADDRESS.ec2.internal:9000/segmentConsumed?reason=forceCommitMessageReceived&streamPartitionMsgOffset=%7B%22shardId-000000000002%22%3A%2249635630746998378075227930204125252036295126557430644770%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__2__0__20221129T1419Z&rowCount=5136&memoryUsedBytes=1828440 to controller: SERVER_IP_ADDRESS.ec2.internal, version: Unknown
2022/11/29 16:11:00.997 INFO [ServerSegmentCompletionProtocolHandler] [testtable__2__0__20221129T1419Z] Controller response {"offset":-1,"isSplitCommitType":false,"streamPartitionMsgOffset":"{\"shardId-000000000002\":\"49635630746998378075227930204125252036295126557430644770\"}","buildTimeSec":-1,"status":"HOLD"} for http://SERVER_IP_ADDRESS.ec2.internal:9000/segmentConsumed?reason=forceCommitMessageReceived&streamPartitionMsgOffset=%7B%22shardId-000000000002%22%3A%2249635630746998378075227930204125252036295126557430644770%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__2__0__20221129T1419Z&rowCount=5136&memoryUsedBytes=1828440
2022/11/29 16:11:01.474 INFO [HttpClient] [testtable__1__0__20221129T1419Z] Sending request: http://SERVER_IP_ADDRESS.ec2.internal:9000/segmentConsumed?reason=forceCommitMessageReceived&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249635630746976077330029399572174273870489207002061864978%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__1__0__20221129T1419Z&memoryUsedBytes=1115986 to controller: SERVER_IP_ADDRESS.ec2.internal, version: Unknown
2022/11/29 16:11:01.474 INFO [ServerSegmentCompletionProtocolHandler] [testtable__1__0__20221129T1419Z] Controller response {"offset":-1,"isSplitCommitType":true,"controllerVipUrl":"http://localhost:9000","streamPartitionMsgOffset":"{\"shardId-000000000001\":\"49635630746976077330029399572174273870489207002061864978\"}","buildTimeSec":126,"status":"COMMIT"} for http://SERVER_IP_ADDRESS.ec2.internal:9000/segmentConsumed?reason=forceCommitMessageReceived&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249635630746976077330029399572174273870489207002061864978%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__1__0__20221129T1419Z&memoryUsedBytes=1115986
2022/11/29 16:11:01.476 INFO [HttpClient] [testtable__1__0__20221129T1419Z] Sending request: http://SERVER_IP_ADDRESS.ec2.internal:9000/segmentCommitStart?segmentSizeBytes=5072&buildTimeMillis=5&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249635630746976077330029399572174273870489207002061864978%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__1__0__20221129T1419Z&memoryUsedBytes=1115986 to controller: SERVER_IP_ADDRESS.ec2.internal, version: Unknown
2022/11/29 16:11:01.476 INFO [ServerSegmentCompletionProtocolHandler] [testtable__1__0__20221129T1419Z] Controller response {"offset":-1,"isSplitCommitType":false,"streamPartitionMsgOffset":null,"buildTimeSec":-1,"status":"COMMIT_CONTINUE"} for http://SERVER_IP_ADDRESS.ec2.internal:9000/segmentCommitStart?segmentSizeBytes=5072&buildTimeMillis=5&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249635630746976077330029399572174273870489207002061864978%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__1__0__20221129T1419Z&memoryUsedBytes=1115986
2022/11/29 16:11:01.476 ERROR [LLRealtimeSegmentDataManager_testtable__1__0__20221129T1419Z] [testtable__1__0__20221129T1419Z] Could not send request http://localhost:9000/segmentUpload?segmentSizeBytes=5072&buildTimeMillis=5&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249635630746976077330029399572174273870489207002061864978%22%7D&instance=Server_SERVER_IP_ADDRESS.ec2.internal_8098&offset=-1&name=testtable__1__0__20221129T1419Z&memoryUsedBytes=1115986
org.apache.pinot.shaded.org.apache.http.conn.HttpHostConnectException: Connect to localhost:9000 [localhost/127.0.0.1] failed: Connection refused (Connection refused)
at org.apache.pinot.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:156) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.shaded.org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:376) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.shaded.org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:393) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.shaded.org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:236) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.shaded.org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:186) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.shaded.org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.shaded.org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:110) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.shaded.org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.shaded.org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.common.utils.http.HttpClient.sendRequest(HttpClient.java:276) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.common.utils.FileUploadDownloadClient.uploadSegment(FileUploadDownloadClient.java:604) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.core.data.manager.realtime.Server2ControllerSegmentUploader.uploadSegmentToController(Server2ControllerSegmentUploader.java:76) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.core.data.manager.realtime.Server2ControllerSegmentUploader.uploadSegment(Server2ControllerSegmentUploader.java:61) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.core.data.manager.realtime.SplitSegmentCommitter.uploadSegment(SplitSegmentCommitter.java:78) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.core.data.manager.realtime.SplitSegmentCommitter.commit(SplitSegmentCommitter.java:59) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.commit(LLRealtimeSegmentDataManager.java:1025) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.commitSegment(LLRealtimeSegmentDataManager.java:995) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager$PartitionConsumer.run(LLRealtimeSegmentDataManager.java:711) [pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method) ~[?:?]
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412) ~[?:?]
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255) ~[?:?]
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237) ~[?:?]
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) ~[?:?]
at java.net.Socket.connect(Socket.java:609) ~[?:?]
at org.apache.pinot.shaded.org.apache.http.conn.socket.PlainConnectionSocketFactory.connectSocket(PlainConnectionSocketFactory.java:75) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
at org.apache.pinot.shaded.org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-58c1dc9b7d8e28ac7207fa7565b1ed76f7a0c2ad]
... 19 more
Another problem I have noticed is that segments are not getting committed to the deep store (S3), even with realtime.segment.flush.threshold.time set to 1h.
It looks like the server is not able to upload segments. Are the server and controller running on the same host? The segment upload URL says "localhost:9000", hence the question.
No, the server and controller are on different EC2 instances. @navina Attaching the server and controller configs that are used. Server:
# Pinot Role
pinot.service.role=SERVER
# Pinot Cluster name
pinot.cluster.name=cluster name
# Pinot Zookeeper Server
pinot.zk.server=zk1:2181,zk2:2182,zk3:2183
# Use hostname as Pinot Instance ID other than IP
pinot.set.instance.id.to.hostname=true
# Pinot Server Netty Port for queries
pinot.server.netty.port=8098
# Pinot Server Admin API port
pinot.server.adminapi.port=8097
# Pinot Server Data Directory
pinot.server.instance.dataDir=/tmp/pinot/data/server/index
# Pinot Server Temporary Segment Tar Directory
pinot.server.instance.segmentTarDir=/tmp/pinot/data/server/segmentTar
pinot.server.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.server.storage.factory.s3.region=us-east-1
pinot.server.segment.fetcher.protocols=file,http,s3
pinot.server.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
Controller
# Pinot Role
pinot.service.role=CONTROLLER
# Pinot Cluster name
pinot.cluster.name=cluster name
# Pinot Zookeeper Server
pinot.zk.server=zk1:2181,zk2:2182,zk3:2183
# Use hostname as Pinot Instance ID other than IP
pinot.set.instance.id.to.hostname=true
# Pinot Controller Port
controller.port=9000
# Pinot Controller VIP Host
controller.vip.host=localhost
# Pinot Controller VIP Port
controller.vip.port=9000
# Location to store Pinot Segments pushed from clients
controller.data.dir=s3://mybucket/controllerData/
controller.local.temp.dir=/tmp/pinot-tmp-data/
pinot.controller.storage.factory.class.s3=org.apache.pinot.plugin.filesystem.S3PinotFS
pinot.controller.storage.factory.s3.region=us-east-1
pinot.controller.segment.fetcher.protocols=file,http,s3
pinot.controller.segment.fetcher.s3.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher
Is there any news on this issue? I am facing the same error. It seems the server is trying to commit the segment to localhost instead of using the controller running on a different EC2 instance.
Were you able to fix this?
@FranMorilloAWS
I guess controller.vip.host=localhost is the cause of this. Can you check if you have a similar setting in your config?
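For reference, a corrected controller config would point the VIP host at an address the servers can actually reach, such as the controller's own hostname or a load balancer in front of the controllers (the hostname below is a placeholder, not a real address):

```properties
# Pinot Controller VIP Host: must be resolvable and reachable from the servers, not "localhost"
controller.vip.host=controller.example.internal
# Pinot Controller VIP Port
controller.vip.port=9000
```

The servers use this VIP address for segment upload during the commit protocol, which is why a value of "localhost" produces connection-refused errors on the segmentUpload call.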
@Jackie-Jiang Yes, I modified controller.vip.host to point to the load balancer of the controllers and it worked. However, I am now facing another issue with the current table configuration: once the segments go from CONSUMING to GOOD, the servers do not create new segments to continue consuming from the Kinesis data stream.
The row-count and size thresholds for the segments are also being ignored.
I'll add my table configuration:
{
  "REALTIME": {
    "tableName": "kinesisTable_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "replication": "2",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "7",
      "replicasPerPartition": "2",
      "minimizeDataMovement": false,
      "timeColumnName": "creationTimestamp",
      "segmentPushType": "APPEND",
      "completionConfig": {
        "completionMode": "DOWNLOAD"
      }
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant"
    },
    "tableIndexConfig": {
      "invertedIndexColumns": ["product"],
      "noDictionaryColumns": ["price"],
      "rangeIndexVersion": 2,
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": ["creationTimestamp"],
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kinesis",
        "stream.kinesis.topic.name": "pinot-stream",
        "region": "eu-west-1",
        "shardIteratorType": "LATEST",
        "stream.kinesis.consumer.type": "lowlevel",
        "stream.kinesis.fetch.timeout.millis": "30000",
        "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
        "realtime.segment.flush.threshold.rows": "1400000",
        "realtime.segment.flush.threshold.time": "1h",
        "realtime.segment.flush.threshold.size": "200M"
      },
      "varLengthDictionaryColumns": ["campaign", "color", "department"],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false,
      "optimizeDictionary": false,
      "optimizeDictionaryForMetrics": false,
      "noDictionarySizeRatioThreshold": 0
    },
    "metadata": {
      "customConfigs": {}
    },
    "isDimTable": false
  }
}
The table has four segments that are in GOOD state and are in S3. It just stops creating CONSUMING segments and stops reading from Kinesis.
Please check the controller log for exceptions. The new consuming segment should be created by the controller. @KKcorps Can you help with this issue?
What I noticed was that when "realtime.segment.flush.threshold.rows": "1400000" is set as a string, the row-count and size thresholds are ignored when completing segments, and new segments are created only after the threshold time elapses. When it is set as an integer rather than a string, the new segments do get created. They do not reach the suggested size; no segment is above 3 megabytes.
You may refer to this doc on how to configure them: https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion
The threshold time is honored when not enough rows are collected. The value should always be a string, and if you want to use the size threshold, "realtime.segment.flush.threshold.rows": "0" should be used.
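To make that concrete, a size-based flush configuration inside streamConfigs would look like the sketch below; every value is a string, and the time and size values here are illustrative, not recommendations:

```json
{
  "realtime.segment.flush.threshold.rows": "0",
  "realtime.segment.flush.threshold.time": "1h",
  "realtime.segment.flush.threshold.size": "200M"
}
```

With the row threshold set to "0", Pinot sizes segments toward the size threshold, and the time threshold still forces a flush when too few rows arrive within the window.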
Hi! I am running tests with Pinot consuming from Kinesis Data Streams, on two r5x.large servers. I noticed that once it reached 8 segments, it won't create any new segments until it reaches the flush time. This is my current configuration: "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.time": "1h", "realtime.segment.flush.threshold.size": "200M"
When I go into each segment's metadata, I see the threshold rows defined as "segment.flush.threshold.size": "150000", and the segments do not reach the size threshold (each segment is 1.7 MB in S3).
Hi @FranMorilloAWS, are you using on-demand or provisioned mode? Have you checked the Kinesis stream metadata? This might be because a shard got closed and new shards got created.
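One way to inspect the shard state (assuming the stream name `pinot-stream` and region `eu-west-1` from the table config above, and that the AWS CLI is configured with credentials) is:

```shell
# List all shards for the stream; closed shards have an
# EndingSequenceNumber in their SequenceNumberRange
aws kinesis list-shards --stream-name pinot-stream --region eu-west-1
```

Shards that were split or merged by on-demand scaling show up here as closed parents alongside their new child shards.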
Using on-demand. I believed that by using the low-level consumer, Pinot would handle the closing and creation of new shards, whether on-demand or provisioned when scaled. Is that not the case?
@FranMorilloAWS I wonder if setting the completion mode to DOWNLOAD is causing this issue.
Any particular reason why you are using this config?
"completionConfig": {
"completionMode": "DOWNLOAD"
}
Because at some point it was not downloading the segments to S3. I will remove the completion config and try again. Thanks, I'll update here.
Hi, I am now facing the same issue, but I am not even seeing the segments in S3.