
Performance issue with /api/v1/lineage/ endpoint — very slow response time

Open • NikitaTolpikin opened this issue 6 months ago • 2 comments

Hi Marquez team,

We're encountering significant performance issues with the /api/v1/lineage/ endpoint. In our production environment, this endpoint consistently takes 10–15 seconds to respond.

After investigating the issue, we found two queries that are likely contributing to the slowdown:

  1. Recursive lineage query:

    WITH RECURSIVE
      job_io AS (
          SELECT
                  io.job_uuid AS job_uuid,
                  io.job_symlink_target_uuid AS job_symlink_target_uuid,
                  ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='INPUT') AS inputs,
                  ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io.io_type='OUTPUT') AS outputs
          FROM job_versions_io_mapping io
          WHERE io.is_current_job_version = TRUE
          GROUP BY io.job_symlink_target_uuid, io.job_uuid
      ),
      lineage(job_uuid, job_symlink_target_uuid, inputs, outputs) AS (
          SELECT job_uuid,
                  job_symlink_target_uuid,
                  COALESCE(inputs, Array[]::uuid[]) AS inputs,
                  COALESCE(outputs, Array[]::uuid[]) AS outputs,
                  0 AS depth
          FROM job_io
          WHERE job_uuid IN ('6ba22b8f-018d-4f8f-9bd5-fcb58a809961') OR job_symlink_target_uuid IN ('6ba22b8f-018d-4f8f-9bd5-fcb58a809961')
          UNION
          SELECT io.job_uuid, io.job_symlink_target_uuid, io.inputs, io.outputs, l.depth + 1
          FROM job_io io, lineage l
          WHERE (io.job_uuid != l.job_uuid) AND
              array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
            AND depth < 4),
      lineage_outside_job_io(job_uuid) AS (
          SELECT
            param_jobs.param_job_uuid as job_uuid,
            j.symlink_target_uuid,
            Array[]::uuid[] AS inputs,
            Array[]::uuid[] AS outputs,
            0 AS depth
          FROM (SELECT unnest(ARRAY['6ba22b8f-018d-4f8f-9bd5-fcb58a809961']::UUID[]) AS param_job_uuid) param_jobs
          LEFT JOIN lineage l on param_jobs.param_job_uuid = l.job_uuid
          INNER JOIN jobs j ON j.uuid = param_jobs.param_job_uuid
          WHERE l.job_uuid IS NULL
      )
      SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids
      FROM (SELECT * FROM lineage UNION SELECT * FROM lineage_outside_job_io) l2
      INNER JOIN jobs_view j ON (j.uuid=l2.job_uuid OR j.uuid=l2.job_symlink_target_uuid);
    

    With the following query plan (EXPLAIN ANALYZE output):

     Unique  (cost=123725192.67..123828671.83 rows=55649 width=867) (actual time=2778.552..2778.605 rows=76 loops=1)
       CTE job_io
         ->  GroupAggregate  (cost=0.42..20357.88 rows=55911 width=96) (actual time=519.645..631.676 rows=55646 loops=1)
               Group Key: io.job_symlink_target_uuid, io.job_uuid
               ->  Index Scan using idx_job_versions_io_mapping_current_true on job_versions_io_mapping io  (cost=0.42..17736.05 rows=118878 width=54) (actual time=0.019..67.093 rows=114964 loops=1)
       CTE lineage
         ->  Recursive Union  (cost=0.00..31341336.18 rows=10348008 width=100) (actual time=571.459..2742.415 rows=170 loops=1)
               ->  CTE Scan on job_io  (cost=0.00..1397.78 rows=558 width=100) (actual time=571.453..648.689 rows=1 loops=1)
                     Filter: ((job_uuid = '6ba22b8f-018d-4f8f-9bd5-fcb58a809961'::uuid) OR (job_symlink_target_uuid = '6ba22b8f-018d-4f8f-9bd5-fcb58a809961'::uuid))
                     Rows Removed by Filter: 55645
               ->  Nested Loop  (cost=0.00..3123645.83 rows=1034745 width=100) (actual time=51.523..418.621 rows=143 loops=5)
                     Join Filter: ((io_1.job_uuid <> l_1.job_uuid) AND (array_cat(io_1.inputs, io_1.outputs) && array_cat(l_1.inputs, l_1.outputs)))
                     Rows Removed by Join Filter: 1046002
                     ->  CTE Scan on job_io io_1  (cost=0.00..1118.22 rows=55911 width=96) (actual time=0.005..6.693 rows=55646 loops=5)
                     ->  WorkTable Scan on lineage l_1  (cost=0.00..125.55 rows=1860 width=84) (actual time=0.001..0.002 rows=19 loops=278230)
                           Filter: (depth < 4)
                           Rows Removed by Filter: 15
       ->  Sort  (cost=92363498.60..92415238.18 rows=20695832 width=867) (actual time=2778.551..2778.566 rows=170 loops=1)
             Sort Key: j.uuid
             Sort Method: quicksort  Memory: 120kB
             ->  Hash Left Join  (cost=3424040.71..58158163.45 rows=20695832 width=867) (actual time=2777.131..2778.481 rows=170 loops=1)
                   Hash Cond: (j.parent_job_uuid = p.uuid)
                   ->  Nested Loop  (cost=3408406.61..54334542.05 rows=20695832 width=687) (actual time=2742.876..2743.547 rows=170 loops=1)
                         ->  Unique  (cost=3408405.76..3563625.90 rows=10348009 width=100) (actual time=2742.830..2742.889 rows=170 loops=1)
                               ->  Sort  (cost=3408405.76..3434275.79 rows=10348009 width=100) (actual time=2742.829..2742.846 rows=170 loops=1)
                                     Sort Key: lineage.job_uuid, lineage.job_symlink_target_uuid, lineage.inputs, lineage.outputs, lineage.depth
                                     Sort Method: quicksort  Memory: 45kB
                                     ->  Append  (cost=0.00..504991.26 rows=10348009 width=100) (actual time=571.462..2742.658 rows=170 loops=1)
                                           ->  CTE Scan on lineage  (cost=0.00..206960.16 rows=10348008 width=100) (actual time=571.461..2742.516 rows=170 loops=1)
                                           ->  Nested Loop  (cost=0.45..246291.06 rows=1 width=100) (actual time=0.086..0.090 rows=0 loops=1)
                                                 ->  Hash Right Anti Join  (cost=0.04..246282.62 rows=1 width=16) (actual time=0.086..0.089 rows=0 loops=1)
                                                       Hash Cond: (l.job_uuid = (unnest('{6ba22b8f-018d-4f8f-9bd5-fcb58a809961}'::uuid[])))
                                                       ->  CTE Scan on lineage l  (cost=0.00..206960.16 rows=10348008 width=16) (actual time=0.001..0.015 rows=170 loops=1)
                                                       ->  Hash  (cost=0.02..0.02 rows=1 width=16) (actual time=0.041..0.042 rows=1 loops=1)
                                                             Buckets: 1024  Batches: 1  Memory Usage: 9kB
                                                             ->  ProjectSet  (cost=0.00..0.02 rows=1 width=16) (actual time=0.034..0.035 rows=1 loops=1)
                                                                   ->  Result  (cost=0.00..0.01 rows=1 width=0) (actual time=0.026..0.027 rows=1 loops=1)
                                                 ->  Index Only Scan using idx_jobs_uuid_symlink on jobs j_1  (cost=0.41..8.43 rows=1 width=32) (never executed)
                                                       Index Cond: (uuid = (unnest('{6ba22b8f-018d-4f8f-9bd5-fcb58a809961}'::uuid[])))
                                                       Heap Fetches: 0
                         ->  Bitmap Heap Scan on jobs j  (cost=0.85..4.88 rows=2 width=623) (actual time=0.003..0.003 rows=1 loops=170)
                               Recheck Cond: ((uuid = lineage.job_uuid) OR (uuid = lineage.job_symlink_target_uuid))
                               Filter: ((is_hidden IS FALSE) AND (symlink_target_uuid IS NULL))
                               Heap Blocks: exact=170
                               ->  BitmapOr  (cost=0.85..0.85 rows=2 width=0) (actual time=0.002..0.002 rows=0 loops=170)
                                     ->  Bitmap Index Scan on jobs_pkey  (cost=0.00..0.42 rows=1 width=0) (actual time=0.002..0.002 rows=1 loops=170)
                                           Index Cond: (uuid = lineage.job_uuid)
                                     ->  Bitmap Index Scan on jobs_pkey  (cost=0.00..0.42 rows=1 width=0) (actual time=0.000..0.000 rows=0 loops=170)
                                           Index Cond: (uuid = lineage.job_symlink_target_uuid)
                   ->  Hash  (cost=14014.49..14014.49 rows=55649 width=107) (actual time=34.197..34.197 rows=55646 loops=1)
                         Buckets: 65536  Batches: 2  Memory Usage: 4301kB
                         ->  Seq Scan on jobs p  (cost=0.00..14014.49 rows=55649 width=107) (actual time=0.007..21.482 rows=55646 loops=1)
     Planning Time: 0.664 ms
     JIT:
       Functions: 49
       Options: Inlining true, Optimization true, Expressions true, Deforming true
       Timing: Generation 2.449 ms, Inlining 20.219 ms, Optimization 283.470 ms, Emission 216.067 ms, Total 522.205 ms
     Execution Time: 2782.760 ms
    
  2. Dataset existence check:

    SELECT EXISTS (SELECT 1 FROM datasets_view AS d WHERE d.name = 'connection__facebook__2588' AND d.namespace_name = '6509_1578_5f1d142a-9406-4f5f-869e-6fd54e');
    

    With the following query plan (EXPLAIN ANALYZE output):

    Result  (cost=22419.90..22419.91 rows=1 width=1) (actual time=32185.672..32185.674 rows=1 loops=1)
    InitPlan 1 (returns $1)
        ->  Merge Join  (cost=175.93..133639.78 rows=6 width=0) (actual time=32185.670..32185.671 rows=0 loops=1)
            Merge Cond: (symlinks.dataset_uuid = d.uuid)
            Join Filter: (((CASE WHEN (((d.namespace_name)::text = (namespaces.name)::text) AND ((d.name)::text = (symlinks.name)::text)) THEN d.name ELSE symlinks.name END)::text = 'connection__facebook__2588'::text) AND ((CASE WHEN (((d.namespace_name)::text = (namespaces.name)::text) AND ((d.name)::text = (symlinks.name)::text)) THEN d.namespace_name ELSE namespaces.name END)::text = '6509_1578_5f1d142a-9406-4f5f-869e-6fd54e'::text))
            Rows Removed by Join Filter: 240294
            ->  Nested Loop  (cost=0.71..34371.35 rows=240883 width=148) (actual time=7.292..11628.589 rows=240991 loops=1)
                    ->  Index Scan using dataset_symlinks_dataset_uuid on dataset_symlinks symlinks  (cost=0.42..28163.77 rows=240883 width=117) (actual time=4.833..11435.024 rows=240991 loops=1)
                    ->  Memoize  (cost=0.29..0.30 rows=1 width=63) (actual time=0.000..0.000 rows=1 loops=240991)
                        Cache Key: symlinks.namespace_uuid
                        Cache Mode: logical
                        Hits: 240123  Misses: 868  Evictions: 0  Overflows: 0  Memory Usage: 150kB
                        ->  Index Only Scan using idx_namespaces_uuid_name on namespaces  (cost=0.28..0.29 rows=1 width=63) (actual time=0.031..0.031 rows=1 loops=868)
                                Index Cond: (uuid = symlinks.namespace_uuid)
                                Heap Fetches: 2
            ->  Index Scan using datasets_pkey on datasets d  (cost=0.42..92523.67 rows=240291 width=147) (actual time=0.013..20444.054 rows=240294 loops=1)
                    Filter: (is_hidden IS FALSE)
    Planning Time: 1.286 ms
    Execution Time: 32185.742 ms
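To make the cost of query 1 more concrete, here is a minimal Python model of what the recursive CTE appears to compute. It is a sketch under stated assumptions, not Marquez code: `job_datasets` (a hypothetical name) maps each job to the union of its current input and output dataset UUIDs, symlinked jobs and the `lineage_outside_job_io` branch are ignored, and the function names are illustrative. `lineage_like_cte` mirrors the plan's Nested Loop, where every `job_io` row is compared against every row produced by the previous iteration. `lineage_bfs` is a plain breadth-first search over a dataset-to-jobs inverted index; it returns the same set of jobs but only touches jobs that share a dataset with the current frontier.

```python
from collections import defaultdict


def lineage_like_cte(job_datasets: dict[str, set[str]], start: str, max_depth: int = 4):
    """Model the recursive CTE: every step compares each job_io row with each
    row produced by the previous step (the Nested Loop in the plan above).

    Returns (distinct jobs, number of (job, depth) rows, pairwise comparisons).
    """
    rows = {(start, 0)}   # UNION deduplicates whole rows, and depth is part of the row
    frontier = {start}    # rows produced by the previous iteration (the WorkTable)
    comparisons = 0
    for depth in range(1, max_depth + 1):
        produced = set()
        for job, datasets in job_datasets.items():  # outer side: CTE Scan on job_io
            for prev in frontier:                     # inner side: WorkTable Scan
                comparisons += 1
                # io.job_uuid != l.job_uuid AND array_cat(...) && array_cat(...)
                if job != prev and datasets & job_datasets[prev]:
                    produced.add(job)
        rows |= {(job, depth) for job in produced}
        frontier = produced
        if not frontier:
            break
    return {job for job, _ in rows}, len(rows), comparisons


def lineage_bfs(job_datasets: dict[str, set[str]], start: str, max_depth: int = 4) -> set[str]:
    """Same set of jobs via breadth-first search over a dataset -> jobs index,
    visiting each job at most once."""
    jobs_by_dataset: dict[str, set[str]] = defaultdict(set)
    for job, datasets in job_datasets.items():
        for dataset in datasets:
            jobs_by_dataset[dataset].add(job)
    seen = {start}
    frontier = {start}
    for _ in range(max_depth):
        neighbours: set[str] = set()
        for job in frontier:
            for dataset in job_datasets[job]:
                neighbours |= jobs_by_dataset[dataset]
        frontier = neighbours - seen  # drop already-visited jobs
        seen |= frontier
        if not frontier:
            break
    return seen


# Hypothetical toy graph: a chain j0 - j1 - ... - j5 linked by shared datasets,
# plus an unrelated job x.
graph = {
    "j0": {"d0", "d1"}, "j1": {"d1", "d2"}, "j2": {"d2", "d3"},
    "j3": {"d3", "d4"}, "j4": {"d4", "d5"}, "j5": {"d5", "d6"},
    "x": {"d9"},
}
jobs, row_count, comparisons = lineage_like_cte(graph, "j0")
print(sorted(jobs), row_count, comparisons)  # ['j0', 'j1', 'j2', 'j3', 'j4'] 9 42
print(sorted(lineage_bfs(graph, "j0")))       # ['j0', 'j1', 'j2', 'j3', 'j4']
```

The plan above shows the same pattern at scale: the WorkTable Scan runs 278,230 times (5 iterations × 55,646 `job_io` rows), and the CTE emits 170 rows that collapse to 76 distinct jobs in the final `DISTINCT ON (j.uuid)`, because a job reachable at several depths is emitted once per depth. How well an index-driven formulation would translate to PostgreSQL has not been benchmarked here.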
    

Dataset sizes (row counts):

    Table             Row count
    jobs                 38,588
    job_versions         38,592
    runs              1,569,281
    datasets            240,260
    dataset_versions  8,046,421
    lineage_events    8,901,398
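For the dataset existence check (query 2), the plan's Join Filter hints at why it is slow. The `name` and `namespace_name` predicates are applied to `CASE` expressions from `datasets_view`, so PostgreSQL joins all 240,991 `dataset_symlinks` rows with the 240,294 visible `datasets` rows and only then discards them (240,294 rows removed by the Join Filter). Reading those `CASE` expressions, whenever the condition is true the two branches are equal (`d.name = symlinks.name` and `d.namespace_name = namespaces.name`), so the view's `name` should always equal `symlinks.name` and its `namespace_name` should always equal `namespaces.name`. The sketch below checks that equivalence on a toy SQLite database. Assumptions: the schema is heavily simplified, `datasets_view` is reconstructed from the plan rather than copied from Marquez, the names `VIEW_QUERY`, `DIRECT_QUERY` and `exists` are hypothetical, and SQLite is only used to compare results, not performance.

```python
import sqlite3

# Hypothetical, heavily simplified schema: only the columns that appear in the
# plan's Join Filter are modelled.
SCHEMA = """
CREATE TABLE namespaces (uuid TEXT PRIMARY KEY, name TEXT NOT NULL);
CREATE TABLE datasets (uuid TEXT PRIMARY KEY, name TEXT NOT NULL,
                       namespace_name TEXT NOT NULL, is_hidden INTEGER NOT NULL);
CREATE TABLE dataset_symlinks (dataset_uuid TEXT NOT NULL,
                               namespace_uuid TEXT NOT NULL, name TEXT NOT NULL);
-- Stand-in for datasets_view, reconstructed from the Join Filter in the plan.
CREATE VIEW datasets_view AS
SELECT d.uuid,
       CASE WHEN d.namespace_name = n.name AND d.name = s.name
            THEN d.name ELSE s.name END AS name,
       CASE WHEN d.namespace_name = n.name AND d.name = s.name
            THEN d.namespace_name ELSE n.name END AS namespace_name
FROM datasets d
JOIN dataset_symlinks s ON s.dataset_uuid = d.uuid
JOIN namespaces n ON n.uuid = s.namespace_uuid
WHERE d.is_hidden = 0;
"""

# Same shape as the query in the issue: predicates on the view's CASE columns.
VIEW_QUERY = """SELECT EXISTS (SELECT 1 FROM datasets_view AS d
                WHERE d.name = ? AND d.namespace_name = ?)"""

# Rewrite: both CASE branches reduce to s.name / n.name, so filter on the
# plain symlink and namespace columns instead.
DIRECT_QUERY = """SELECT EXISTS (
    SELECT 1
    FROM dataset_symlinks s
    JOIN namespaces n ON n.uuid = s.namespace_uuid
    JOIN datasets d ON d.uuid = s.dataset_uuid
    WHERE s.name = ? AND n.name = ? AND d.is_hidden = 0)"""


def build_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany("INSERT INTO namespaces VALUES (?, ?)",
                     [("n1", "ns_a"), ("n2", "ns_b")])
    conn.executemany("INSERT INTO datasets VALUES (?, ?, ?, ?)",
                     [("d1", "tbl", "ns_a", 0), ("d2", "old_tbl", "ns_a", 1)])
    conn.executemany("INSERT INTO dataset_symlinks VALUES (?, ?, ?)",
                     [("d1", "n1", "tbl"),        # canonical name
                      ("d1", "n2", "tbl_alias"),  # symlink in another namespace
                      ("d2", "n1", "old_tbl")])   # hidden dataset
    return conn


def exists(conn: sqlite3.Connection, query: str, name: str, namespace: str) -> bool:
    return bool(conn.execute(query, (name, namespace)).fetchone()[0])


conn = build_db()
for name in ("tbl", "tbl_alias", "old_tbl", "missing"):
    for namespace in ("ns_a", "ns_b"):
        # The view-based and direct queries should agree on every lookup.
        assert exists(conn, VIEW_QUERY, name, namespace) == \
            exists(conn, DIRECT_QUERY, name, namespace)
print(exists(conn, DIRECT_QUERY, "tbl_alias", "ns_b"))  # True
```

If the equivalence also holds for the real view definition, the existence check could filter on `dataset_symlinks.name` and `namespaces.name` directly. Whether PostgreSQL would then pick an index plan depends on which indexes actually exist on `dataset_symlinks` and `namespaces`; this is a suggestion to verify, not a tested fix.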

We'd be happy to collaborate on optimizing these queries. Thanks for the great work on Marquez!

NikitaTolpikin, May 14 '25 14:05

Thanks for opening your first issue in the Marquez project! Please be sure to follow the issue template!

boring-cyborg[bot], May 14 '25 14:05

Duplicate of #2987, still waiting on a response there 😢

Jacoby6000, May 14 '25 18:05

@NikitaTolpikin I think it might be related: https://github.com/MarquezProject/marquez/issues/2987#issuecomment-3239584088

thijs-s, Aug 30 '25 22:08