
Update `LastLocation` to use `SELECT DISTINCT ON`

Open jc-harrison opened this issue 1 year ago • 2 comments

It appears we could get a modest performance improvement in LastLocation queries by re-implementing the query as:

SELECT DISTINCT ON (subscriber_locs.subscriber) subscriber_locs.subscriber, time, {rc}
FROM ({subscriber_locs}) AS subscriber_locs
ORDER BY subscriber_locs.subscriber, time DESC

instead of the current

SELECT final_time.subscriber, {rc}
FROM
     (SELECT subscriber_locs.subscriber, time, {rc},
     row_number() OVER (PARTITION BY subscriber_locs.subscriber ORDER BY time DESC)
         AS rank
     FROM ({subscriber_locs}) AS subscriber_locs) AS final_time
WHERE rank = 1
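
To illustrate why the two formulations should return the same rows, here is a minimal plain-Python sketch of both strategies. It is not flowmachine code: the rows, subscriber IDs and location codes are invented for the example, and it assumes no two events for a subscriber share the same timestamp (with ties, either SQL variant may pick any of the tied rows).

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical (subscriber, time, location) rows, invented for this example.
rows = [
    ("sub_a", "2024-03-01 08:00", "loc_1"),
    ("sub_b", "2024-03-01 09:30", "loc_2"),
    ("sub_a", "2024-03-01 17:45", "loc_3"),
    ("sub_b", "2024-03-01 07:15", "loc_1"),
    ("sub_c", "2024-03-01 12:00", "loc_2"),
]


def last_location_ranked(rows):
    """Mimic row_number() OVER (PARTITION BY subscriber ORDER BY time DESC), keeping rank 1."""
    partitions = {}
    for row in rows:
        partitions.setdefault(row[0], []).append(row)
    result = []
    for partition in partitions.values():
        ordered = sorted(partition, key=itemgetter(1), reverse=True)
        # Every row gets a rank, and all but rank 1 are then filtered out.
        ranked = list(enumerate(ordered, start=1))
        result.extend((row[0], row[2]) for rank, row in ranked if rank == 1)
    return sorted(result)


def last_location_distinct_on(rows):
    """Mimic DISTINCT ON (subscriber) ... ORDER BY subscriber, time DESC."""
    # Sort by time descending, then stable-sort by subscriber, which gives
    # the ordering (subscriber ASC, time DESC).
    by_time_desc = sorted(rows, key=itemgetter(1), reverse=True)
    ordered = sorted(by_time_desc, key=itemgetter(0))
    # Keep only the first row of each run of equal subscribers.
    return [(sub, next(group)[2]) for sub, group in groupby(ordered, key=itemgetter(0))]


ranked_result = last_location_ranked(rows)
distinct_result = last_location_distinct_on(rows)
print(ranked_result == distinct_result)
```

The ranked version assigns a rank to every row before discarding most of them, whereas the sort-and-take-first version only needs to notice when the subscriber changes. That is roughly the difference between the WindowAgg with Filter and the Unique step.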

I ran a test on one day of calls data with ~14M events and ~2M subscribers, and saw a ~10% speed-up from 34.4s to 30.9s for the following query:

fm.features.subscriber.last_location.LastLocation(
    start='2024-03-01',
    stop='2024-03-02',
    spatial_unit=fm.core.spatial_unit.make_spatial_unit(
        spatial_unit_type="admin",
        level=3,
        mapping_table="geography.precomputed_cell_to_admin_mapping",
    ),
    table=["events.calls"],
)

Explain output for the current implementation:

Subquery Scan on final_time  (cost=837293.82..883625.99 rows=7128 width=50) (actual time=24182.071..34179.851 rows=1835670 loops=1)
   Filter: (final_time.rank = 1)
   Rows Removed by Filter: 10968194
   ->  WindowAgg  (cost=837293.82..865805.92 rows=1425605 width=66) (actual time=24182.069..33433.680 rows=12803864 loops=1)
         ->  Sort  (cost=837293.82..840857.84 rows=1425605 width=58) (actual time=24182.049..28152.223 rows=12803864 loops=1)
               Sort Key: calls_20240301.msisdn, calls_20240301.datetime DESC
               Sort Method: external merge  Disk: 952288kB
               ->  Gather  (cost=8733.70..645629.59 rows=1425605 width=58) (actual time=945.625..3623.561 rows=12803864 loops=1)
                     Workers Planned: 6
                     Workers Launched: 6
                     ->  Parallel Hash Join  (cost=7733.70..502069.09 rows=237601 width=58) (actual time=882.983..3431.828 rows=1829123 loops=7)
                           Hash Cond: (calls_20240301.location_id = loc_table.id)
                           Join Filter: (((calls_20240301.datetime)::date >= COALESCE((loc_table.date_of_first_service)::timestamp with time zone, '-infinity'::timestamp with time zone)) AND ((calls_20240301.datetime)::date <= COALESCE((loc_table.date_of_last_service)::timestamp with time zone, 'infinity'::timestamp with time zone)))
                           ->  Parallel Append  (cost=0.00..454828.47 rows=2149445 width=59) (actual time=0.046..1062.639 rows=1842069 loops=7)
                                 ->  Parallel Seq Scan on calls_20240301  (cost=0.00..444081.24 rows=2149445 width=59) (actual time=0.023..835.496 rows=1842069 loops=7)
                                       Filter: ((location_id IS NOT NULL) AND (datetime >= '2024-03-01 00:00:00+00'::timestamp with time zone) AND (datetime < '2024-03-02 00:00:00+00'::timestamp with time zone) AND (location_id <> ''::text))
                                       Rows Removed by Filter: 149306
                                 ->  Parallel Seq Scan on calls  (cost=0.00..0.00 rows=1 width=72) (actual time=0.002..0.002 rows=0 loops=2)
                                       Filter: ((location_id IS NOT NULL) AND (datetime >= '2024-03-01 00:00:00+00'::timestamp with time zone) AND (datetime < '2024-03-02 00:00:00+00'::timestamp with time zone) AND (location_id <> ''::text))
                           ->  Parallel Hash  (cost=7481.59..7481.59 rows=20169 width=39) (actual time=882.716..882.722 rows=4898 loops=7)
                                 Buckets: 65536  Batches: 1  Memory Usage: 3040kB
                                 ->  Merge Join  (cost=7006.73..7481.59 rows=20169 width=39) (actual time=815.473..821.887 rows=4898 loops=7)
                                       Merge Cond: (precomputed_cell_to_admin_mapping.id = loc_table.id)
                                       ->  Sort  (cost=3493.65..3544.08 rows=20169 width=20) (actual time=694.039..694.515 rows=4898 loops=7)
                                             Sort Key: precomputed_cell_to_admin_mapping.id
                                             Sort Method: quicksort  Memory: 827kB
                                             Worker 0:  Sort Method: quicksort  Memory: 2734kB
                                             Worker 1:  Sort Method: quicksort  Memory: 40kB
                                             Worker 2:  Sort Method: quicksort  Memory: 40kB
                                             Worker 3:  Sort Method: quicksort  Memory: 40kB
                                             Worker 4:  Sort Method: quicksort  Memory: 40kB
                                             Worker 5:  Sort Method: quicksort  Memory: 40kB
                                             ->  Hash Left Join  (cost=285.13..2051.59 rows=20169 width=20) (actual time=689.184..691.967 rows=4898 loops=7)
                                                   Hash Cond: (precomputed_cell_to_admin_mapping.admin3pcod = (admin3.admin3pcod)::text)
                                                   ->  Parallel Index Only Scan using precomputed_cell_to_admin_mapping_id_admin3pcod_idx on precomputed_cell_to_admin_mapping  (cost=0.29..1489.42 rows=20169 width=21) (actual time=0.036..1.744 rows=4898 loops=7)
                                                         Heap Fetches: 34288
                                                   ->  Hash  (cost=277.71..277.71 rows=570 width=9) (actual time=689.129..689.131 rows=570 loops=7)
                                                         Buckets: 1024  Batches: 1  Memory Usage: 32kB
                                                         ->  Subquery Scan on admin3  (cost=0.00..277.71 rows=570 width=9) (actual time=687.613..688.965 rows=570 loops=7)
                                                               ->  Seq Scan on geoms  (cost=0.00..272.01 rows=570 width=81) (actual time=687.611..688.877 rows=570 loops=7)
                                                                     Filter: (spatial_resolution = 3)
                                                                     Rows Removed by Filter: 151
                                       ->  Sort  (cost=3513.08..3599.24 rows=34465 width=19) (actual time=121.244..122.172 rows=10547 loops=7)
                                             Sort Key: loc_table.id
                                             Sort Method: quicksort  Memory: 3152kB
                                             Worker 0:  Sort Method: quicksort  Memory: 3152kB
                                             Worker 1:  Sort Method: quicksort  Memory: 3152kB
                                             Worker 2:  Sort Method: quicksort  Memory: 3152kB
                                             Worker 3:  Sort Method: quicksort  Memory: 3152kB
                                             Worker 4:  Sort Method: quicksort  Memory: 3152kB
                                             Worker 5:  Sort Method: quicksort  Memory: 3152kB
                                             ->  Seq Scan on cells loc_table  (cost=0.00..915.65 rows=34465 width=19) (actual time=0.027..8.693 rows=34465 loops=7)
 Planning Time: 123.532 ms
 JIT:
   Functions: 258
   Options: Inlining true, Optimization true, Expressions true, Deforming true
   Timing: Generation 34.692 ms, Inlining 454.648 ms, Optimization 2546.012 ms, Emission 1807.859 ms, Total 4843.212 ms
 Execution Time: 34416.014 ms

And for the modified implementation using SELECT DISTINCT ON:

Unique  (cost=837293.82..844421.85 rows=1425605 width=58) (actual time=26080.790..30641.673 rows=1835670 loops=1)
   ->  Sort  (cost=837293.82..840857.84 rows=1425605 width=58) (actual time=26080.789..29374.052 rows=12803864 loops=1)
         Sort Key: calls_20240301.msisdn, calls_20240301.datetime DESC
         Sort Method: external merge  Disk: 952288kB
         ->  Gather  (cost=8733.70..645629.59 rows=1425605 width=58) (actual time=952.785..3794.816 rows=12803864 loops=1)
               Workers Planned: 6
               Workers Launched: 6
               ->  Parallel Hash Join  (cost=7733.70..502069.09 rows=237601 width=58) (actual time=890.494..3493.537 rows=1829123 loops=7)
                     Hash Cond: (calls_20240301.location_id = loc_table.id)
                     Join Filter: (((calls_20240301.datetime)::date >= COALESCE((loc_table.date_of_first_service)::timestamp with time zone, '-infinity'::timestamp with time zone)) AND ((calls_20240301.datetime)::date <= COALESCE((loc_table.date_of_last_service)::timestamp with time zone, 'infinity'::timestamp with time zone)))
                     ->  Parallel Append  (cost=0.00..454828.47 rows=2149445 width=59) (actual time=0.052..1081.464 rows=1842069 loops=7)
                           ->  Parallel Seq Scan on calls_20240301  (cost=0.00..444081.24 rows=2149445 width=59) (actual time=0.027..853.991 rows=1842069 loops=7)
                                 Filter: ((location_id IS NOT NULL) AND (datetime >= '2024-03-01 00:00:00+00'::timestamp with time zone) AND (datetime < '2024-03-02 00:00:00+00'::timestamp with time zone) AND (location_id <> ''::text))
                                 Rows Removed by Filter: 149306
                           ->  Parallel Seq Scan on calls  (cost=0.00..0.00 rows=1 width=72) (actual time=0.002..0.002 rows=0 loops=2)
                                 Filter: ((location_id IS NOT NULL) AND (datetime >= '2024-03-01 00:00:00+00'::timestamp with time zone) AND (datetime < '2024-03-02 00:00:00+00'::timestamp with time zone) AND (location_id <> ''::text))
                     ->  Parallel Hash  (cost=7481.59..7481.59 rows=20169 width=39) (actual time=890.217..890.223 rows=4898 loops=7)
                           Buckets: 65536  Batches: 1  Memory Usage: 3040kB
                           ->  Merge Join  (cost=7006.73..7481.59 rows=20169 width=39) (actual time=804.613..810.019 rows=4898 loops=7)
                                 Merge Cond: (precomputed_cell_to_admin_mapping.id = loc_table.id)
                                 ->  Sort  (cost=3493.65..3544.08 rows=20169 width=20) (actual time=698.188..698.687 rows=4898 loops=7)
                                       Sort Key: precomputed_cell_to_admin_mapping.id
                                       Sort Method: quicksort  Memory: 4120kB
                                       Worker 0:  Sort Method: quicksort  Memory: 40kB
                                       Worker 1:  Sort Method: quicksort  Memory: 40kB
                                       Worker 2:  Sort Method: quicksort  Memory: 40kB
                                       Worker 3:  Sort Method: quicksort  Memory: 40kB
                                       Worker 4:  Sort Method: quicksort  Memory: 40kB
                                       Worker 5:  Sort Method: quicksort  Memory: 40kB
                                       ->  Hash Left Join  (cost=285.13..2051.59 rows=20169 width=20) (actual time=693.764..696.258 rows=4898 loops=7)
                                             Hash Cond: (precomputed_cell_to_admin_mapping.admin3pcod = (admin3.admin3pcod)::text)
                                             ->  Parallel Index Only Scan using precomputed_cell_to_admin_mapping_id_admin3pcod_idx on precomputed_cell_to_admin_mapping  (cost=0.29..1489.42 rows=20169 width=21) (actual time=0.040..1.492 rows=4898 loops=7)
                                                   Heap Fetches: 34288
                                             ->  Hash  (cost=277.71..277.71 rows=570 width=9) (actual time=693.702..693.704 rows=570 loops=7)
                                                   Buckets: 1024  Batches: 1  Memory Usage: 32kB
                                                   ->  Subquery Scan on admin3  (cost=0.00..277.71 rows=570 width=9) (actual time=692.423..693.581 rows=570 loops=7)
                                                         ->  Seq Scan on geoms  (cost=0.00..272.01 rows=570 width=81) (actual time=692.421..693.511 rows=570 loops=7)
                                                               Filter: (spatial_resolution = 3)
                                                               Rows Removed by Filter: 151
                                 ->  Sort  (cost=3513.08..3599.24 rows=34465 width=19) (actual time=106.217..106.833 rows=5708 loops=7)
                                       Sort Key: loc_table.id
                                       Sort Method: quicksort  Memory: 3152kB
                                       Worker 0:  Sort Method: quicksort  Memory: 3152kB
                                       Worker 1:  Sort Method: quicksort  Memory: 3152kB
                                       Worker 2:  Sort Method: quicksort  Memory: 3152kB
                                       Worker 3:  Sort Method: quicksort  Memory: 3152kB
                                       Worker 4:  Sort Method: quicksort  Memory: 3152kB
                                       Worker 5:  Sort Method: quicksort  Memory: 3152kB
                                       ->  Seq Scan on cells loc_table  (cost=0.00..915.65 rows=34465 width=19) (actual time=0.028..6.899 rows=34465 loops=7)
 Planning Time: 106.511 ms
 JIT:
   Functions: 253
   Options: Inlining true, Optimization true, Expressions true, Deforming true
   Timing: Generation 36.442 ms, Inlining 551.970 ms, Optimization 2649.197 ms, Emission 1641.969 ms, Total 4879.578 ms
 Execution Time: 30874.473 ms

Of particular note:

  • Both query plans are the same except for the final step (WindowAgg with Filter vs Unique). If we look at just the time from the end of the common sort operation, the modified query took 1.5s for the 'Unique' operation vs 6.3s for the WindowAgg and Filter.
  • The estimated row count is much closer to the true value in the 'DISTINCT ON' variant than the original (1425605 vs 7128, compared to true row count 1835670), indicating postgres is not able to infer that each window will have exactly one row with rank=1 and plan accordingly.
  • Worth noting that this query was operating on one day of one event type (i.e. a single event table partition), within which rows were already sorted by msisdn. Results may differ if run on a multiple-day data span or on a union of multiple event types, in which case the initial data would not already be sorted by msisdn - although I would expect this to have the same impact on both variants.
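
As a quick check of the figures in the list above, the short sketch below recomputes them from the numbers in the two explain outputs. It assumes that "time from the end of the common sort" means total execution time minus the end time of the Sort node, which is a reading of the figures rather than something stated in the outputs themselves.

```python
# Figures copied from the two explain outputs above (times in milliseconds).
current = {"sort_end": 28152.223, "execution": 34416.014, "estimated_rows": 7128}
distinct_on = {"sort_end": 29374.052, "execution": 30874.473, "estimated_rows": 1425605}
actual_rows = 1835670


def time_after_sort(plan):
    """Time spent after the shared Sort node finished (assumed definition)."""
    return plan["execution"] - plan["sort_end"]


current_tail = time_after_sort(current)  # WindowAgg + Filter
distinct_tail = time_after_sort(distinct_on)  # Unique
overall_speedup = 1 - distinct_on["execution"] / current["execution"]

# Ratio of the planner's row estimate to the true row count for the final step.
current_estimate_ratio = current["estimated_rows"] / actual_rows
distinct_estimate_ratio = distinct_on["estimated_rows"] / actual_rows

print(f"WindowAgg + Filter after sort: {current_tail / 1000:.1f}s")
print(f"Unique after sort: {distinct_tail / 1000:.1f}s")
print(f"Overall speed-up: {overall_speedup:.1%}")
print(f"Row estimate / actual (current): {current_estimate_ratio:.3f}")
print(f"Row estimate / actual (DISTINCT ON): {distinct_estimate_ratio:.3f}")
```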

There are other instances in flowmachine where the "rank within window; select where rank=1" approach is used (e.g. ModalLocation) - it would be worth checking whether the same modification would help in those instances.

jc-harrison, May 16 '24 12:05

Ooo, that's interesting.. We explicitly avoided use of DISTINCT because it used a slower code path, as I recall. This test is still on a pg12 flowdb, right?

greenape, May 16 '24 12:05

COUNT(DISTINCT ...) uses a slower code path. I'm not sure the same is true of SELECT DISTINCT ON.

Yes, this is still on a pg12 flowdb.

jc-harrison, May 16 '24 12:05