FedScale icon indicating copy to clipboard operation
FedScale copied to clipboard

[Async simulation] Implementation idea for task scheduling

Open ewenw opened this issue 1 year ago • 1 comments

Description

Hi FedScale team, here's my suggestion on how to implement the async simulation mode using device traces without needing a constant arrival parameter (related to #162):

sort device traces by start time
queue = initialize min priority queue
while tasks_issued < buffer_size:
   event_time, event_type, client_id = queue.get()
   if event_type == 'start':
        current_concurrency += 1
        if current_concurrency < MAX_CONCURRENCY:
            issue_task(event_time)
    else:
        current_concurrency -= 1
        if current_concurrency == MAX_CONCURRENCY - 1:
            issue_task(event_time)

issue_task(event_time):
    client, trace_start, trace_end = sample next available client at event_time
    add client task to individual executor's queue
    queue.put((trace_start, 'start', client))
    queue.put((trace_end, 'end', client)

This works well in my implementation, but might be harder to integrate into fedscale, hence I'm creating an issue to document it. Let me know if you have any questions / concerns.

Below is the python code for this scheduling algorithm, feel free to run it and validate the output:

import random
from queue import PriorityQueue

id = 0


def generate_start_end(time):
    # next available client
    global id
    start_time = time + random.randint(0, 1)
    duration = random.randint(1, 3)
    id += 1
    return start_time, start_time + duration, id


min_pq = PriorityQueue()
total_tasks = 1

TOTAL_TASKS = 10
MAX_CONCURRENCY = 1
current_concurrency = 0
start_times = {}


def new_task(event_time):
    new_start, new_end, client_id = generate_start_end(event_time)
    min_pq.put((new_start, 'start', client_id))
    min_pq.put((new_end, 'end', client_id))
    start_times[client_id] = new_start


new_task(0)
while not min_pq.empty():
    event_time, event_type, client_id = min_pq.get()
    if event_type == 'start':
        current_concurrency += 1
        if total_tasks < TOTAL_TASKS and current_concurrency < MAX_CONCURRENCY:
            new_task(event_time)
            total_tasks += 1
    else:
        current_concurrency -= 1
        if total_tasks < TOTAL_TASKS and current_concurrency == MAX_CONCURRENCY - 1:
            new_task(event_time)
            total_tasks += 1
        print(f"processing event starting at {start_times[client_id]} and ending at {event_time}")

Use case

No response

ewenw avatar Sep 12 '22 23:09 ewenw

Great! Thanks a lot! Actually, Amber pushed a similar idea yesterday #173, which of course needs more efforts. We will work on this once we have more bandwidth.

fanlai0990 avatar Sep 13 '22 00:09 fanlai0990