Daft icon indicating copy to clipboard operation
Daft copied to clipboard

QA: Comparing the performance of Ray Data and Daft in the same scenario.

Open Jay-ju opened this issue 8 months ago • 6 comments

Is your feature request related to a problem?

I compared a custom embedding pipeline implemented with Ray Data against the equivalent pipeline implemented with Daft. Daft's performance came out somewhat below my expectations, so I would like to ask here whether there is anything wrong with the way I am using it.

Reproduce script:

For Ray

class LoadImageOperator:
    """Batch operator that reads the raw file bytes behind each URL in a batch.

    The constructor opens one client via ``create_tos_client(TOS_CONFIG)`` and
    reuses it for every batch. Calling the instance maps a table with a
    ``url`` column to a table with ``url`` and ``image_bytes`` columns.
    """

    def __init__(self):
        self.tos_client = create_tos_client(TOS_CONFIG)
        self.output_schema = pa.schema([
            ("url", pa.string()),
            ("image_bytes", pa.binary()),  # Raw file contents, not decoded
        ])

    def __call__(self, batch: pa.Table) -> pa.Table:
        """Load the bytes for every URL in ``batch``.

        Args:
            batch: Table containing a ``url`` column.

        Returns:
            A table matching ``self.output_schema``. Rows whose file could
            not be read carry a null ``image_bytes`` value; the other rows of
            the same batch are unaffected.
        """
        image_bytes = [self._try_load(url.as_py()) for url in batch["url"]]

        # The explicit type keeps the column binary even when the batch is
        # empty or every load failed (type inference would yield a null type).
        return pa.Table.from_arrays(
            [
                batch["url"],
                pa.array(image_bytes, type=pa.binary()),
            ],
            schema=self.output_schema,
        )

    def _try_load(self, raw_url):
        """Return the bytes for ``raw_url``, or ``None`` if loading fails.

        Failures are reported on stdout (truncated to 200 characters) and
        turned into ``None`` so that one bad URL does not discard the batch.
        """
        try:
            return self._load_image(normalize_s3_path(raw_url))
        except Exception as e:
            print(f"Image loading error: {str(e)[:200]}")
            return None

    def _load_image(self, url: str) -> bytes:
        """Read and return the full contents of ``url`` through the client."""
        with self.tos_client.open(url, "rb") as f:
            return f.read()

class EmbeddingOperator:
    """Batch operator that turns encoded image bytes into unit-length embeddings.

    The model and its image processor are loaded once in the constructor and
    reused for every batch. Calling the instance maps a table with ``url`` and
    ``image_bytes`` columns to a table with ``url`` and ``embedding`` columns.
    """

    def __init__(self, model_path: str, embedding_dim: int = 768):
        """Load the processor and model found at ``model_path``.

        Args:
            model_path: Location passed to ``from_pretrained``.
            embedding_dim: Length of the fixed-size ``embedding`` list column.
                Must equal the size of the model's last hidden dimension,
                otherwise the Arrow conversion fails and the batch is
                emitted as nulls.
        """
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self._preprocessor = AutoImageProcessor.from_pretrained(model_path)
        self._model = AutoModel.from_pretrained(model_path).to(self.device)
        self._model.eval()

        # Single definition of the embedding column type, shared by the
        # schema and by every array built in __call__.
        self._embedding_type = pa.list_(pa.float32(), embedding_dim)
        self.output_schema = pa.schema([
            ("url", pa.string()),
            ("embedding", self._embedding_type),
        ])

    def __call__(self, batch: pa.Table) -> pa.Table:
        """Embed every non-null image in ``batch``.

        Args:
            batch: Table containing ``url`` and ``image_bytes`` columns.

        Returns:
            A table matching ``self.output_schema``. Rows whose
            ``image_bytes`` is null get a null embedding. If decoding or
            inference raises, the error is printed (truncated to 200
            characters) and every row of the batch gets a null embedding.
        """
        try:
            payloads = [cell.as_py() for cell in batch["image_bytes"]]

            # Rows with null bytes are skipped here so that one missing
            # image does not null out the rest of the batch.
            valid_rows = [i for i, data in enumerate(payloads) if data is not None]
            if not valid_rows:
                return self._null_result(batch)

            # Deserialize image bytes
            images = [self._bytes_to_image(payloads[i]) for i in valid_rows]

            # Generate embeddings
            with torch.no_grad(), torch.cuda.amp.autocast():
                inputs = self._preprocessor(
                    images=images,
                    return_tensors="pt"
                ).to(self.device)

                outputs = self._model(**inputs)
                # Mean over dim=1 of the last hidden state, then L2-normalize.
                vectors = torch.mean(outputs.last_hidden_state, dim=1)
                l2_norm = torch.norm(vectors, p=2, dim=1, keepdim=True)
                normalized = (vectors / l2_norm).cpu().numpy().tolist()

            # Scatter the computed vectors back to their original row positions.
            embeddings = [None] * len(payloads)
            for row, vector in zip(valid_rows, normalized):
                embeddings[row] = vector

            return pa.Table.from_arrays([
                batch["url"],
                pa.array(embeddings, type=self._embedding_type)
            ], schema=self.output_schema)

        except Exception as e:
            print(f"Embedding error: {str(e)[:200]}")
            return self._null_result(batch)

    def _null_result(self, batch: pa.Table) -> pa.Table:
        """Return the output table for ``batch`` with an all-null embedding column."""
        return pa.Table.from_arrays([
            batch["url"],
            pa.nulls(len(batch), self._embedding_type)
        ], schema=self.output_schema)

    def _bytes_to_image(self, bytes_data: bytes) -> Image.Image:
        """Restore image from byte stream"""
        import io

        return Image.open(io.BytesIO(bytes_data))

