QA: Comparing the performance of Ray Data and Daft in the same scenario.
Is your feature request related to a problem?
I compared a custom embedding scenario built with Ray Data against the equivalent pipeline in Daft. After the comparison, Daft's performance is still somewhat below expectations, so I'd like to ask whether there is anything wrong with my usage.
Reproduction scripts:
For Ray
import io
import time

import daft
import pyarrow as pa
import ray
import torch
from PIL import Image
from transformers import AutoImageProcessor, AutoModel

# create_tos_client, normalize_s3_path, TOS_CONFIG, DEFAULT_TOS_META_OUTPUT_FILE
# and BATCH_SIZE are project-specific helpers/constants defined elsewhere.

class LoadImageOperator:
    def __init__(self):
        self.tos_client = create_tos_client(TOS_CONFIG)
        self.output_schema = pa.schema([
            ("url", pa.string()),
            ("image_bytes", pa.binary())  # Stores serialized image bytes
        ])

    def __call__(self, batch: pa.Table) -> pa.Table:
        try:
            # Convert URLs and load images
            urls = [normalize_s3_path(url.as_py()) for url in batch["url"]]
            image_bytes = [self._load_image(url) for url in urls]
            print(f"++++++ {len(image_bytes[0])}")
            return pa.Table.from_arrays([
                batch["url"],
                pa.array(image_bytes)
            ], schema=self.output_schema)
        except Exception as e:
            print(f"Image loading error: {str(e)[:200]}")
            return pa.Table.from_arrays([
                batch["url"],
                pa.nulls(len(batch), pa.binary())
            ], schema=self.output_schema)

    def _load_image(self, url: str) -> bytes:
        with self.tos_client.open(url, "rb") as f:
            return f.read()
class EmbeddingOperator:
    def __init__(self, model_path: str):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self._preprocessor = AutoImageProcessor.from_pretrained(model_path)
        self._model = AutoModel.from_pretrained(model_path).to(self.device)
        self._model.eval()
        self.output_schema = pa.schema([
            ("url", pa.string()),
            ("embedding", pa.list_(pa.float32(), 768))
        ])

    def __call__(self, batch: pa.Table) -> pa.Table:
        start = time.perf_counter()
        try:
            # Deserialize image bytes
            images = [self._bytes_to_image(bytes_data.as_py()) for bytes_data in batch["image_bytes"]]
            # Generate embeddings
            with torch.no_grad(), torch.cuda.amp.autocast():
                inputs = self._preprocessor(
                    images=images,
                    return_tensors="pt"
                ).to(self.device)
                outputs = self._model(**inputs)
                vectors = torch.mean(outputs.last_hidden_state, dim=1)
                l2_norm = torch.norm(vectors, p=2, dim=1, keepdim=True)
                embeddings = (vectors / l2_norm).cpu().numpy().tolist()
            return pa.Table.from_arrays([
                batch["url"],
                pa.array(embeddings, type=pa.list_(pa.float32(), 768))
            ], schema=self.output_schema)
        except Exception as e:
            print(f"Embedding error: {str(e)[:200]}")
            return pa.Table.from_arrays([
                batch["url"],
                pa.nulls(len(batch), pa.list_(pa.float32(), 768))
            ], schema=self.output_schema)

    def _bytes_to_image(self, bytes_data: bytes) -> Image.Image:
        """Restore an image from a byte stream."""
        return Image.open(io.BytesIO(bytes_data))
def main():
    ray.init()
    # Stage 1: CPU image loading
    df = ray.data.read_csv(
        paths=[DEFAULT_TOS_META_OUTPUT_FILE],
        filesystem=create_tos_client(TOS_CONFIG),
        override_num_blocks=1000,
    )
    df = df.map_batches(
        LoadImageOperator,
        batch_size=2,
        num_cpus=0.1,   # 0.1 CPU cores per task
        concurrency=5,  # Adjusted based on total CPU cores
        batch_format="pyarrow"
    )
    df = df.map_batches(
        EmbeddingOperator,
        fn_constructor_args=("/data00/model/dinov2-base",),
        batch_size=BATCH_SIZE,
        num_gpus=0.1,    # 0.1 GPU per task
        num_cpus=0,
        concurrency=10,  # Adjusted based on GPU count
        batch_format="pyarrow"
    )
    df.write_lance(
        "/data00/embeddings.lance",
        schema=pa.schema([
            ("url", pa.string()),
            ("embedding", pa.list_(pa.float32(), 768))
        ])
    )
    df = daft.read_lance("/data00/embeddings.lance").where("embedding is null")
    df.show(10)

if __name__ == "__main__":
    start = time.perf_counter()
    main()
    print(f"Total time: {time.perf_counter()-start:.2f}s")
For Daft
daft.set_execution_config(enable_ray_tracing=True, shuffle_algorithm="flight_shuffle")
# Read data
df = daft.read_csv(path=DEFAULT_TOS_META_OUTPUT_FILE, io_config=io_config)
df = df.repartition(4)
# Download images from URLs and store as binary
df = df.with_column(
    "image_bytes", df["url"].url.download(max_connections=1024, on_error="null", io_config=io_config)
)
df = df.with_column("images", df["image_bytes"].image.decode(on_error="null"))
# Generate embeddings
embedding_udf = PictureEmbedding.with_init_args(path="/data00/model/dinov2-base")
df = df.with_column("embedding", embedding_udf(daft.col("images")))
# Save to Lance format
df = df.select("embedding", "url")
df.write_lance("/data00/tmp.lance")
Would you like to implement a fix?
No
What is somewhat puzzling here is that GPU utilization is actually higher with Daft, yet Daft's end-to-end time is longer. I am wondering whether Ray Data executes the pipeline in a streaming fashion while Daft runs stage by stage. The main reason I repartition after read_csv is that the concurrency of the CSV scan does not seem sufficient, but I haven't found a way to adjust the concurrency directly at the data source layer.
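As a hedged aside on the data-source concurrency question (not something confirmed in this thread): Daft's execution config exposes scan-task sizing options that indirectly control how many scan tasks, and therefore how much read parallelism, a source produces. A minimal sketch, assuming these options exist in your Daft version:

import daft

# Hedged sketch: shrinking the target scan-task size should split a read into
# more scan tasks (and therefore more parallelism), provided the source files
# are splittable; the 64 MiB value is illustrative only.
daft.set_execution_config(
    scan_tasks_min_size_bytes=64 * 1024 * 1024,
    scan_tasks_max_size_bytes=64 * 1024 * 1024,
)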
Are you running on a single node or in a cluster? If single node, I'd suggest using the native runner first, i.e. set_runner_native. I'd also recommend specifying with_concurrency and batch_size on the picture embedding UDF.
Also, regarding the execution configs: enable_ray_tracing=True can at times cause a slight performance hit, so I'd be curious to see whether turning it off helps.
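The PictureEmbedding UDF itself isn't shown in this issue, so here is a minimal sketch of what setting batch_size (and per-UDF GPU resources) on a class UDF could look like; the decorator arguments, the 768-dimensional output type, and the preprocessing details are assumptions, not taken from the original code:

import daft
import torch
from daft import DataType
from transformers import AutoImageProcessor, AutoModel

@daft.udf(
    return_dtype=DataType.embedding(DataType.float32(), 768),  # assumed output type
    num_gpus=0.1,   # mirrors num_gpus=0.1 in the Ray Data example
    batch_size=16,  # rows handed to __call__ at a time; tune to match BATCH_SIZE
)
class PictureEmbedding:
    def __init__(self, path: str):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.preprocessor = AutoImageProcessor.from_pretrained(path)
        self.model = AutoModel.from_pretrained(path).to(self.device).eval()

    def __call__(self, images):
        # `images` is a daft Series; to_pylist() is assumed to yield
        # array-like images that the HF processor accepts.
        with torch.no_grad():
            inputs = self.preprocessor(images=images.to_pylist(), return_tensors="pt").to(self.device)
            vectors = self.model(**inputs).last_hidden_state.mean(dim=1)
            vectors = vectors / vectors.norm(p=2, dim=1, keepdim=True)
        return vectors.cpu().numpy().tolist()

# Usage, as in the snippets in this thread:
# embedding_udf = PictureEmbedding.with_init_args(path="/data00/model/dinov2-base").with_concurrency(10)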
To try the native runner (do not initialize Ray for this):
# This is the default behavior if Ray is not initialized
daft.set_runner_native()
# Manually set the morsel size to be small -- we are adding some intelligence around this shortly in ~1 week
daft.set_execution_config(default_morsel_size=128)
# Read data
df = daft.read_csv(path=DEFAULT_TOS_META_OUTPUT_FILE, io_config=io_config)
# Native runner does not have the concept of partitions, and this is a no-op
# df = df.repartition(4)
# Download images from URLs and store as binary
df = df.with_column(
    "image_bytes", df["url"].url.download(max_connections=32, on_error="null", io_config=io_config)
)
df = df.with_column("images", df["image_bytes"].image.decode(on_error="null"))
# Generate embeddings
embedding_udf = PictureEmbedding.with_init_args(
    path="/data00/model/dinov2-base"
).with_concurrency(10)  # run 10 of these, as per your Ray Data example
df = df.with_column("embedding", embedding_udf(daft.col("images")))
# Save to Lance format
df = df.select("embedding", "url")
df.write_lance("/data00/tmp.lance")
@colin-ho It seems that there are no visible indicators.
@jaychia Falling back to the native engine causes an error:
Error when running pipeline node ActorPoolProject
Traceback (most recent call last):
  File "/data00/code/Daft/tests/functions/test_daft.py", line 165, in <module>
    main()
  File "/data00/code/Daft/tests/functions/test_daft.py", line 156, in main
    df.write_lance("/data00/tmp.lance")
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/api_annotations.py", line 26, in _wrap
    return timed_method(*args, **kwargs)
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/analytics.py", line 199, in tracked_method
    result = method(*args, **kwargs)
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/dataframe/dataframe.py", line 1245, in write_lance
    write_df.collect()
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/api_annotations.py", line 26, in _wrap
    return timed_method(*args, **kwargs)
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/analytics.py", line 199, in tracked_method
    result = method(*args, **kwargs)
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/dataframe/dataframe.py", line 3037, in collect
    self._materialize_results()
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/dataframe/dataframe.py", line 3018, in _materialize_results
    self._result_cache = context.get_or_create_runner().run(self._builder)
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/runners/native_runner.py", line 62, in run
    results = list(self.run_iter(builder))
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/runners/native_runner.py", line 90, in run_iter
    yield from results_gen
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/execution/native_executor.py", line 37, in <genexpr>
    return (
  File "/root/miniconda3/envs/py310/lib/python3.10/site-packages/daft/execution/actor_pool_udf.py", line 45, in eval_input
    self.handle_conn.send(MicroPartition._from_pymicropartition(input))
  File "/root/miniconda3/envs/py310/lib/python3.10/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/root/miniconda3/envs/py310/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object '_ensure_registered_super_ext_type.<locals>.DaftExtension'
Ah, that's a bug. It should be fixed in version 0.4.11. The issue was that extension types like embedding could not be pickled and sent between processes.
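The "Can't pickle local object" in the traceback is the underlying Python limitation here: a class defined inside a function (as DaftExtension was, inside _ensure_registered_super_ext_type) cannot be pickled by import path, so any object referencing it, such as a micropartition carrying an extension-typed column, cannot be sent over a multiprocessing connection. A minimal, Daft-free illustration of that error class:

import pickle

def make_local_instance():
    # A class defined inside a function has a "<locals>" qualname, so pickle
    # cannot locate it by import path when serializing instances of it.
    class LocalThing:
        pass
    return LocalThing()

try:
    pickle.dumps(make_local_instance())
except AttributeError as e:
    # AttributeError: Can't pickle local object 'make_local_instance.<locals>.LocalThing'
    print(e)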
Closing, as extension types should now work on the native runner.