where op + limit op executed very slowly
Describe the bug
Without the where op, the result is returned in seconds:
```python
import daft

daft.context.set_runner_ray(noop_if_initialized=True)
daft.read_parquet(
    "/mnt/test/processed_data/outputs/"
).limit(10).show()
```
With the where op, it is basically impossible to wait for the result to return:
```python
import daft

daft.context.set_runner_ray(noop_if_initialized=True)
daft.read_parquet(
    "/mnt/test/processed_data/outputs/"
).where("id ='807d7439-af41-494a-ae95-96c716ab4fae'").limit(10).show()
```
To Reproduce
No response
Expected behavior
No response
Component(s)
Parquet
Additional context
No response
Hi, has anyone else encountered this problem?
@srilman @colin-ho do either of you have context on this?
@caican00 Your two code examples are doing very different things. Their runtime behavior is incomparable.
The first case is doing something very simple and will always be very fast. The second example requires doing a scan through the entire data, which means its runtime is proportional to the data you provide.
How large is /mnt/test/processed_data/outputs/? And what's the distribution of things with "id ='807d7439-af41-494a-ae95-96c716ab4fae'" in the data? If your data is very large and "id ='807d7439-af41-494a-ae95-96c716ab4fae'" records are sparse, then getting the first 10 will take some time.
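A rough back-of-the-envelope estimate may help here (this is a general streaming-query heuristic, not anything Daft-specific, and it assumes matching rows are uniformly distributed): if a fraction `p` of rows match the filter, a streaming `where` + `limit(k)` must scan on the order of `k / p` rows before it can stop, capped at the dataset size.

```python
# Back-of-the-envelope: rows a streaming filter + limit must scan.
# Assumes matching rows are uniformly distributed across the data
# (an assumption, not a property of any particular dataset).

def expected_rows_scanned(total_rows: int, matching_rows: int, limit: int) -> int:
    """Approximate rows scanned before `limit` matches are found."""
    selectivity = matching_rows / total_rows
    return min(total_rows, round(limit / selectivity))

# Illustrative numbers: a dataset of ~1.2 billion rows where only 10 rows
# match the filter forces a scan of essentially the whole dataset.
total = 2404 * 500_000
print(expected_rows_scanned(total, matching_rows=10, limit=10))
```

If matches are dense, by contrast, only a small prefix of the data needs to be read.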
Your first example is just:
- define the data as coming from a local path
- grab the first 10 elements and display the first 8 of them to STDOUT
Your second example is more complex. It is:
- define the data as coming from a local path
- define a `where` clause: start filtering out data that doesn't have `id ='807d7439-af41-494a-ae95-96c716ab4fae'`
  - you may have millions of records to inspect before you find one with the correct `id` value
- grab the first 10 results of the `where`
- display the first 8 elements of these to STDOUT
Note that if you only have e.g. 9 records with a matching id value, then Daft will have to do a full scan through all data. While Daft is a streaming engine, the limit(10) means that Daft will need to find 10 matching records before it can stop.
As you can see, in the second example, your .where means that Daft has to scan through your data to find records with a matching id value. This takes time :)
Performance here depends on your data size and your data distribution! This is expected behavior, so I'm closing this. If there's something new that you find, feel free to re-open and add. Or, if you find something very different, please feel free to open a new issue.
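The streaming semantics described above can be sketched in plain Python (a sketch of the behavior, not Daft's implementation — the record layout here is synthetic):

```python
def scan(records, predicate, limit):
    """Stream records, keep matches, and stop once `limit` hits are found.

    Also counts how many records had to be inspected before stopping.
    """
    scanned = 0
    hits = []
    for rec in records:
        scanned += 1
        if predicate(rec):
            hits.append(rec)
            if len(hits) == limit:
                break
    return hits, scanned

# 1,000,000 synthetic rows where only every 200,000th row matches:
rows = ({"id": "target" if i % 200_000 == 0 else f"other-{i}"}
        for i in range(1_000_000))
hits, scanned = scan(rows, lambda r: r["id"] == "target", limit=10)
# Only 5 matches exist, so limit=10 is never reached and every row is scanned.
print(len(hits), scanned)
```

This is exactly the failure mode with a sparse `id`: if fewer than 10 matching records exist, the early-exit from `limit(10)` never triggers and the scan runs to the end of the data.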
Hi @malcolmgreaves, thanks for your reply.
I got it. However, in my scenario, this simple query with a filter took three days to complete, over just 2,404 Parquet files, each containing 500,000 records.
I suspect there may be an issue in the Daft framework: the filter seems to be executed serially.
Would it be faster if it were executed in parallel?
That's great insight -- it seems like there's more to this one, so I'll reopen.
The where clause should be implemented in parallel. For flotilla, it should be distributed to each swordfish worker. In swordfish itself, I'm unsure if it's IO bound and thus we still have to do sequential reads (that's my guess). But we should be able to actually search within each large chunk of records we read in parallel.
Let me ask some other Daft folks what's the story here.
Do you have any publicly available data that I could run on to reproduce the same behavior you're seeing?
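As a sketch of the per-file parallelism described above (stdlib only, with hypothetical helper names — this is not Daft's actual execution model, just an illustration of the idea):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

# Hypothetical stand-in for reading and filtering one Parquet file.
def filter_file(file_rows, target_id, limit):
    return [r for r in file_rows if r["id"] == target_id][:limit]

def parallel_where_limit(files, target_id, limit, workers=8):
    """Filter each file concurrently; stop collecting once `limit` matches exist."""
    results = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(filter_file, f, target_id, limit) for f in files]
        for fut in as_completed(futures):
            results.extend(fut.result())
            if len(results) >= limit:
                break  # already-submitted tasks may still run; we stop consuming
    return results[:limit]

# 4 synthetic "files", each containing exactly one matching row:
files = [[{"id": "m" if i == 0 else f"x{i}"} for i in range(1000)]
         for _ in range(4)]
print(len(parallel_where_limit(files, "m", limit=3)))
```

The design question for the real engine is where the early-exit lives: with per-file workers, each worker can stop as soon as the global limit is satisfied, instead of one sequential reader scanning file after file.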
> The where clause should be implemented in parallel. For flotilla, it should be distributed to each swordfish worker. In swordfish itself, I'm unsure if it's IO bound and thus we still have to do sequential reads (that's my guess). But we should be able to actually search within each large chunk of records we read in parallel.
I think so, too.
> Do you have any publicly available data that I could run on to reproduce the same behavior you're seeing?
I'm sorry, I don't have any public data. But as long as the dataset is sparse, it is very easy to reproduce.
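A synthetic sparse dataset along these lines should reproduce the shape of the problem (stdlib-only sketch of the data generation; actually writing the rows out to Parquet would need an extra library such as pyarrow, which is an assumption about available tooling):

```python
import random
import uuid

def make_sparse_rows(n_rows, target_id, n_matches, seed=0):
    """Generate n_rows of {id, value}, with only n_matches rows matching target_id."""
    rng = random.Random(seed)
    match_positions = set(rng.sample(range(n_rows), n_matches))
    for i in range(n_rows):
        row_id = target_id if i in match_positions else str(uuid.uuid4())
        yield {"id": row_id, "value": i}

# Mirror the sparsity in the report: a handful of matches among many rows.
target = "807d7439-af41-494a-ae95-96c716ab4fae"
rows = list(make_sparse_rows(100_000, target, n_matches=5))
print(sum(r["id"] == target for r in rows))
```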
cc @malcolmgreaves
Hi @caican00, could you share the output of the query plan, e.g. via
```python
df = daft.read_parquet(
    "/mnt/test/processed_data/outputs/"
).where("id ='807d7439-af41-494a-ae95-96c716ab4fae'").limit(10)
df.explain(True)
```
It also seems that you're using the Ray runner but reading from a locally mounted folder. Is this happening on a single machine? In that case, could you remove `daft.context.set_runner_ray(noop_if_initialized=True)` and try it out? (And if you don't see any ⚔️ 🐟 logos, then trying `daft.context.set_runner_native()` may be good too.)
Hi @desmondcheongzx, i made an optimization about this and it worked quite well. I will submit a pr later. Thanks for your reply.
Hi @desmondcheongzx @malcolmgreaves, I created a PR to optimize this issue; could you help review it? https://github.com/Eventual-Inc/Daft/pull/5601