Time Ray Data tasks (total time and UDF time)

Open omatthew98 opened this issue 1 year ago • 0 comments

Why are these changes needed?

This allows us to track both the total time spent in Ray Data tasks, and the time specifically spent in UDFs (primarily through map and filter functions applied to the dataset). An example output can be found below:

Operator 1 ReadRange->Map(sleep): 25 tasks executed, 25 blocks produced in 49.54s
* Remote wall time: 8.07s min, 20.68s max, 20.16s mean, 503.97s total
* Remote cpu time: 42.02ms min, 113.24ms max, 96.57ms mean, 2.41s total
* UDF time: 8.07s min, 20.67s max, 20.15s mean, 503.75s total
* Peak heap memory usage (MiB): 143890.62 min, 145140.62 max, 144508 mean
* Output num rows per block: 16 min, 41 max, 40 mean, 1000 total
* Output size bytes per block: 128 min, 328 max, 320 mean, 8000 total
* Output rows per task: 16 min, 41 max, 40 mean, 25 tasks used
* Tasks per node: 25 min, 25 max, 25 mean; 1 nodes used
* Extra metrics: {'num_inputs_received': 12, 'bytes_inputs_received': 30893, 'num_task_inputs_processed': 1, 'bytes_task_inputs_processed': 2574, 'bytes_inputs_of_submitted_tasks': 30893, 'num_task_outputs_generated': 1, 'bytes_task_outputs_generated': 328, 'rows_task_outputs_generated': 41, 'num_outputs_taken': 1, 'bytes_outputs_taken': 328, 'num_outputs_of_finished_tasks': 1, 'bytes_outputs_of_finished_tasks': 328, 'num_tasks_submitted': 12, 'num_tasks_running': 11, 'num_tasks_have_outputs': 1, 'num_tasks_finished': 1, 'num_tasks_failed': 0, 'obj_store_mem_internal_inqueue': 0, 'obj_store_mem_internal_outqueue': 0, 'obj_store_mem_pending_task_inputs': 28319, 'obj_store_mem_freed': 2574, 'obj_store_mem_spilled': 0, 'block_generation_time': 20.659513541, 'total_data_tasks_time': 20.75380225, 'total_data_udfs_time': 0, 'cpu_usage': 11, 'gpu_usage': 0, 'ray_remote_args': {'num_cpus': 1, 'scheduling_strategy': 'SPREAD'}}

Dataset iterator time breakdown:
* Total time overall: 49.59s
    * Total time in Ray Data iterator initialization code: 20.79s
    * Total time user thread is blocked by Ray Data iter_batches: 28.79s
    * Total execution time for user thread: 2.71ms
* Batch iteration time breakdown (summed across prefetch threads):
    * In ray.get(): 52.83us min, 484.83us max, 144.99us avg, 3.62ms total
    * In batch creation: 10.71us min, 2.49ms max, 19.49us avg, 19.49ms total
    * In batch formatting: 8.38us min, 355.29us max, 17.13us avg, 17.13ms total
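To illustrate the difference between the "Remote wall time" and "UDF time" lines in the operator stats above, here is a minimal sketch of timing a whole task and the user function inside it separately. It is not the code in this PR and does not use Ray; `run_task` and `slow_identity` are hypothetical names chosen for the illustration.

```python
import time
from typing import Any, Callable, Dict, Iterable, List


def run_task(udf: Callable[[Any], Any], rows: Iterable[Any]) -> Dict[str, Any]:
    """Apply `udf` to every row, timing the whole task and the UDF calls separately.

    Hypothetical helper for illustration only; not how Ray Data does it.
    """
    udf_time_s = 0.0
    outputs: List[Any] = []
    task_start = time.perf_counter()
    for row in rows:
        udf_start = time.perf_counter()
        outputs.append(udf(row))
        # Accumulate only the time spent inside the user function.
        udf_time_s += time.perf_counter() - udf_start
    # Total task time also includes loop and bookkeeping overhead.
    task_time_s = time.perf_counter() - task_start
    return {"outputs": outputs, "task_time_s": task_time_s, "udf_time_s": udf_time_s}


def slow_identity(x):
    # Stand-in for a user-defined function such as `sleep` in this PR's script.
    time.sleep(0.01)
    return x


result = run_task(slow_identity, range(5))
print(f"task: {result['task_time_s']:.3f}s, udf: {result['udf_time_s']:.3f}s")
```

When the UDF dominates the work, as in the sleep-based example, the two numbers are nearly identical, which matches the small gap between remote wall time and UDF time in the stats above.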

Code to generate the example output above:

import ray
import sys
import time
from ray.data import DataContext

context = DataContext.get_current()
context.verbose_stats_logs = True

def sleep(x):
    time.sleep(0.5)
    return x

# Command-line arguments are strings; ray.data.range needs an int.
num_rows = int(sys.argv[1]) if len(sys.argv) > 1 else 10

# TODO: make this an actor class rather than passing a function
ds = ray.data.range(num_rows).map(sleep)

# Consume the dataset so that it executes and stats are collected.
for _ in ds.iter_batches(batch_size=1):
    pass

print(ds.stats())
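As a rough sanity check, the reported UDF times can be compared with what the script's 0.5 s sleep per row would predict. The sketch below uses only numbers copied from the example output. It assumes the run used 1000 rows (the "1000 total" in the output) and that the min, max, and mean rows per block line up with the min, max, and mean UDF times, which the output does not state explicitly.

```python
# Values copied from the example output above.
SLEEP_S = 0.5   # per-row sleep in the `sleep` UDF
NUM_TASKS = 25  # "25 tasks executed"

rows_per_block = {"min": 16, "max": 41, "mean": 40, "total": 1000}
udf_time_s = {"min": 8.07, "max": 20.67, "mean": 20.15, "total": 503.75}

# Extra time reported beyond pure sleeping (assumption: same task ordering).
overhead_s = {}
for key, rows in rows_per_block.items():
    expected = rows * SLEEP_S
    overhead_s[key] = udf_time_s[key] - expected
    print(
        f"{key:>5}: {rows} rows -> {expected:.2f}s sleeping, "
        f"reported {udf_time_s[key]:.2f}s"
    )

# The reported mean should equal the reported total divided by the task count.
mean_from_total = udf_time_s["total"] / NUM_TASKS
print(f"total / tasks = {mean_from_total:.2f}s, reported mean = {udf_time_s['mean']:.2f}s")
```

In each case the reported UDF time sits slightly above the pure sleep time, which is consistent with the UDF timer wrapping the sleep plus a small per-row overhead.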

Related issue number

Closes #42801

Checks

  • [ ] I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • [ ] I've run scripts/format.sh to lint the changes in this PR.
  • [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
    • [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.
  • [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • [ ] Unit tests
    • [ ] Release tests
    • [ ] This PR is not tested :(

omatthew98, Feb 17 '24 00:02