dask-examples
Use an already trained Keras model to predict on lots of data
A common approach is to train on a bit of data and then use that trained model to predict on lots of data. We could do this using ParallelPostFit in dask-ml, or we can use X.map_blocks or df.map_partitions. In either case we might want to be a bit careful about avoiding repeated serialization costs. For example, in the following case I suspect that we include the serialized model in every task:
# maybe bad?
model = load_model()
predictions = X.map_blocks(model.predict)
It's probably better to encourage the user to keep the model delayed
# probably better
model = dask.delayed(load_model)()
predictions = X.map_blocks(model.predict)
We should also ensure that dask-ml does this correctly, and includes the model as a single task in the graph so that it gets sent around appropriately (cc @TomAugspurger)
I'm also generally curious if a Keras model that lives on the GPU will eventually make its way back onto the GPU when deserializing.
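For reference, the ParallelPostFit route mentioned at the top would look roughly like the sketch below. It assumes a scikit-learn-style estimator with fit/predict; estimator, X_small, y_small, and X are placeholder names, not anything from this thread.
from dask_ml.wrappers import ParallelPostFit

# Sketch only: fit on a small in-memory sample, then predict lazily across the dask array
wrapped = ParallelPostFit(estimator=estimator)
wrapped.fit(X_small, y_small)
predictions = wrapped.predict(X)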
FYI, I started on this at https://gist.github.com/TomAugspurger/2889a052b5fec4d691f83ba2062d2d92
As you predicted, X.map_blocks(model.predict) was slow.
I stopped as soon as I hit an error, and didn't do any profiling yet. I'll pick it up again soon, but don't want anyone else to duplicate effort.
That's great. When I started looking into this I quickly became lost on how to set up the problem. You appear to have enough practical experience that that's not much of an issue for you.
I'll put this on my TODO list. Now that there is a clear thing to optimize/fix it's much easier for me.
Oh, and /profile-server is going to be extremely useful here. On a whim, I tried X.map_blocks(delayed(model.predict)) and the scheduler has been at 100% CPU for a minute while the workers are idle.
Right, I think I'm stalled on deserializing the TensorFlow graph in a new process: https://gist.github.com/33efb49efe611701ef122f577d0e0430
Probably putting this on the backburner for now, if others want to take a look.
@TomAugspurger @mrocklin Once we have our stacked delayed dask array, can't we just generate batches of data from it on the fly? Something like this:
data = [dd.array.from_delayed(x, shape=(224, 224, 3), dtype=np.float32) for x in images]
nb_batches = 100
for i in range(nb_batches):
    batch_images, batch_labels = next(data)  # just an example to show
    model.train_on_batch(batch_images, batch_labels)
Is there any way to do this?
It depends on what you mean by "batch" I guess. You can slice into x in a variety of ways
index = np.random.randint(0, x.shape[0], size=10)
batch = x[index]
Some ways of slicing will be cheap (like above), some won't, depending on chunk structure.
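To illustrate, a small hypothetical sketch (the chunking of 32 images along the first axis is an assumption, not from this thread):
import dask.array as da

# Array chunked in groups of 32 images along the first axis
x = da.random.random((50000, 224, 224, 3), chunks=(32, 224, 224, 3))

cheap = x[0:32]      # lies entirely within one chunk: a single cheap task
costlier = x[16:48]  # straddles two chunks: pieces of both must be assembled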
Thanks @mrocklin. I would elaborate a bit on that. Say I have 50,000 images on my disk. I cannot load all the data into memory at once. In the normal case, we would use a generator that yields batches of data. For example, with a batch size of 32, each batch would contain 32 images. Each batch is then fed to the model for training.
Now, with a simple Python generator we are using only one core. So, instead of using a Python generator, let us say we get delayed dask arrays as:
data = [dd.array.from_delayed(x, shape=(224, 224, 3), dtype=np.float32) for x in images]
The shape of the final array would be (50000, 224, 224, 3). What is the best way to iterate over this delayed array, such that on each iteration I get a chunk of data containing 32 images?
The same as you would with NumPy
for i in range(0, x.shape[0], 32):
    chunk = x[i:i+32, ...]
chunk is a dask array here. I'm not sure if that's what you want. You might want to call compute or delay the fit call (although Keras has issues sometimes with moving to other threads).
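For example, a minimal sketch that materializes each batch before handing it to Keras, assuming x and y are dask arrays holding the images and labels:
for i in range(0, x.shape[0], 32):
    batch_x = x[i:i + 32].compute()  # pull just this slice into memory as NumPy
    batch_y = y[i:i + 32].compute()
    model.train_on_batch(batch_x, batch_y)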
Cool. Thanks a lot for your time. Yeah, I am aware of those issues, and that is why I just want to use dask for batch generation and no delayed calls to fit.
@AakashKumarNain I have a similar use case, did you find performance improvements when transitioning from Numpy to dask, reading image slices from file?
@AakashKumarNain same question on this. What code did you end up using? How was the performance? I want to use a keras.utils.Sequence subclass to leverage keras fit_generator, so I'm thinking something that keeps the images in a dask array and then loads each batch into memory:
from typing import Tuple

import keras
import numpy as np
import dask.array as da


class DaskImageSequence(keras.utils.Sequence):
    def __init__(self, x: da.Array, y: da.Array, batch_size: int):
        self.x = x
        self.y = y
        self.batch_size = batch_size

    def __len__(self):
        # shape[0] is already a plain int here (the shapes are known), so no compute() is needed
        return int(np.ceil(self.x.shape[0] / self.batch_size))

    def __getitem__(self, batch_num) -> Tuple[np.ndarray, np.ndarray]:
        # materialize only this batch as NumPy arrays
        batch_x = self.x[batch_num * self.batch_size:(batch_num + 1) * self.batch_size].compute()
        batch_y = self.y[batch_num * self.batch_size:(batch_num + 1) * self.batch_size].compute()
        return batch_x, batch_y
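A hypothetical usage (train_x and train_y are placeholder dask arrays of images and labels):
seq = DaskImageSequence(train_x, train_y, batch_size=32)
model.fit_generator(seq, epochs=5)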
@skeller88 I didn't try it. I was trying to benchmark it with tf.dataset. But this certainly looks good so far.
@mrocklin I think I'm stumbling on the exact "# maybe bad?" issue mentioned at the top.
Pseudo-code (working on a reproducible example):
We have a large number of numpy arrays (geospatial tiles) and an object detection model.
This works in serial.
model = create_model()
results = []
for tile in tilelist:
    boxes = model.predict_tile(tile)
    results.append(boxes)
Following your thought from above, this
results = []
for tile in tilelist:
    model = dask.delayed(create_model)()
    boxes = dask.delayed(model.predict_tile)(tile)
    results.append(boxes)
all_boxes = dask.compute(*results)
has some sort of multiprocessing TensorFlow error:
builtins.ValueError: Tensor Tensor("filtered_detections/map/TensorArrayStack/TensorArrayGatherV3:0", shape=(?, 300, 4), dtype=float32) is not an element of this graph.
One level up in the traceback is tensorflow/python/framework/ops.py, line 3796, in as_graph_element:
with self._lock:
    return self._as_graph_element_locked(obj, allow_tensor, allow_operation)
testing on LocalCluster on CPU, but will eventually move to SLURM with GPUs.
perhaps related to (https://github.com/dask/distributed/pull/878, https://github.com/dask/dask-ml/issues/281)
~~For anyone who comes looking, I'm giving up here because I think it's a bit of a red herring. I was just using LocalCluster to run tests, and my sense is that this is part of the problem. I can see that Keras serialization is an ongoing challenge, and my ultimate goal is to get this running on a SLURM cluster, which in this case might be quite a bit simpler. Leaving this note here for others.~~ I will open a reproducible example tomorrow. The problem persists on dask-jobqueue, and I've looked through all the pertinent issues; I feel like the answer is known, but not obviously documented.
I created a working example here for those who find this link: https://github.com/dask/distributed/issues/2333
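For readers who land here, one commonly used pattern is to build the model once per worker and cache it on the worker, so the Keras/TensorFlow objects never have to be serialized and shipped by dask. The sketch below only illustrates that idea (it is not necessarily what the linked example does); create_model and predict_tile are the user-defined functions from the snippet above, and it assumes tasks run on dask.distributed workers, since get_worker only works inside such tasks.
import dask
from dask.distributed import get_worker

def predict_one(tile):
    worker = get_worker()                    # only valid inside a task running on a distributed worker
    if not hasattr(worker, "keras_model"):
        worker.keras_model = create_model()  # build the model once per worker process
    return worker.keras_model.predict_tile(tile)

results = dask.compute(*[dask.delayed(predict_one)(tile) for tile in tilelist])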
This keeps coming up. I'm adding it to the core maintenance project board. https://stackoverflow.com/questions/61924824/how-to-do-model-predict-using-distributed-dask-with-a-pre-trained-keras-model
@mrocklin happy to help if I can with tests, I use this kind of workflow frequently with dask-jobqueue for submitting to GPU clusters