
[Core][Labels Scheduling] Finalize the new node affinity scheduling with node labels API in the Python worker

Open larrylian opened this issue 1 year ago • 1 comments

Description

Several directions of discussion for this issue are:

  1. Whether to reuse the "scheduling_strategy" field in "options", or to add new "labels" or "affinity" fields.
  2. Which expression to use for labels in node affinity.

After our offline discussion, we have come up with the following plan. This API plan will be reviewed and finalized by more people in the future.

object_ref = Task.options(
    scheduling_strategy=node_affinity({
        "node_id": IN("aaa", "bbb"),
        "gpu_type": NOT_IN("A100", "P100", soft=True),
        "availability_zone": EXISTS(soft=True),
        "taints": DOES_NOT_EXISTS(),
    })
).remote()
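
To make the intended semantics concrete, here is a minimal sketch of how such a dictionary could be evaluated against a node's labels. Everything in it is an assumption for illustration: the `LabelOp` class, `node_is_feasible`, `soft_score`, and the rule that a missing key satisfies `NOT_IN` are hypothetical and are not part of Ray.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class LabelOp:
    """Hypothetical container for one operator applied to one label key."""
    kind: str                      # "IN", "NOT_IN", "EXISTS" or "DOES_NOT_EXISTS"
    values: Tuple[str, ...] = ()
    soft: bool = False

    def matches(self, labels: Dict[str, str], key: str) -> bool:
        if self.kind == "IN":
            return labels.get(key) in self.values
        if self.kind == "NOT_IN":
            # Assumption: a node without the key also satisfies NOT_IN.
            return labels.get(key) not in self.values
        if self.kind == "EXISTS":
            return key in labels
        return key not in labels   # DOES_NOT_EXISTS


def IN(*values: str, soft: bool = False) -> LabelOp:
    return LabelOp("IN", values, soft)


def NOT_IN(*values: str, soft: bool = False) -> LabelOp:
    return LabelOp("NOT_IN", values, soft)


def EXISTS(soft: bool = False) -> LabelOp:
    return LabelOp("EXISTS", (), soft)


def DOES_NOT_EXISTS(soft: bool = False) -> LabelOp:
    return LabelOp("DOES_NOT_EXISTS", (), soft)


def node_is_feasible(affinity: Dict[str, LabelOp], labels: Dict[str, str]) -> bool:
    """A node is feasible when every hard (soft=False) expression matches."""
    return all(op.matches(labels, key) for key, op in affinity.items() if not op.soft)


def soft_score(affinity: Dict[str, LabelOp], labels: Dict[str, str]) -> int:
    """Count the soft expressions a node satisfies; higher means more preferred."""
    return sum(op.matches(labels, key) for key, op in affinity.items() if op.soft)
```

Under these assumptions, hard expressions filter out nodes and soft expressions only rank the nodes that remain.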

Why reuse the "scheduling_strategy" field in "options"?

  • a) We will reuse the "scheduling_strategy" field, and if necessary, we can separate it into a new field in the future. If we create a new field now, it may be difficult to modify it later.
  • b) Adding a new field would require consideration of scenarios where Resource, labels, and scheduling_strategy all affect scheduling simultaneously, which could make things very complicated.

Here are several alternative plans that we have discussed:

Plan 1:

use case:

# Scheduled to a node with a specific IP.
actor_1 = Actor.options(
        scheduling_strategy=node_affinity(label_in(key="node_ip", values=["xxx.xxx.xx.xx"], is_soft=False))
    ).remote()

# Try to schedule to a node with an A100/P100 graphics card. If none is available, schedule to other nodes.
actor_1 = Actor.options(
        scheduling_strategy=node_affinity(label_in("gpu_type", ["A100", "P100"], is_soft=True))
    ).remote()

# Do not schedule to the two nodes whose node id is "xxxxxxx" or "aaaaaaaa".
actor_1 = Actor.options(
        scheduling_strategy=node_affinity(label_not_in("node_id", ["xxxxxxx", "aaaaaaaa"], is_soft=False))
    ).remote()

# Schedule to a node that has the label key "gpu_type".
actor_1 = Actor.options(
        scheduling_strategy=node_affinity(label_exists("gpu_type"))
    ).remote()

# Don't schedule to a node that has the label key "gpu_type".
object_ref = Task.options(
        scheduling_strategy=node_affinity(label_does_not_exist("gpu_type", is_soft=False))
    ).remote()

# Multiple label expressions can be given at the same time, and they are combined with "and": scheduling must satisfy each expression.
# This example must be scheduled to a node with a GPU and, where possible, to a node whose GPU type is A100.
actor_1 = Actor.options(
        scheduling_strategy=node_affinity([
            label_in("gpu_type", ["A100"], True),
            label_exists("gpu_type", False)
        ])
    ).remote()

Implementation:

@PublicAPI(stability="beta")
class NodeAffinitySchedulingStrategy:
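    def __init__(self, node_id: Optional[str] = None, soft: bool = False, _spill_on_unavailable: bool = False, match_expressions: Optional[List["LabelMatchExpression"]] = None):
        # This will be removed once we standardize on node id being hex string.
        # node_id may be None when only label expressions are given (see node_affinity below).
        if node_id is not None and not isinstance(node_id, str):
            node_id = node_id.hex()

        self.node_id = node_id
        self.soft = soft
        # Use a fresh list per instance instead of a shared mutable default.
        self.match_expressions = match_expressions if match_expressions is not None else []
        self._spill_on_unavailable = _spill_on_unavailable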

SchedulingStrategyT = Union[
    None,
    str,  # Literal["DEFAULT", "SPREAD"]
    PlacementGroupSchedulingStrategy,
    NodeAffinitySchedulingStrategy,
]

class LabelMatchOperator(Enum):
    IN = "IN"
    NOT_IN = "NOT_IN"
    EXISTS = "EXISTS"
    DOES_NOT_EXIST = "DOES_NOT_EXIST"

class LabelMatchExpression:
    def __init__(self, key: str, operator: LabelMatchOperator,
                 values: List[str], soft: bool):
        self.key = key
        self.operator = operator
        self.values = values
        self.soft = soft

def label_in(key, values, is_soft=False):
    return LabelMatchExpression(key, LabelMatchOperator.IN, values, is_soft)


def label_not_in(key, values, is_soft=False):
    return LabelMatchExpression(key, LabelMatchOperator.NOT_IN, values, is_soft)


def label_exists(key, is_soft=False):
    return LabelMatchExpression(key, LabelMatchOperator.EXISTS, [], is_soft)


def label_does_not_exist(key, is_soft=False):
    return LabelMatchExpression(key, LabelMatchOperator.DOES_NOT_EXIST, [], is_soft)


def node_affinity(match_expressions: List[LabelMatchExpression]):
    return NodeAffinitySchedulingStrategy(match_expressions=match_expressions)
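
The implementation above only stores the expressions; how the scheduler would consume them is not specified in this issue. As a rough sketch, assuming hard expressions filter nodes and soft expressions rank them, a selection step could look like the following. `expression_matches` and `pick_node` are hypothetical helpers, and the two classes are repeated from Plan 1 only to keep the example self-contained.

```python
from enum import Enum
from typing import Dict, List, Optional


class LabelMatchOperator(Enum):
    IN = "IN"
    NOT_IN = "NOT_IN"
    EXISTS = "EXISTS"
    DOES_NOT_EXIST = "DOES_NOT_EXIST"


class LabelMatchExpression:
    def __init__(self, key: str, operator: LabelMatchOperator,
                 values: List[str], soft: bool):
        self.key = key
        self.operator = operator
        self.values = values
        self.soft = soft


def expression_matches(expr: LabelMatchExpression, labels: Dict[str, str]) -> bool:
    """Evaluate one expression against one node's labels (hypothetical helper)."""
    value = labels.get(expr.key)
    if expr.operator is LabelMatchOperator.IN:
        return value in expr.values
    if expr.operator is LabelMatchOperator.NOT_IN:
        # Assumption: a node without the key also satisfies NOT_IN.
        return value not in expr.values
    if expr.operator is LabelMatchOperator.EXISTS:
        return expr.key in labels
    return expr.key not in labels  # DOES_NOT_EXIST


def pick_node(nodes: Dict[str, Dict[str, str]],
              expressions: List[LabelMatchExpression]) -> Optional[str]:
    """Return the feasible node that satisfies the most soft expressions."""
    hard = [e for e in expressions if not e.soft]
    soft = [e for e in expressions if e.soft]
    feasible = [
        node_id for node_id, labels in nodes.items()
        if all(expression_matches(e, labels) for e in hard)
    ]
    if not feasible:
        return None  # no node satisfies every hard expression
    return max(feasible, key=lambda n: sum(expression_matches(e, nodes[n]) for e in soft))
```

With the last use case of Plan 1 (GPU required, A100 preferred), a node labeled `gpu_type=A100` would be chosen over one labeled `gpu_type=T4`, and a node without `gpu_type` would never be chosen.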

Plan 2:

Replace "node_affinity" with "NodeAffinitySchedulingStrategy".

use case:

actor_1 = Actor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(label_in(key="node_ip", values=["xxx.xxx.xx.xx"], is_soft=False))
    ).remote()

actor_1 = Actor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(label_in("gpu_type", ["A100", "P100"], is_soft=True))
    ).remote()

actor_1 = Actor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(label_not_in("node_id", ["xxxxxxx", "aaaaaaaa"], is_soft=False))
    ).remote()

actor_1 = Actor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(label_exists("gpu_type"))
    ).remote()

object_ref = Task.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(label_does_not_exist("gpu_type", is_soft=False))
    ).remote()

actor_1 = Actor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy([
            label_in("gpu_type", ["A100"], True),
            label_exists("gpu_type", False)
        ])
    ).remote()

Plan 3:

use case:

actor = Actor.options(
    labels={
        "node_id": LabelIn("aaa", "bbb"),
        "gpu_type": LabelNotIn("A100", "P100", soft=True),
        "availability_zone": LabelExists(soft=True),
        "market": LabelIn("spot"),
        "taints": LabelDoesNotExist,
    }
).remote()

larrylian avatar Jun 14 '23 15:06 larrylian

actor = Actor.options(
    labels={
        "node_id": ["aaa", "bbb"],
        "market": "spot",
    }
).remote()

actor = Actor.options(
    LabelFilter(
        must_in_set={
            "node_id": {"aaa", "bbb"},
            "market": {"spot"},
        },
        prefer_in_set={"gpu_type": {"A100", "P100"}},
        prefer_have=["availability_zone"],
        exclude=["taints"],
    ),
).remote()

label_filter="node_id in (aaa bbb) && !gpu_type in (a100 p100) && az exists"

jjyao avatar Jun 15 '23 01:06 jjyao

After more research, I found the following expressiveness gaps between our proposed API and the k8s one:

  1. k8s supports OR, AND, and NOT, which allows complete boolean expressions: you can write ((A AND B) OR (C AND D) OR (E AND F)). From the k8s documentation:

     If you specify multiple terms in nodeSelectorTerms associated with nodeAffinity types, then the Pod can be scheduled onto a node if one of the specified terms can be satisfied (terms are ORed).

     If you specify multiple expressions in a single matchExpressions field associated with a term in nodeSelectorTerms, then the Pod can be scheduled onto a node only if all the expressions are satisfied (expressions are ANDed).

  2. For the same key you can have multiple operators: "k1" EXISTS AND "k1" NotIn ["v1", "v2"].
  3. Soft is not per operator but per expression: (A AND B) is soft.

Due to the above, I'm proposing a revised API:

scheduling_strategy=LabelSchedulingStrategy(
   hard=[
   {"k2": Exists, "k1": [Exists, NotIn("v1", "v2")]}, {"k2": NotExists, "k1": In("v1")}, {...}
   ], # {AND} OR {AND} OR {AND}
   soft=[
   {...}, {...}, {...}
   ],
)
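
To illustrate the "{AND} OR {AND} OR {AND}" reading of the `hard` list, here is a minimal sketch of an evaluator. It assumes each operator is a predicate over `(labels, key)`, that a key may map to one operator or a list of operators, and that an empty `hard` list means "no constraint". None of these helpers exist in Ray; they only mirror the names used in the proposal.

```python
from typing import Callable, Dict, List, Union

Labels = Dict[str, str]
Predicate = Callable[[Labels, str], bool]   # (node labels, label key) -> bool
Term = Dict[str, Union[Predicate, List[Predicate]]]


def In(*values: str) -> Predicate:
    def check(labels: Labels, key: str) -> bool:
        return labels.get(key) in values
    return check


def NotIn(*values: str) -> Predicate:
    def check(labels: Labels, key: str) -> bool:
        # Assumption: a node without the key also satisfies NotIn.
        return labels.get(key) not in values
    return check


def Exists(labels: Labels, key: str) -> bool:
    return key in labels


def NotExists(labels: Labels, key: str) -> bool:
    return key not in labels


def term_matches(term: Term, labels: Labels) -> bool:
    """All operators of all keys in one dict must hold (AND)."""
    for key, ops in term.items():
        ops_list = ops if isinstance(ops, list) else [ops]
        if not all(op(labels, key) for op in ops_list):
            return False
    return True


def hard_matches(hard: List[Term], labels: Labels) -> bool:
    """At least one dict in the list must hold (OR); an empty list allows any node."""
    return not hard or any(term_matches(term, labels) for term in hard)
```

For example, with `hard=[{"k2": Exists, "k1": [Exists, NotIn("v1", "v2")]}, {"k2": NotExists, "k1": In("v1")}]`, a node labeled only `k1=v1` passes through the second term, while a node labeled `k1=v1, k2=x` fails both.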

jjyao avatar Jun 16 '23 18:06 jjyao

@jjyao There are too many nested [] and {} in this new API scheme. I think it is too complicated and will be difficult for users to use.

1. For the same key you can have multiple operators: "k1" EXISTS AND "k1" NotIn ["v1", "v2"]. I suggest using my original plan:

scheduling_strategy=node_affinity([
            label_not_in("gpu_type", ["A100", "H100"], is_soft=True),
            label_exists("gpu_type", is_soft=False)
 ])

2. To support OR, we can extend the API in the following way.
Here the two inner lists are ORed with each other.

scheduling_strategy=node_affinity([
    [
        label_not_in("gpu_type", ["A100", "H100"], is_soft=True),
        label_exists("gpu_type", is_soft=False)
    ],
    [
        label_not_in("gpu_type", ["T100"], is_soft=True),
    ]
])

larrylian avatar Jun 19 '23 11:06 larrylian

After discussion with @larrylian, we have two API proposals:

1

scheduling_strategy=LabelSchedulingStrategy(
   hard=[
   {"k2": Exists, "k1": [Exists, NotIn("v1", "v2")]}, {"k2": NotExists, "k1": In("v1")}, {...}
   ], # {AND} OR {AND} OR {AND}
   soft=[
   {...}, {...}, {...}
   ],
)

2

scheduling_strategy=LabelSchedulingStrategy(
   hard=[
   [Exists("k2"), Exists("k1"), NotIn("k1", ["v1", "v2"])], [NotExists("k2"), In("k1", ["v1"])], [...]
   ], # [AND] OR [AND] OR [AND]
   soft=[
   [...], [...], [...]
   ],
)
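
The two proposals appear to carry the same information and differ only in where the label key lives. A small sketch of converting one AND-term between the two shapes, assuming a hypothetical `Op` record that stands in for `Exists`, `In`, `NotIn`, and so on:

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple, Union


@dataclass(frozen=True)
class Op:
    """Hypothetical operator record; `key` is only set in the list-style form."""
    name: str
    values: Tuple[str, ...] = ()
    key: Optional[str] = None


def to_list_form(term: Dict[str, Union[Op, List[Op]]]) -> List[Op]:
    """Proposal 1 term (key -> operator(s)) to a proposal 2 term (list of keyed operators)."""
    flat: List[Op] = []
    for key, ops in term.items():
        for op in (ops if isinstance(ops, list) else [ops]):
            flat.append(Op(op.name, op.values, key))
    return flat


def to_dict_form(term: List[Op]) -> Dict[str, List[Op]]:
    """Proposal 2 term back to a proposal 1 term, always using lists as values."""
    grouped: Dict[str, List[Op]] = {}
    for op in term:
        grouped.setdefault(op.key, []).append(Op(op.name, op.values))
    return grouped
```

If this equivalence holds, the choice between the proposals is about readability rather than expressiveness: the dict form groups operators by key, the list form keeps every expression flat.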

Open question:

  1. Do we want to support OR in the first version?

jjyao avatar Jun 21 '23 04:06 jjyao

@jjyao I support plan 1 now. I have reorganized the API into the new format and updated it in the issue summary.

larrylian avatar Jun 25 '23 10:06 larrylian

Based on the given example, option 1 seems much easier to understand, since key -> value is easier to read and understand when people are reading code:

Actor.options(scheduling_strategy=LabelSchedulingStrategy(
    hard=[{
        "region": [In("west")]
    }]
))

Besides, I feel the schema is too strict and has too many keyword arguments, which makes it feel un-Pythonic. I wonder if we should make the easy case simple like this?

# Allow specifying only a single hard predicate
Actor.options(scheduling_strategy=LabelSchedulingStrategy(
    {
        "region": In("west")
    }
))
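
One way to support both the short and the full form is to normalize the arguments in the constructor. The following is only a sketch under that assumption; this `LabelSchedulingStrategy` class and its `_normalize` helper are hypothetical and use plain strings in place of operators such as `In("west")`.

```python
from typing import Any, Dict, List, Optional, Union

Term = Dict[str, Any]  # label key -> one operator or a list of operators


class LabelSchedulingStrategy:
    """Hypothetical constructor that accepts a single dict or a list of dicts."""

    def __init__(self,
                 hard: Optional[Union[Term, List[Term]]] = None,
                 soft: Optional[Union[Term, List[Term]]] = None):
        self.hard = self._normalize(hard)
        self.soft = self._normalize(soft)

    @staticmethod
    def _normalize(terms: Optional[Union[Term, List[Term]]]) -> List[Dict[str, List[Any]]]:
        if terms is None:
            return []
        if isinstance(terms, dict):
            terms = [terms]  # a bare dict is treated as a single AND-term
        # Wrap a bare operator in a list so every key maps to a list of operators.
        return [
            {key: (ops if isinstance(ops, list) else [ops]) for key, ops in term.items()}
            for term in terms
        ]
```

With this shape, `LabelSchedulingStrategy({"region": ...})` and `LabelSchedulingStrategy(hard=[{"region": [...]}])` end up with the same internal representation, so the simple case stays short without removing the fully nested form.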

rkooo567 avatar Jun 26 '23 23:06 rkooo567

oh yeah so basically same as ^ comment.

rkooo567 avatar Jun 26 '23 23:06 rkooo567