
RealtimeToOfflineSegmentsTask tasks time out with large amount of rows

Open lksvenoy-r7 opened this issue 3 years ago • 9 comments

We are seeing an issue in production where the RealtimeToOfflineSegments task is taking a very long time to complete, and timing out in the end.

There are two areas where it is slow. During segment mapping:

2022-01-12T12:40:28Z Initialized mapper with 43 record readers

And during row sorting:

2022-01-13T10:10:09Z Start sorting on numRows: 42757049, numSortFields: 9

We are using Pinot 0.9.3 with 8 minions, with 8 GB heap, 16GB memory and 4 cores of CPU.

Task config:

    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bucketTimePeriod": "3h",
        "bufferTimePeriod": "2d",
        "mergeType": "dedup",
        "maxNumRecordsPerSegment": 10000000,
        "roundBucketTimePeriod": "1h"
      }
    }

See log files and profiling screenshots

Full run log log1.txt

Sometimes segment mapping happens with up to 90 record readers. This stage then takes between 35 minutes and 1 hour. Once that's done, it starts the usual reduction and sorting phase, even though the task should already have been cancelled by then for hitting the timeout.

It then starts destroying segments, and then immediately finishes with a timeout after destroying segments. I’m not sure exactly what happens here, but I’m wondering if this is causing an issue where the task is not properly cleaning up, causing loss of data. The logs above show the final moments where the task times out after issuing delete segments.

I have been talking with @richardstartin on slack, and he pointed me towards this issue: https://github.com/apache/pinot/issues/7929

To circumvent this problem, I disabled all raw indexes.

After doing that, I ran another task, here is the log output: log2.txt

Looking at this log file, we can see that sorting rows took about 1.43 hours for 75 million rows:

2022-01-12T14:19:37Z Start sorting on numRows: 75094464, numSortFields: 9
2022-01-12T15:45:24Z Finish sorting in 5147498ms
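
As a quick sanity check on the two log lines above, the reported duration can be converted into hours and a rough average sort throughput. This is only arithmetic on the numbers in the log; the class and method names are made up for illustration.

```java
// Hypothetical helper: plain arithmetic on the numbers from the log lines above.
class SortThroughput {
    // Convert a duration in milliseconds to hours.
    static double hours(long millis) {
        return millis / 3_600_000.0;
    }

    // Average rows sorted per second over the whole sort phase.
    static double rowsPerSecond(long numRows, long millis) {
        return numRows / (millis / 1000.0);
    }

    public static void main(String[] args) {
        long numRows = 75_094_464L;  // "Start sorting on numRows: 75094464"
        long millis = 5_147_498L;    // "Finish sorting in 5147498ms"
        System.out.printf("%.2f hours, %.0f rows/s%n",
            hours(millis), rowsPerSecond(numRows, millis));
    }
}
```

That works out to roughly 1.43 hours, or on the order of 14,600 rows per second on average.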

We decided to profile the application to try to figure out what is going on, using: jcmd <pid> JFR.start duration=300s filename=minion.jfr settings=profile

Profiling during segment mapping: 2022-01-13T09:53:44Z Initialized mapper with 45 record readers, output dir: /var/pinot/minion/data/RealtimeToOfflineSegmentsTask/tmp-a4085745-be54-4411-abfa-c0dfb8fa05a1/workingDir/mapper_output, timeHandler: class org.apache.pinot.core.segment.processing.timehandler.EpochTimeHandler, partitioners: class org.apache.pinot.core.segment.processing.partitioner.TableConfigPartitioner

Top 5 methods during segment mapping: (5 profiler screenshots, not included)

Top 5 TLAB allocations during segment mapping: (5 profiler screenshots, not included)

Profiling during segment sorting: 2022-01-13T10:10:09Z Start sorting on numRows: 42757049, numSortFields: 9

Top 5 methods during segment sorting: (5 profiler screenshots, not included)

Top 5 TLAB allocations during segment sorting: (5 profiler screenshots, not included)

The EBS volume attached to the minion, which has 500 GB and 3000 IOPS, is maxing out its IOPS on reads. (screenshot not included)

Here is the resource usage of the minion: (3 screenshots, not included)

@richardstartin indicated that this is due to a 9-dimensional sort taking place (on 42 million rows) because dedup is enabled. I am currently trying to work around the issue by disabling dedup to see how that goes. This still does not explain why the segment mapping is slow.

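Here is the code Richard Startin linked, which is causing the excessive disk usage:

    // Arrays here appears to be fastutil's it.unimi.dsi.fastutil.Arrays
    // (quickSort taking an IntComparator and a Swapper).
    // Start from the identity permutation of row IDs.
    _sortedRowIds = new int[numRows];
    for (int i = 0; i < numRows; i++) {
      _sortedRowIds[i] = i;
    }
    // Sort the row IDs, not the rows: every comparison goes through _fileReader.
    Arrays.quickSort(0, _endRowId,
        (i1, i2) -> _fileReader.compare(_sortedRowIds[i1], _sortedRowIds[i2]),
        (i1, i2) -> {
          int temp = _sortedRowIds[i1];
          _sortedRowIds[i1] = _sortedRowIds[i2];
          _sortedRowIds[i2] = temp;
        });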
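
The snippet above sorts an array of row IDs rather than the rows themselves, so every comparison has to look up two rows through the reader. Below is a minimal, self-contained sketch of that pattern using only the JDK. It swaps in `java.util.Arrays.sort` over boxed IDs for the swapper-based quickSort, and an in-memory array for Pinot's file reader. `CountingRowStore` and its read counter are hypothetical, added only to make the number of row lookups visible.

```java
import java.util.Arrays;
import java.util.Random;

// Hypothetical stand-in for a file-backed row reader: counts every row lookup.
class CountingRowStore {
    private final long[][] rows;  // each row holds the values of its sort fields
    long reads = 0;               // number of row lookups performed so far

    CountingRowStore(long[][] rows) {
        this.rows = rows;
    }

    // Compare two rows field by field; each call looks up two rows.
    int compare(int rowId1, int rowId2) {
        reads += 2;
        long[] a = rows[rowId1];
        long[] b = rows[rowId2];
        for (int f = 0; f < a.length; f++) {
            int c = Long.compare(a[f], b[f]);
            if (c != 0) {
                return c;
            }
        }
        return 0;
    }
}

class IndirectSortSketch {
    // Sort row IDs by row content without moving the rows themselves.
    static Integer[] sortedRowIds(CountingRowStore store, int numRows) {
        Integer[] ids = new Integer[numRows];
        for (int i = 0; i < numRows; i++) {
            ids[i] = i;
        }
        Arrays.sort(ids, store::compare);
        return ids;
    }

    public static void main(String[] args) {
        int numRows = 100_000;
        int numSortFields = 9;
        Random random = new Random(42);
        long[][] rows = new long[numRows][numSortFields];
        for (long[] row : rows) {
            for (int f = 0; f < numSortFields; f++) {
                row[f] = random.nextInt(10);
            }
        }
        CountingRowStore store = new CountingRowStore(rows);
        sortedRowIds(store, numRows);
        System.out.println("row lookups: " + store.reads + " for " + numRows + " rows");
    }
}
```

With a comparison sort, the number of lookups grows roughly as n log n, and they land on rows in no particular order. When the rows live in a file larger than memory, that access pattern would plausibly show up as heavy random reads, which is consistent with the IOPS saturation reported above.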

Update: I've changed the task's mergeType to concat. This seems to alleviate the row-sorting problem for most tables, with the exception of two large outliers. Below are profiling screenshots from those two tables.

Large Table 1: segment mapping is still running; it has been running for 1 hour.

Method Profiling Flame View: (screenshot not included)

TLAB Allocations Flame View: (screenshot not included)

Large Table 2: segment mapping is still running; it has been running for 1 hour.

Method Profiling Flame View: (screenshot not included)

TLAB Allocations Flame View: (screenshot not included)

lksvenoy-r7 • Jan 13 '22 11:01

@kishoreg this is similar to the issue we are experiencing with our RealtimeToOfflineSegmentsTask. It's taking almost 8 hours to process 175m rows of data:

Initialized mapper with 56 record readers (takes almost 2 hours)
Start sorting on numRows: 172432000, numSortFields: 44
Finish sorting in 10563254ms (takes almost 3 hours)
Finish creating rollup file in 5990480ms (takes about 1.7 hours)

We are using one minion that is 4cpu/32gb ram/300gb storage and it's running on one thread it seems.

One more note: the task will run to completion regardless of the RealtimeToOfflineSegmentsTask.timeoutMs setting. But if it exceeds that timeout, it is reported as cancelled and then as TIMED_OUT, like so:

handling message: 76b09bf7-8cf1-45c5-9fd8-3826969af8b1 transit TaskQueue_RealtimeToOfflineSegmentsTask_Task_RealtimeToOfflineSegmentsTask_1659517259439.TaskQueue_RealtimeToOfflineSegmentsTask_Task_RealtimeToOfflineSegmentsTask_1659517259439_0|[] from:RUNNING to:TIMED_OUT, relayedFrom: null

bdstuart • Aug 03 '22 19:08

@bdstuart It seems you are trying to merge 56 segments (172M records) into one, which is causing the timeout. Did you change the maxNumRecordsPerTask config? Ideally we should generate multiple tasks so that the workload can be distributed across different minions/threads.

Jackie-Jiang • Aug 03 '22 23:08

Here is the relevant portion of the configuration:

    "taskTypeConfigsMap": {
      "RealtimeToOfflineSegmentsTask": {
        "bucketTimePeriod": "1d",
        "bufferTimePeriod": "1d",
        "roundBucketTimePeriod": "1d",
        "mergeType": "rollup",
        "user_raw_risk_score.aggregationType": "max",
        "user_risk_score.aggregationType": "max",
        "user_threat_score.aggregationType": "max",
        "maxNumRecordsPerSegment": "5000000",
        "schedule": "$REALTIME_TO_OFFLINE_SEGMENT_TASK_SCHEDULE"
      }
    }

The process worked when we had a smaller number of records, around 18m. It created 4 segments as expected: 3 with 5m records and 1 with the rest.

bdstuart • Aug 03 '22 23:08

@Jackie-Jiang , I used this as my guide: https://docs.pinot.apache.org/operators/operating-pinot/pinot-managed-offline-flows

bdstuart • Aug 03 '22 23:08

@Jackie-Jiang the process finished in 9.5 hours last night and produced 18 new offline rolled up segments.

bdstuart • Aug 04 '22 13:08

Just checked the code and maxNumRecordsPerTask is not honored in the RealtimeToOfflineSegmentsTask, which is the reason why one single thread is processing all the records for each day. cc @npawar

How many stream partitions do you have in this table? What is the commit frequency? One workaround is to reduce the bucketTimePeriod so that each task picks up fewer segments.
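
To get a feel for that workaround, here is a rough estimate of how many rows a single task would process for different `bucketTimePeriod` values. It assumes rows are spread evenly over the day, which real traffic will not match exactly, and uses the 172,432,000 rows per 1d bucket reported earlier in this thread. The class and method names are illustrative only.

```java
import java.time.Duration;

// Hypothetical estimate: rows handled per bucket window, assuming uniform ingestion.
class BucketEstimate {
    static long rowsPerBucket(long rowsPerDay, Duration bucket) {
        return rowsPerDay * bucket.toMinutes() / Duration.ofDays(1).toMinutes();
    }

    public static void main(String[] args) {
        long rowsPerDay = 172_432_000L;  // numRows from the 1d-bucket run above
        for (int hours : new int[] {24, 6, 3, 1}) {
            long rows = rowsPerBucket(rowsPerDay, Duration.ofHours(hours));
            System.out.println(hours + "h bucket: ~" + rows + " rows per task");
        }
    }
}
```

Under that assumption a 6h bucket brings each task down to about 43 million rows. The trade-off is presumably that more task runs are needed to cover the same time range.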

Jackie-Jiang • Aug 05 '22 21:08

I think we are ok for now @Jackie-Jiang, with our production level ingestion we only need to run the job 1x every 24 hours and it's running in ~10 hours. Would be nice to have it run faster for the future as we add new data sources, but it sounds like that's in the works for an enhancement.

bdstuart • Aug 05 '22 22:08

Yeah, let me open an issue for that so it is easier to track

Jackie-Jiang • Aug 05 '22 22:08

It seems we can close this issue, as the specific thing to address has been identified in #9177?

npawar • Aug 11 '22 16:08