[Proposal] Apply operand closure cleanup

Open chaokunyang opened this issue 3 years ago • 5 comments

For calls such as DataFrame.apply, DataFrameGroupBy.apply and Series.apply, the user passes in a custom function, i.e. a UDF. Mars serializes this function multiple times during graph building, scheduling and execution. If the function captures a large amount of data, such as a pandas DataFrame, this causes bottlenecks in scheduling and execution:

  • In the tile phase, the UDF is serialized when the key is generated for each operator
  • In the tile phase, the UDF is serialized when the key is generated for each chunk
  • When generating the SubtaskGraph, the UDF is serialized when the logic key is generated for each chunk
  • When submitting each Subtask to the Worker main pool, the UDF is serialized for each Subtask
  • When the Worker main pool receives a Subtask, the UDF is deserialized
  • When the Worker main pool submits each Subtask to the Worker sub pool, the UDF is serialized for each Subtask
  • When the Worker sub pool receives a Subtask, the UDF is deserialized

The serialized UDF can be cached by the custom Mars serialization (in our internal codebase it is currently cached when the chunk key and logic key are calculated). However, if the UDF holds a large amount of data, the serialized form of a single Subtask can reach hundreds of megabytes. Repeatedly sending this data to the Worker main pool and sub pool during Subtask scheduling causes Supervisor bottlenecks and scheduling delays. In addition, an oversized Subtask object leads to a large lineage storage overhead, and tasks fail once the lineage is evicted.

Based on this, we propose a function closure cleaner at the Mars operator level: clean up the function closure in the tile phase, serialize the closure into Storage, and let the operator hold only a reference to the stored closure. Before the operator is executed, the closure object is fetched from Storage, the UDF is restored, and the calculation is performed.

Proposal

Function closure cleanup

The captured objects of a function live in its __closure__ attribute. We can read the closure in the tile phase, put it into storage, and then fetch it from storage to restore the function at execution time. The pseudocode is as follows:

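@classmethod
def tile(cls, op):
    func = op.func  # the UDF held by the operand
    op.closure_ref = None
    closure = getattr(func, "__closure__", None)
    if closure is not None:
        # Store the captured values rather than the cell objects.
        values = tuple(cell.cell_contents for cell in closure)
        op.closure_ref = storage_api.put(values)
        # __closure__ is read-only, so empty each cell in place.
        for cell in closure:
            del cell.cell_contents
    # tile operand

@classmethod
def execute(cls, ctx, op):
    if op.closure_ref is not None:
        values = storage_api.get(op.closure_ref)
        # Refill the cells before calling the UDF.
        for cell, value in zip(op.func.__closure__, values):
            cell.cell_contents = value
    # execute operand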

Note: If the function closure is relatively small, it can be directly inlined in the code.
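As a rough, self-contained illustration of the closure cleanup idea, the sketch below stashes the captured values of a function and restores them later. The in-process dict `_STORE` and the helpers `stash_closure` and `restore_closure` are hypothetical names used only for this example; they stand in for the storage service and are not Mars APIs. Because `__closure__` itself is read-only in CPython, the sketch empties and refills the individual cells (writable `cell_contents` needs Python 3.7+).

```python
from typing import Any, Callable, Dict, Optional, Tuple

# Hypothetical stand-in for the storage service: a plain in-process dict.
_STORE: Dict[str, Tuple[Any, ...]] = {}


def stash_closure(func: Callable, key: str) -> Optional[str]:
    """Move captured values into the store and empty the function's cells."""
    cells = func.__closure__
    if not cells:
        return None  # nothing captured, nothing to clean up
    _STORE[key] = tuple(cell.cell_contents for cell in cells)
    for cell in cells:
        del cell.cell_contents  # the cell stays, its content is dropped
    return key


def restore_closure(func: Callable, key: Optional[str]) -> None:
    """Refill the function's cells from the store before execution."""
    if key is None:
        return
    for cell, value in zip(func.__closure__, _STORE[key]):
        cell.cell_contents = value


def make_udf(table):
    def udf(x):
        return x + len(table)  # `table` is captured in the closure
    return udf


udf = make_udf(list(range(1000)))
ref = stash_closure(udf, "udf-0")
# Calling udf(1) at this point would raise NameError: its cell is empty.
restore_closure(udf, ref)
result = udf(1)
```

Note that this mutates the user's function object in place; a real implementation would probably need to work on a copy or guarantee that the cells are refilled before any call.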

Callable cleanup

The UDF passed in by the user may not be a plain function but an instance of a class that implements __call__. In this case, the captured objects cannot be obtained through the __closure__ attribute, and special cleanup of the callable object is required. If the object implements Python's __reduce__ or __getstate__ method, further processing is required.
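One possible way to handle the callable case is to move the instance attributes out of the object and put them back before execution. The sketch below assumes the data lives in the instance `__dict__`; `Lookup`, `stash_state`, `restore_state` and the dict `_STORE` are hypothetical names for illustration only. It does not cover objects that use `__slots__` or customize pickling through `__reduce__` or `__getstate__`.

```python
from typing import Any, Dict

# Hypothetical stand-in for the storage service.
_STORE: Dict[str, Dict[str, Any]] = {}


class Lookup:
    """Example UDF written as a callable object holding data in attributes."""

    def __init__(self, table):
        self.table = table

    def __call__(self, x):
        return self.table.get(x, -1)


def stash_state(obj: Any, key: str) -> str:
    """Move instance attributes into the store, leaving a light shell behind."""
    _STORE[key] = dict(vars(obj))
    vars(obj).clear()
    return key


def restore_state(obj: Any, key: str) -> None:
    """Put the stored attributes back on the object before it is called."""
    vars(obj).update(_STORE[key])


udf = Lookup({1: 10, 2: 20})
ref = stash_state(udf, "callable-0")
stripped_attrs = dict(vars(udf))  # empty while the state is stashed
restore_state(udf, ref)
result = udf(2)
```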

chaokunyang avatar Jul 20 '22 07:07 chaokunyang

The function can't be executed correctly if the corresponding closure is missing. So why not put the entire UDF into storage?

My suggestion about this proposal:

  1. Put the entire UDF (not only the closure) into storage.
  2. Only do this if the UDF is very large.
  3. Provide an option for users to disable this feature.
  4. Abstract an API instead of using StorageAPI directly, because some execution backends may not be able to access the StorageAPI.
  5. Manage the UDF storage object by lifecycle.
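The five suggestions above can be sketched as a small policy layer. Everything here is an assumption made for illustration: `UDFStore`, `InMemoryUDFStore`, `captured_size`, `maybe_offload`, `resolve`, the `threshold` default and the `enabled` flag are hypothetical and not part of Mars. Size is approximated by pickling the captured values with the standard library, which fails for values that `pickle` cannot handle.

```python
import pickle
from typing import Any, Callable, Dict, Optional, Tuple


class UDFStore:
    """Hypothetical backend-neutral interface, so backends can plug in their own."""

    def put(self, key: str, obj: Any) -> None:
        raise NotImplementedError

    def get(self, key: str) -> Any:
        raise NotImplementedError

    def delete(self, key: str) -> None:
        raise NotImplementedError


class InMemoryUDFStore(UDFStore):
    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def put(self, key: str, obj: Any) -> None:
        self._data[key] = obj

    def get(self, key: str) -> Any:
        return self._data[key]

    def delete(self, key: str) -> None:
        self._data.pop(key, None)  # lifecycle hook: drop the UDF when done


def captured_size(func: Callable) -> int:
    """Approximate UDF size by the pickled size of its captured values."""
    cells = getattr(func, "__closure__", None) or ()
    return sum(len(pickle.dumps(cell.cell_contents)) for cell in cells)


def maybe_offload(
    func: Callable, key: str, store: UDFStore,
    threshold: int = 1024, enabled: bool = True,
) -> Tuple[Optional[Callable], Optional[str]]:
    """Store the whole UDF only if the feature is on and the UDF is large."""
    if enabled and captured_size(func) >= threshold:
        store.put(key, func)
        return None, key
    return func, None


def resolve(func: Optional[Callable], ref: Optional[str], store: UDFStore) -> Callable:
    """Return the UDF to execute, fetching it from the store if offloaded."""
    return store.get(ref) if ref is not None else func


def make_udf(table):
    def udf(x):
        return x in table
    return udf


store = InMemoryUDFStore()
small, small_ref = maybe_offload(make_udf([1]), "small", store)
big, big_ref = maybe_offload(make_udf(list(range(10_000))), "big", store)
```

The small UDF stays inline on the operand, while the large one is replaced by a key that `resolve` turns back into a callable at execution time.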

fyrestone avatar Aug 04 '22 03:08 fyrestone

This comment is used to track related developments.

  • [x] Basic function of closure clean up and corresponding application on Apply Operand and GroupByApply Operand. #3205
  • [x] Application on DataFrameMapChunk Operand and callable. #3238
  • [x] Lifecycle management of UDF storage object. #3238

vcfgv avatar Aug 23 '22 09:08 vcfgv

After discussion, the storage service on the supervisor will remain empty. As a result, some refinements to #3205 will follow soon.

vcfgv avatar Sep 13 '22 07:09 vcfgv

This comment is used to track further developments.

  • [ ] Under the Ray backend, the UDF could be cleaned up when generating the TileableGraph, to avoid the cost of serialization and deserialization between client and supervisor.
  • [ ] The size of a UDF could be calculated from the size of the serialized UDF.

vcfgv avatar Oct 18 '22 09:10 vcfgv

This comment is used to track further directions

  • [ ] An operand may receive unexpectedly large arguments through op.args; these need cleanup as well.

vcfgv avatar Oct 26 '22 07:10 vcfgv