def main():
    """Run the Ray Data pipeline: read URLs, load images, embed, write Lance.

    After the write, the Lance dataset is re-opened with Daft and up to ten
    rows whose embedding is null are displayed.
    """
    ray.init()

    lance_path = "/data00/embeddings.lance"
    lance_schema = pa.schema([
        ("url", pa.string()),
        ("embedding", pa.list_(pa.float32(), 768))
    ])

    # Stage 1: CPU image loading
    url_rows = ray.data.read_csv(
        paths=[DEFAULT_TOS_META_OUTPUT_FILE],
        filesystem=create_tos_client(TOS_CONFIG),
        override_num_blocks=1000,
    )
    loaded = url_rows.map_batches(
        LoadImageOperator,
        batch_size=2,
        num_cpus=0.1,  # 0.1 CPU cores per task
        concurrency=5,  # Adjusted based on total CPU cores
        batch_format="pyarrow",
    )

    # Stage 2: embedding inference on fractional GPUs
    embedded = loaded.map_batches(
        EmbeddingOperator,
        fn_constructor_args=("/data00/model/dinov2-base",),
        batch_size=BATCH_SIZE,
        num_gpus=0.1,   # 0.1 GPU per task
        num_cpus=0,
        concurrency=10,  # Adjusted based on GPU count
        batch_format="pyarrow",
    )

    # Stage 3: persist the result
    embedded.write_lance(lance_path, schema=lance_schema)

    # Check: show rows that ended up without an embedding
    missing = daft.read_lance(lance_path).where("embedding is null")
    missing.show(10)

if __name__ == "__main__":
    # Wall-clock timing of the whole run, including everything main() does.
    started_at = time.perf_counter()
    main()
    elapsed = time.perf_counter() - started_at
    print(f"Total time: {elapsed:.2f}s")

For Daft

 # Daft version of the pipeline: read URLs, download, decode, embed, write Lance.
# Every statement below starts at column 0; an indented first statement would
# be rejected by Python with "IndentationError: unexpected indent".
daft.set_execution_config(enable_ray_tracing=True, shuffle_algorithm="flight_shuffle")

# Read data
df = daft.read_csv(path=DEFAULT_TOS_META_OUTPUT_FILE, io_config=io_config)
# Repartition after the read to raise parallelism for the following steps
df = df.repartition(4)

# Download images from URLs and store as binary; failed downloads become null
df = df.with_column(
    "image_bytes", df["url"].url.download(max_connections=1024, on_error="null", io_config=io_config)
)
# Decode the downloaded bytes into images; undecodable rows become null
df = df.with_column("images", df["image_bytes"].image.decode(on_error="null"))

# Generate embeddings
# NOTE(review): PictureEmbedding is defined outside this snippet -- presumably a Daft class UDF; confirm.
embedding_udf = PictureEmbedding.with_init_args(path="/data00/model/dinov2-base")
df = df.with_column("embedding", embedding_udf(daft.col("images")))

# Save to Lance format (only the embedding and its source URL are kept)
df = df.select("embedding", "url")
df.write_lance("/data00/tmp.lance")

Describe the solution you'd like

null

Describe alternatives you've considered

No response

Additional Context

No response

Would you like to implement a fix?

No

Jay-ju avatar Apr 18 '25 08:04 Jay-ju

What is surprising here is that GPU utilization is actually higher with Daft, yet Daft's end-to-end time is longer. My guess is that Ray Data executes the pipeline in a streaming fashion, while Daft executes it stage by stage; is that correct? The main reason I call repartition after read_csv is that the concurrency of the CSV read does not seem to meet my requirements, but I have not found a way to adjust the concurrency directly at the data source layer.

Jay-ju avatar Apr 18 '25 09:04 Jay-ju

Are you running on a single node or in a cluster? If single node, I'd suggest using the native runner first, i.e. set_runner_native. I'd also recommend specifying with_concurrency and batch_size in the picture embedding UDF as well.

Also regarding the execution configs, enable_ray_tracing=True can at times cause a slight performance hit, I'd be curious to see if turning it off will help.

colin-ho avatar Apr 18 '25 17:04 colin-ho

