Failing to save the model (pickle error), and how to check the predictions?
Hi Team,
I am trying to train a TensorFlow LSTM model on Spark. After the training epochs complete, it fails with the error below:
==========================
Doing CPU training...
Will run with 8 Spark tasks.
/usr/local/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:1844: UserWarning: `Model.fit_generator` is deprecated and will be removed in a future version. Please use `Model.fit`, which supports generators.
  warnings.warn('`Model.fit_generator` is deprecated and '
2022-02-10 11:41:59.702186: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2022-02-10 11:41:59.718653: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2999995000 Hz
Epoch 1/5
247/247 [==============================] - 141s 543ms/step - loss: 0.0380 - mean_absolute_error: 0.1516 - val_loss: 0.0141 - val_mean_absolute_error: 0.0775
Epoch 2/5
247/247 [==============================] - 129s 522ms/step - loss: 0.0126 - mean_absolute_error: 0.0821 - val_loss: 0.0103 - val_mean_absolute_error: 0.0648
Epoch 3/5
247/247 [==============================] - 137s 554ms/step - loss: 0.0094 - mean_absolute_error: 0.0695 - val_loss: 0.0082 - val_mean_absolute_error: 0.0519
Epoch 4/5
247/247 [==============================] - 149s 605ms/step - loss: 0.0079 - mean_absolute_error: 0.0614 - val_loss: 0.0070 - val_mean_absolute_error: 0.0479
Epoch 5/5
247/247 [==============================] - 132s 536ms/step - loss: 0.0067 - mean_absolute_error: 0.0567 - val_loss: 0.0060 - val_mean_absolute_error: 0.0422
Distributed training in progress...
View Spark executor stderr logs to inspect training...
Traceback (most recent call last):
File "/usr/lib/spark/python/pyspark/serializers.py", line 437, in dumps
return cloudpickle.dumps(obj, pickle_protocol)
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 101, in dumps
cp.dump(obj)
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 540, in dump
return Pickler.dump(self, obj)
File "/usr/lib64/python3.7/pickle.py", line 437, in dump
self.save(obj)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 789, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function
*self._dynamic_function_reduce(obj), obj=obj
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5
save(state)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python3.7/pickle.py", line 638, in save_reduce
save(args)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function
*self._dynamic_function_reduce(obj), obj=obj
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5
save(state)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python3.7/pickle.py", line 638, in save_reduce
save(args)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function
*self._dynamic_function_reduce(obj), obj=obj
File "/usr/local/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py", line 664, in _save_reduce_pickle5
save(state)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 789, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python3.7/pickle.py", line 638, in save_reduce
save(args)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 774, in save_tuple
save(element)
File "/usr/lib64/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python3.7/pickle.py", line 662, in save_reduce
save(state)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib64/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python3.7/pickle.py", line 662, in save_reduce
save(state)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib64/python3.7/pickle.py", line 549, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib64/python3.7/pickle.py", line 662, in save_reduce
save(state)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 885, in _batch_setitems
save(v)
File "/usr/lib64/python3.7/pickle.py", line 504, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python3.7/pickle.py", line 859, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib64/python3.7/pickle.py", line 884, in _batch_setitems
save(k)
File "/usr/lib64/python3.7/pickle.py", line 524, in save
rv = reduce(self.proto)
TypeError: can't pickle weakref objects
======================================
- Are there any examples of model prediction after distributed training with MirroredStrategyRunner?
I am sorry, I cannot help you with the first part; I would also like to know whether and how that would work.
However, the second part works, at least if you put training and prediction in one function. I tried it by adapting the official example at https://docs.microsoft.com/en-us/azure/databricks/_static/notebooks/deep-learning/spark-tensorflow-distributor.html
### 1.) Combined train and predict (helper functions listed below)
def train_and_predict(batch_size):
    # Build the datasets and the model inside the function that the runner executes.
    train_datasets = make_training_dataset(batch_size)
    train_dataset_x, train_dataset_y = make_prediction_dataset(batch_size)
    multi_worker_model = build_and_compile_cnn_model()
    multi_worker_model.fit(x=train_datasets, epochs=3, steps_per_epoch=5)
    prediction = multi_worker_model.predict(x=train_dataset_x, batch_size=batch_size)
    return prediction
### 2.) Execute with Runner
from spark_tensorflow_distributor import MirroredStrategyRunner
# NUM_SLOTS, USE_GPU and batch_size must be defined before this point.
runner = MirroredStrategyRunner(num_slots=NUM_SLOTS, local_mode=False, use_gpu=USE_GPU)
prediction = runner.run(train_and_predict, batch_size=batch_size)
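To check the returned predictions, one option is to compare the most probable class per row against the known labels. The sketch below assumes `prediction` comes back as an array-like of per-class probabilities, one row per sample, which matches the softmax output of the model used here. The names `predicted_classes`, `accuracy`, `toy_probs` and `toy_labels` are made up for illustration, and the toy data stands in for the real output.

```python
def predicted_classes(probabilities):
    """Return, for each row, the index of its largest probability."""
    return [max(range(len(row)), key=row.__getitem__) for row in probabilities]


def accuracy(probabilities, labels):
    """Fraction of rows whose most probable class equals the true label."""
    preds = predicted_classes(probabilities)
    correct = sum(int(p == y) for p, y in zip(preds, labels))
    return correct / len(labels)


# Toy stand-in for the model output: 3 samples, 3 classes (hypothetical values).
toy_probs = [
    [0.1, 0.7, 0.2],
    [0.8, 0.1, 0.1],
    [0.3, 0.3, 0.4],
]
toy_labels = [1, 0, 1]

print("predicted classes:", predicted_classes(toy_probs))
print("accuracy:", accuracy(toy_probs, toy_labels))
```

With NumPy arrays the same check can be written with `np.argmax(prediction, axis=1)` followed by an element-wise comparison with the labels.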
---------------------------------------------------------------------
### A.) Functions to generate data
import numpy as np
import tensorflow as tf

def load_mnist():
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    return x_train, y_train
def make_training_dataset(batch_size):
    x_train, y_train = load_mnist()
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).repeat().batch(batch_size)
    # Specify the data auto-shard policy: DATA
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    train_dataset = train_dataset.with_options(options)
    return train_dataset
def make_prediction_dataset(batch_size):
    x_train, y_train = load_mnist()
    # No repeat() here: prediction should pass over the data exactly once.
    train_dataset_x = tf.data.Dataset.from_tensor_slices(x_train).batch(batch_size)
    train_dataset_y = tf.data.Dataset.from_tensor_slices(y_train).batch(batch_size)
    # Specify the data auto-shard policy: DATA
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    # Apply the options to both datasets (the original applied them to x twice).
    train_dataset_x = train_dataset_x.with_options(options)
    train_dataset_y = train_dataset_y.with_options(options)
    return train_dataset_x, train_dataset_y
### B.) Function to build the model
def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax'),
    ])
    model.compile(
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'],
    )
    return model
However, this does not get around the problem of saving the model. I would also like to have an answer to that question.
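On the error itself: the traceback ends in `TypeError: can't pickle weakref objects` while cloudpickle is serializing a function and the state it captures. This is only a guess at the cause, but that message is what Python raises whenever something being pickled contains a weak reference. The sketch below reproduces the failure with the standard library alone. `Owner`, `is_picklable`, `model_like_state` and `plain_state` are hypothetical names, and the dictionary is a stand-in for a framework object, not actual TensorFlow state.

```python
import pickle
import weakref


class Owner:
    """Placeholder object; it exists only so a weak reference can point at it."""


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


owner = Owner()

# Hypothetical stand-in for a framework object: plain data plus an internal weakref.
model_like_state = {
    "weights": [0.1, 0.2, 0.3],
    "backref": weakref.ref(owner),
}

# Only the plain-data part, which can be serialized without trouble.
plain_state = {"weights": model_like_state["weights"]}

print("with weakref:", is_picklable(model_like_state))
print("plain data:  ", is_picklable(plain_state))
```

If the same cause applies here, one thing to try is to create the model and datasets inside the function passed to `runner.run`, save the model from inside that function with `model.save(path)`, and return only plain values such as arrays or numbers. This is untested against the original LSTM code.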