
where op + limit op executes very slowly

caican00 opened this issue 2 months ago · 8 comments

Describe the bug

Without a where op, the result is returned in seconds.

import daft

daft.context.set_runner_ray(noop_if_initialized=True)

daft.read_parquet(
    "/mnt/test/processed_data/outputs/"
).limit(10).show()

With a where op, it is basically impossible to wait for the result to return.

import daft

daft.context.set_runner_ray(noop_if_initialized=True)

daft.read_parquet(
    "/mnt/test/processed_data/outputs/"
).where("id ='807d7439-af41-494a-ae95-96c716ab4fae'").limit(10).show()

To Reproduce

No response

Expected behavior

No response

Component(s)

Parquet

Additional context

No response

caican00 · Oct 17 '25 05:10

Hi, has anyone else encountered this problem?

caican00 · Oct 17 '25 05:10

@srilman @colin-ho do either of you have context on this?

ohbh · Oct 17 '25 22:10

@caican00 Your two code examples are doing very different things. Their runtime behavior is incomparable.

The first case is doing something very simple and will always be very fast. The second requires a scan through the entire dataset, which means its runtime is proportional to the amount of data you provide.

How large is /mnt/test/processed_data/outputs/? And how are records with id ='807d7439-af41-494a-ae95-96c716ab4fae' distributed in the data? If your data is very large and matching records are sparse, then getting the first 10 will take some time.

Your first example just:

  • defines the data as coming from a local path
  • grabs the first 10 rows (the limit(10)) and displays the first 8 of them to STDOUT (the show())
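
As a rough illustration of that fast path (this is not Daft's implementation; the file name is hypothetical and pyarrow just stands in for the scan), a bare limit can satisfy itself from the first rows of the first file:

import pyarrow.parquet as pq

# A bare limit stops as soon as enough rows have been read; typically
# only the first row group of the first file is ever touched.
pf = pq.ParquetFile("/mnt/test/processed_data/outputs/part-0000.parquet")  # hypothetical file
first_rows = next(pf.iter_batches(batch_size=10))  # read ~10 rows, then stop
print(first_rows.to_pandas())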

Your second example is more complex. It:

  • defines the data as coming from a local path
  • defines a where clause: filter out rows whose id is not '807d7439-af41-494a-ae95-96c716ab4fae'
  • may have to inspect millions of records before finding one with a matching id value
  • grabs the first 10 results of the where
  • displays the first 8 of these to STDOUT

Note that if you only have e.g. 9 records with a matching id value, then Daft will have to do a full scan through all data. While Daft is a streaming engine, the limit(10) means that Daft will need to find 10 matching records before it can stop.
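
To make that concrete, here is a rough pure-Python sketch of the work a where followed by limit(10) implies (the file layout is an assumption, and pyarrow stands in for Daft's actual vectorized scan):

import glob

import pyarrow.compute as pc
import pyarrow.parquet as pq

TARGET = "807d7439-af41-494a-ae95-96c716ab4fae"
matches = []

# Stream batches from every file until 10 matching rows are found.
# If matching ids are sparse, this may visit every file: a full scan.
for path in sorted(glob.glob("/mnt/test/processed_data/outputs/*.parquet")):
    for batch in pq.ParquetFile(path).iter_batches():
        mask = pc.equal(batch.column("id"), TARGET)
        matches.extend(batch.filter(mask).to_pylist())
        if len(matches) >= 10:
            break
    if len(matches) >= 10:
        break

print(matches[:10])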

As you can see, in the second example, your .where means that Daft has to scan through your data to find records with a matching id value. This takes time :)

Performance here depends on your data size and your data distribution! This is expected behavior, so I'm closing this. If you find something new, feel free to re-open and add details. Or, if you find something very different, please open a new issue.

malcolmgreaves · Oct 20 '25 23:10

Hi @malcolmgreaves, thanks for your reply.

I got it. However, in my scenario this simple query with a filter took three days to complete, over just 2404 Parquet files with 500,000 records each (about 1.2 billion rows in total).

I thought there might be an issue with the Daft framework: it seems to be executing serially.

Would it be faster if it could be executed in parallel?

caican00 · Oct 21 '25 03:10

That's great insight -- it seems like there's more to this one, so I'll reopen.

The where clause should be implemented in parallel. For flotilla, it should be distributed to each swordfish worker. In swordfish itself, I'm unsure if it's IO bound and thus we still have to do sequential reads (that's my guess). But we should be able to actually search within each large chunk of records we read in parallel.
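
For illustration only, the shape of that parallelism could look like the sketch below, with concurrent.futures standing in for flotilla distributing work to swordfish workers (this is not Daft's internals; paths and names are assumptions):

import glob
from concurrent.futures import ProcessPoolExecutor

import pyarrow.compute as pc
import pyarrow.parquet as pq

TARGET = "807d7439-af41-494a-ae95-96c716ab4fae"

def matching_rows(path: str) -> list:
    # Each worker filters one file independently, mimicking the
    # per-worker distribution of the where clause described above.
    table = pq.read_table(path, columns=["id"])
    return table.filter(pc.equal(table.column("id"), TARGET)).to_pylist()

if __name__ == "__main__":
    paths = sorted(glob.glob("/mnt/test/processed_data/outputs/*.parquet"))
    found = []
    with ProcessPoolExecutor() as pool:
        for rows in pool.map(matching_rows, paths):
            found.extend(rows)
            if len(found) >= 10:
                # Stop consuming once the limit is met; tasks already
                # submitted may still run to completion.
                break
    print(found[:10])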

Let me ask some other Daft folks what the story is here.

Do you have any publicly available data that I could run on to reproduce the same behavior you're seeing?

malcolmgreaves · Oct 21 '25 16:10

> The where clause should be implemented in parallel. For flotilla, it should be distributed to each swordfish worker. In swordfish itself, I'm unsure if it's IO bound and thus we still have to do sequential reads (that's my guess). But we should be able to actually search within each large chunk of records we read in parallel.

I think so, too.

> Do you have any publicly available data that I could run on to reproduce the same behavior you're seeing?

Sorry, I don't have any public data. But as long as the dataset is sparse, it is very easy to reproduce; see the sketch below.
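
For what it's worth, a sparse dataset with the same shape can be synthesized; here is a minimal sketch (the output path and file count are assumptions, scaled down from the 2404 files reported above):

import os
import uuid

import pyarrow as pa
import pyarrow.parquet as pq

# Random UUIDs virtually guarantee the target id never appears, so
# where(...).limit(10) is forced into a full scan.
out_dir = "/tmp/sparse_outputs"  # hypothetical path
os.makedirs(out_dir, exist_ok=True)
for i in range(100):  # increase toward 2404 to match the report
    ids = pa.array([str(uuid.uuid4()) for _ in range(500_000)])
    pq.write_table(pa.table({"id": ids}), f"{out_dir}/part-{i:04d}.parquet")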

cc @malcolmgreaves

caican00 · Oct 22 '25 04:10

Hi @caican00, could you share the output of the query plan, e.g. via

df = daft.read_parquet(
    "/mnt/test/processed_data/outputs/"
).where("id ='807d7439-af41-494a-ae95-96c716ab4fae'").limit(10)

df.explain(True)

It also seems that you're using the Ray runner to read from a locally mounted folder. Is this happening on a single machine? In that case, could you remove daft.context.set_runner_ray(noop_if_initialized=True) and try it out? (And if you don't see any ⚔️ 🐟 logos, trying daft.context.set_runner_native() may be good too.)
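
Putting those together, the experiment could look like this (same query as above, just without the Ray runner):

import daft

# On a single machine, use the native runner instead of Ray.
daft.context.set_runner_native()

df = daft.read_parquet(
    "/mnt/test/processed_data/outputs/"
).where("id ='807d7439-af41-494a-ae95-96c716ab4fae'").limit(10)

df.explain(True)  # please share this output
df.show()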

desmondcheongzx · Oct 23 '25 00:10

Hi @desmondcheongzx, I made an optimization for this and it worked quite well. I will submit a PR later. Thanks for your reply.

caican00 · Oct 27 '25 02:10

Hi @desmondcheongzx @malcolmgreaves, I created a PR to optimize this issue; could you help review it? https://github.com/Eventual-Inc/Daft/pull/5601

caican00 · Nov 20 '25 03:11