To try the native runner (do not initialize Ray for this):

# Select the native runner explicitly (Daft also defaults to it when Ray is not initialized)
daft.set_runner_native()

# Use a small morsel size, set by hand for now -- the Daft maintainers said
# they were adding some intelligence around this shortly (in ~1 week)
daft.set_execution_config(default_morsel_size=128)

# Read data
# No df.repartition(4) after the read: the native runner does not have the
# concept of partitions, so the call would be a no-op.
df = daft.read_csv(path=DEFAULT_TOS_META_OUTPUT_FILE, io_config=io_config)

# Download images from URLs and store as binary
downloaded_bytes = df["url"].url.download(
    max_connections=32,
    on_error="null",
    io_config=io_config,
)
df = df.with_column("image_bytes", downloaded_bytes)

# Decode the downloaded bytes into images
decoded_images = df["image_bytes"].image.decode(on_error="null")
df = df.with_column("images", decoded_images)

# Generate embeddings with 10 concurrent UDF instances, as per the Ray Data example
embedding_udf = PictureEmbedding.with_init_args(path="/data00/model/dinov2-base")
embedding_udf = embedding_udf.with_concurrency(10)
df = df.with_column("embedding", embedding_udf(daft.col("images")))

# Save to Lance format
df = df.select("embedding", "url")
df.write_lance("/data00/tmp.lance")

jaychia avatar Apr 19 '25 02:04 jaychia

Are you running on a single node or in a cluster? If single node, I'd suggest using the native runner first, i.e. set_runner_native. I'd also recommend specifying with_concurrency and batch_size in the picture embedding UDF as well.

Also regarding the execution configs, enable_ray_tracing=True can at times cause a slight performance hit, I'd be curious to see if turning it off will help.

@colin-ho Image It seems that there are no visible indicators.

Jay-ju avatar Apr 19 '25 05:04 Jay-ju

To try the native runner (do not initialize Ray for this):

# This is the default behavior if Ray is not initialized
daft.set_runner_native()

# Manually set the morsel size to be small -- we are adding some intelligence around this shortly in ~1 week
daft.set_execution_config(default_morsel_size=128)

# Read data
df = daft.read_csv(path=DEFAULT_TOS_META_OUTPUT_FILE, io_config=io_config)

# Native runner does not have the concept of partitions, and this is a no-op
# df = df.repartition(4)

# Download images from URLs and store as binary
df = df.with_column(
    "image_bytes", df["url"].url.download(max_connections=32, on_error="null", io_config=io_config)
)
df = df.with_column("images", df["image_bytes"].image.decode(on_error="null"))

# Generate embeddings
embedding_udf = PictureEmbedding.with_init_args(
    path="/data00/model/dinov2-base"
).with_concurrency(10)  # run 10 of these, as per your Ray Data example
df = df.with_column("embedding", embedding_udf(daft.col("images")))

# Save to Lance format
df = df.select("embedding", "url")
df.write_lance("/data00/tmp.lance")

@jaychia Falling back to the native engine causes an error:

Error when running pipeline node ActorPoolProject
Traceback (most recent call last):
  File "/data00/code/Daft/tests/functions/test_daft.py", line 165, in <module>
    main()
  File "/data00/code/Daft/tests/functions/test_daft.py", line 156, in main
    df.write_lance("/data00/tmp.lance")
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/api_annotations.py", line 26, in _wrap
    return timed_method(*args, **kwargs)
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/analytics.py", line 199, in tracked_method
    result = method(*args, **kwargs)
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/dataframe/dataframe.py", line 1245, in write_lance
    write_df.collect()
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/api_annotations.py", line 26, in _wrap
    return timed_method(*args, **kwargs)
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/analytics.py", line 199, in tracked_method
    result = method(*args, **kwargs)
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/dataframe/dataframe.py", line 3037, in collect
    self._materialize_results()
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/dataframe/dataframe.py", line 3018, in _materialize_results
    self._result_cache = context.get_or_create_runner().run(self._builder)
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/runners/native_runner.py", line 62, in run
    results = list(self.run_iter(builder))
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/runners/native_runner.py", line 90, in run_iter
    yield from results_gen
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/execution/native_executor.py", line 37, in <genexpr>
    return (
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/execution/actor_pool_udf.py", line 45, in eval_input
    self.handle_conn.send(MicroPartition._from_pymicropartition(input))
  File "/root/miniconda3/envs/py310/lib/python3.10/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/root/miniconda3/envs/py310/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object '_ensure_registered_super_ext_type.<locals>.DaftExtension'

Jay-ju avatar Apr 19 '25 05:04 Jay-ju

Ah, that's a bug. It should be fixed in version 0.4.11. The issue was that extension types like embedding could not be pickled and sent between processes.

colin-ho avatar Apr 22 '25 21:04 colin-ho

Closing as the extension types should work on native runner now

colin-ho avatar Aug 06 '25 18:08 colin-ho