Can Maggy be used with a Spark cluster that uses YARN?
Question 1: How does Maggy know how to contact Spark's driver to make the RPC calls in a Spark infrastructure other than Hopsworks? Does having a resource manager such as YARN on top of the Spark cluster affect how Maggy should make RPC requests to the Spark driver, or should it work as normal?
Question 2: If I opt for "you can deploy an entire Hopsworks instance to your own AWS account", as also explained here (https://hopsworks.readthedocs.io/en/stable/getting_started/installation_guide/platforms/aws-image.html), then a t2.2xlarge instance (8 vCPUs, 32 GB RAM) is a single host, not a cluster. Do the 8 available vCPUs equate to the executors that Spark will use? That is, if I run my trials and want to compare results across different numbers of executors, will I have a maximum of 8 executors unless I increase the instance type?
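As context for Question 2, my understanding is that the executor count on a single host is bounded by spark.executor.cores relative to the available vCPUs, not by the vCPU count alone. A minimal sketch of what I mean, with illustrative values that are assumptions on my part and not Hopsworks defaults:

```python
from pyspark.sql import SparkSession

# Illustrative sizing on a single 8-vCPU host; the values below are
# assumptions for the sketch, not Hopsworks defaults. With 2 cores per
# executor, at most 4 executors fit on 8 vCPUs, no matter how high
# spark.dynamicAllocation.maxExecutors is set.
spark = (
    SparkSession.builder
    .appName('executor-sizing-sketch')
    .config('spark.executor.cores', '2')      # cores per executor
    .config('spark.executor.memory', '6g')    # memory per executor
    .config('spark.executor.instances', '4')  # 4 executors x 2 cores = 8 vCPUs
    .getOrCreate()
)
```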
The reason behind Question 1 is that in this example (https://github.com/logicalclocks/maggy/blob/master/examples/maggy-ablation-titanic-example.ipynb) a Spark session is created, but I cannot explicitly see the point where Maggy hands over/submits jobs to Spark. For instance, in some industrial setups one would interact with Spark by creating a Spark session and then submitting the job like so:
Creating a Spark session:

```python
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('spark-ipython') \
    .config('spark.shuffle.service.enabled', 'true') \
    .config('spark.executor.memory', '2844M') \
    .config('spark.dynamicAllocation.enabled', 'true') \
    .config('spark.dynamicAllocation.minExecutors', '0') \
    .config('spark.dynamicAllocation.maxExecutors', '100') \
    .getOrCreate()
```
Submitting the job to the Spark cluster:

```bash
spark-submit --master yarn --deploy-mode cluster \
    --archives hdfs:///somelocation/Python.zip#Python \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./Python/bin/python3 \
    --conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=./Python/bin/python3 \
    main.py
```
whereby main.py contains the Maggy code.

Example of the main.py file:
```python
from maggy import Searchspace

# The searchspace can be instantiated with parameters
sp = Searchspace(kernel=('INTEGER', [2, 8]), pool=('INTEGER', [2, 8]))

# Or additional parameters can be added one by one
sp.add('dropout', ('DOUBLE', [0.01, 0.99]))

from maggy import experiment
from maggy.callbacks import KerasBatchEnd


#########
### maggy: hyperparameters as arguments and including the reporter
#########
def keras(kernel, pool, dropout, reporter):
    from tensorflow.python import keras
    import tensorflow as tf
    from tensorflow.python.keras.datasets import mnist
    from tensorflow.python.keras.models import Sequential
    from tensorflow.python.keras.layers import Dense, Dropout, Flatten
    from tensorflow.python.keras.layers import Conv2D, MaxPooling2D
    from tensorflow.python.keras.callbacks import TensorBoard
    from tensorflow.python.keras import backend as K
    import math

    batch_size = 512
    num_classes = 10
    epochs = 1

    # Input image dimensions
    img_rows, img_cols = 28, 28

    # The data, shuffled and split between train and test sets
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    if K.image_data_format() == 'channels_first':
        x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
        x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
        input_shape = (1, img_rows, img_cols)
    else:
        x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
        x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
        input_shape = (img_rows, img_cols, 1)

    x_train = x_train.astype('float32')
    x_test = x_test.astype('float32')
    x_train /= 255
    x_test /= 255
    print('x_train shape:', x_train.shape)
    print(x_train.shape[0], 'train samples')
    print(x_test.shape[0], 'test samples')

    # Convert class vectors to binary class matrices
    y_train = keras.utils.to_categorical(y_train, num_classes)
    y_test = keras.utils.to_categorical(y_test, num_classes)

    model = Sequential()
    model.add(Conv2D(32, kernel_size=(kernel, kernel),
                     activation='relu',
                     input_shape=input_shape))
    model.add(Conv2D(64, (kernel, kernel), activation='relu'))
    model.add(MaxPooling2D(pool_size=(pool, pool)))
    model.add(Dropout(dropout))
    model.add(Flatten())
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(dropout))
    model.add(Dense(num_classes, activation='softmax'))

    opt = keras.optimizers.Adadelta(1.0)

    model.compile(loss=keras.losses.categorical_crossentropy,
                  optimizer=opt,
                  metrics=['accuracy'])

    #########
    ### maggy: REPORTER API through keras callback
    #########
    callbacks = [KerasBatchEnd(reporter, metric='acc')]

    model.fit(x_train, y_train,
              batch_size=batch_size,
              callbacks=callbacks,  # add callback
              epochs=epochs,
              verbose=1,
              validation_data=(x_test, y_test))

    score = model.evaluate(x_test, y_test, verbose=0)

    print('Test loss:', score[0])
    print('Test accuracy:', score[1])

    #########
    ### maggy: return the metric to be optimized, test accuracy in this case
    #########
    return score[1]
```
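What I would expect main.py to end with is the call that actually hands the trials over to Spark, something like experiment.lagom(...). A minimal sketch of what I believe that looks like, assuming the older Maggy optimization API (argument names vary between Maggy versions):

```python
# Hand the experiment to Spark: Maggy runs `keras` as tasks on the
# executors and aggregates the reported metrics on the driver.
# NOTE: argument names follow the (assumed) older Maggy API and may
# differ in other versions.
result = experiment.lagom(keras,
                          searchspace=sp,
                          optimizer='randomsearch',
                          direction='max',
                          num_trials=15,
                          name='MNIST')
```

If that is indeed where the hand-off happens, then Question 1 reduces to whether this hand-off, and Maggy's driver-side RPC communication, behaves the same when the cluster is managed by YARN.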