filesystem_spec
filesystem_spec copied to clipboard
feature: scoped callbacks
Currently, it's hard in fsspec to deregister/close callbacks, especially branched/nested ones after one put_file/get_file operation is done. In dvc, we are trying to do batched uploads for better performance, which means using get/put instead and letting filesystems batch as they see fit. We may be uploading thousands of files, so that'd mean we'd have thousands of callbacks/progress bars. And currently there is no easy way to deregister/close the callbacks once the single operation is finished (we don't want to be left with 100s of finished/stuck progress bars).
- You have to depend upon logic like
self.value >= self.sizeto detect if the callback can be closed, which is fragile. - Or,
__del__which you cannot really depend on or should not depend on. - You have to trust the filesystem to call
set_sizeandrelative_update/absolute_update. This may not always happen or be possible. Sometimes, fs may not providesizeor may be expensive to fetch/compute. Sometimerelative_update/absolute_updatemay be unavailable (eg: when we use clone/reflink files). - The callback will persist even in errors, where you want to upload as much as possible before giving the control back. This may already be the case in AsyncFileSystem where we use
asyncio.gather(), the scheduled tasks still run until we cancel them.
So, my proposal is to make callback a contextmanager.
class Callback:
def __enter__(self):
return self
def __exit__(self, *exc_args):
self.close()
def close(self):
pass
The fsspec.callbacks.Callback will provide a close/__enter__/__exit__ methods. And by default, __exit__ will call close so that we can still close the callbacks without using as a contextmanager. The callback can get access to the exception if it extends __exit__ (which should be a rare usecase).
The branch() also needs to return the instance so that we can use it as in a with statement, instead of just adding to the kwargs. (this is backward-incomaptible though).
Usage
with TqdmCallback() as callback:
fs.put_file("file", "file", callback=callback)
With branches
def put(self, lpath, rpath, **kwargs):
callback.set_size(len(rpaths))
for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
with callback.branch(lpath, rpath, kwargs):
self.put_file(lpath, rpath, **kwargs)
Note that DVC already extends this to do just that: https://github.com/iterative/dvc-objects/blob/929ecec7e4607e410a63502681b13f0e4c15e43d/src/dvc_objects/fs/callbacks.py.
We can fix backward incompatibility in two ways in branch (if we don't want to introduce a new API or want to preserve compatibility):
- Make
callback.branchreturn aCallbackinstance if they want to be scoped.
child = callback.branch(lpath, rpath, kwargs)
with (child or nullcontext(_DEFAULT_CALLBACK)) as callback:
self.put_file(lpath, rpath, **{"callback": callback, **kwargs})
- Reuse
kwargs["callback"]` that the branch adds.
callback.branch(lpath, rpath, kwargs)
with kwargs.get("callback", nullcontext()):
self.put_file(lpath, rpath, **kwargs)
We could also break branch() and make it return an instance of Callback instead of mutating kwargs (default being NoOpCallback).
with callback.branch(lpath, rpath, kwargs) as child:
self.put_file(lpath, rpath, callback=child, **kwargs)
Sorry I have not had a chance to comment here yet, it is still on my list.
I appreciate the pain of having many callback instances, and those potentially persisting following an exception.
I suppose that the callback API is pretty immature still, and breakages to it could be allowable, but obviously not ideal. We do not explicitly require callbacks to be a subclass of Callback, but it is a reasonable assumption - so adding context hooks to the base class as you suggest can cover almost all cases.
I'm not really sure which of the three possibilities feels right without seeing them in full, so I will need to trust your intuition. I don't know if there are many/any callback implementations out there beyond fsspec and dvc, but it would be a shame to break them. Of course, the internal ones in fsspec we can change in step and assure that everything works.
Certainly, the original method of mutating the kwargs feels and felt pretty unclean. If we do need to break things or otherwise change things around, it would be nice to be explicit.