Keras + Tensorflow and Multiprocessing in Python

From my experience, the problem lies in loading Keras into one process and then spawning a new process after Keras has been loaded into your main environment. But for some applications (e.g. training a mixture of Keras models) it's simply better to have all of these things in one process. So what I advise is the following (a little cumbersome, but working for me) approach:

  1. DO NOT LOAD KERAS INTO YOUR MAIN ENVIRONMENT. If you want to load Keras / Theano / TensorFlow, do it only inside the function environment. E.g. don't do this:

    import keras

    def training_function(...):
        ...

    but do the following:

    def training_function(...):
        import keras
        ...
  2. Run the work connected with each model in a separate process: I usually create workers that do the job (e.g. training, tuning, scoring) and run them in separate processes. What's nice about this is that all the memory used by such a process is completely freed once the process finishes. This helps with the many memory problems you usually come across when using multiprocessing, or even when running multiple models in one process. So this looks e.g. like this:

    def _training_worker(train_params):
        import keras
        model = obtain_model(train_params)
        model.fit(train_params)
        send_message_to_main_process(...)

    def train_new_model(train_params):
        training_process = multiprocessing.Process(target=_training_worker, args=(train_params,))
        training_process.start()
        get_message_from_training_process(...)
        training_process.join()

A different approach is simply to prepare a separate script for each model action. But this may cause memory errors, especially when your models are memory-consuming. NOTE that for this reason it's better to make your execution strictly sequential, as in the sketch below.
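A minimal sketch of that strictly sequential, script-per-action approach (the script names are hypothetical, and this illustration is mine rather than part of the original answer):

import subprocess
import sys

# Each model action lives in its own script; running them one after
# another means each script's memory is fully released before the next starts.
scripts = ['train_model_a.py', 'train_model_b.py', 'score_models.py']

for script in scripts:
    # check=True aborts the sequence if any script fails
    subprocess.run([sys.executable, script], check=True)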

How can take advantage of multiprocessing and multithreading in Deep learning using Keras?

It's a good thing that training one model doesn't use 100% of your CPU! That leaves us room to train multiple models in parallel and speed up your overall training time.

NB: If you just want to speed up a single model, look into GPUs or changing hyperparameters like batch size and the number of neurons (layer size).

Here's how you can use multiprocessing to train multiple models at the same time (using processes running in parallel on each separate CPU core of your machine).

The multiprocessing.Pool basically creates a pool of jobs that need doing. The processes will pick up these jobs and run them. When a job is finished, the process will pick up another job from the pool.
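As a tiny self-contained illustration of that queueing behaviour (my own sketch, separate from the training example below), two workers drain five jobs, and each process picks up a new job as soon as it finishes one:

import os
import time
from multiprocessing import Pool

def job(n):
    time.sleep(1)
    # each worker process handles several jobs from the queue
    return f'job {n} done by pid {os.getpid()}'

if __name__ == '__main__':
    with Pool(2) as pool:
        # 5 jobs, 2 workers: only two distinct pids appear in the output
        print('\n'.join(pool.map(job, range(5))))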

import time
import signal
import multiprocessing

def init_worker():
    ''' Add KeyboardInterrupt exception to multiprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(layer_size):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    import keras
    from keras.models import Sequential
    from keras.layers import Dense

    print(f'Training a model with layer size {layer_size}')

    # build your model here
    model_RNN = Sequential()
    model_RNN.add(Dense(layer_size))

    # fit the model (the bit that takes time!)
    model_RNN.fit(...)

    # let's demonstrate with a sleep timer
    time.sleep(5)

    # save trained model to a file
    model_RNN.save(...)

    # you can also return values e.g. the eval score
    return model_RNN.evaluate(...)


num_workers = 4
hyperparams = [800, 960, 1100]

pool = multiprocessing.Pool(num_workers, init_worker)

scores = pool.map(train_model, hyperparams)

print(scores)

Output:

Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size':960,'score':1.0}, {'size':800,'score':1.2}, {'size':1100,'score':0.7}]

This is easily demonstrated with a time.sleep in the code. You'll see that all 3 processes start the training job, and then they all finish at about the same time. If this were single-process, you'd have to wait for each job to finish before starting the next (yawn!).

EDIT
OP also wanted full code. This is difficult on Stack Overflow because I can't test in your environment or with your code. I've taken the liberty of copying and pasting your code into my template above. You may need to add some imports, but this is as close as you'll get to "runnable" and "full" code.

import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score


def init_worker():
    ''' Add KeyboardInterrupt exception to multiprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(model_type):
    '''
    This code is parallelised and runs on each process
    It trains a model of the given type (rnn or lstm)
    It saves the model and returns the score (error)
    '''
    from keras.layers import LSTM, SimpleRNN, Dense, Activation
    from keras.models import Sequential
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau
    from keras.layers.normalization import BatchNormalization

    print(f'Training a model: {model_type}')

    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ]

    model = Sequential()

    if model_type == 'rnn':
        model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    elif model_type == 'lstm':
        model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))

    model.add(Dense(480))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(
        trainX,
        trainY,
        epochs=50,
        batch_size=20,
        validation_data=(testX, testY),
        verbose=1,
        callbacks=callbacks,
    )

    # predict
    Y_Train_pred = model.predict(trainX)
    Y_Test_pred = model.predict(testX)

    train_MSE = mean_squared_error(trainY, Y_Train_pred)
    test_MSE = mean_squared_error(testY, Y_Test_pred)

    # you can also return values e.g. the eval score
    return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}


# Your code
# ---------

df = pd.read_csv(r"D:\Train.csv", header=None)

index = [i for i in list(range(1440)) if i % 3 == 2]

Y_train = df[index]
df = df.values

# making history by using look-back to predict the next step
def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    print("Len:", len(dataset) - look_back - 1)
    for i in range(len(dataset) - look_back - 1):
        a = dataset[i : (i + look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)


Y_train = np.array(Y_train)
df = np.array(df)

look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)

# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
    trainX, trainY, test_size=0.2, shuffle=False
)

# My Code
# -------

num_workers = 2
model_types = ['rnn', 'lstm']

pool = multiprocessing.Pool(num_workers, init_worker)

scores = pool.map(train_model, model_types)

print(scores)

Output of the program:

[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866}, 
{'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]

Use keras in multiprocessing

The good news is that tensorflow sessions are thread-safe: Is it thread-safe when using tf.Session in inference service?

To use a keras model in multiple processes, you have to do the following:

  • set up the model
  • call _make_predict_function()
  • set up a session and use it to get the tensorflow graph
  • finalize this graph
  • every time you predict something, wrap the call in this graph's as_default() context

Here is some sample code:

# the usual imports
import numpy as np
import tensorflow as tf

from keras.models import *
from keras.layers import *

# set up the model
i = Input(shape=(10,))
b = Dense(1)(i)
model = Model(inputs=i, outputs=b)

# now to use it in multiprocessing, the following is necessary
model._make_predict_function()
sess = tf.Session()
sess.run(tf.global_variables_initializer())
default_graph = tf.get_default_graph()
default_graph.finalize()

# now you share the model and graph between processes
# in each process you can call this:
with default_graph.as_default():
    prediction = model.predict(something)
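To make the "call this in each process" part concrete, here is a hedged sketch that continues the snippet above; the worker function and the use of a thread pool are my additions, not part of the original answer (valid because, as noted above, sessions are thread-safe):

from multiprocessing.pool import ThreadPool

def predict_worker(batch):
    # every prediction runs against the single finalized graph
    with default_graph.as_default():
        return model.predict(batch)

# e.g. score two batches in parallel worker threads
batches = [np.random.rand(4, 10) for _ in range(2)]
with ThreadPool(2) as tp:
    predictions = tp.map(predict_worker, batches)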

Parallelizing Keras Model Predict Using Multiprocessing

Load your model in the _apply_df function so that it doesn't get pickled and sent to the processes.

This is a simple code example without the use of pandas that runs a model on fashion-mnist data. I think you can adapt it to your use case.

import tensorflow as tf
import numpy as np
from multiprocessing import Pool


def _apply_df(data):
    model = tf.keras.models.load_model("my_fashion_mnist_model.h5")
    return model.predict(data)


def apply_by_multiprocessing(data, workers):
    pool = Pool(processes=workers)
    result = pool.map(_apply_df, np.array_split(data, workers))
    pool.close()
    return list(result)


def main():
    fashion_mnist = tf.keras.datasets.fashion_mnist
    _, (test_images, test_labels) = fashion_mnist.load_data()

    test_images = test_images / 255.0
    results = apply_by_multiprocessing(test_images, workers=3)
    print(test_images.shape)           # (10000, 28, 28)
    print(len(results))                # 3
    print([x.shape for x in results])  # [(3334, 10), (3333, 10), (3333, 10)]


if __name__ == "__main__":
    main()

Tensorflow (Keras) & Multiprocessing results in lack of GPU memory

If you are on Windows, move all your tf and keras imports into the methods.

How to avoid loading a parent module in a forked process with Python's multiprocessing

Since Windows lacks os.fork(), all the imports are executed again in the new process (which in your case includes importing tf).

https://docs.python.org/2/library/multiprocessing.html#windows
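A minimal, hedged sketch of that advice (the module layout is illustrative and the tf call is a stand-in for a real model): keep tf out of the module level, guard the entry point, and import inside the worker.

import multiprocessing

def worker(values):
    # Importing here means a spawned Windows child only loads tf when
    # the worker actually runs, not during its re-import of this module.
    import tensorflow as tf
    return float(tf.reduce_sum(tf.constant(values)))

if __name__ == '__main__':
    # The __main__ guard matters on Windows: a spawned child re-executes
    # this module, and the guard stops it from re-creating the pool.
    with multiprocessing.Pool(2) as pool:
        print(pool.map(worker, [[1.0, 2.0], [3.0, 4.0]]))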

Parallelizing model predictions in keras using multiprocessing for python

So, I am unsure about some of your design choices, but I gave it my best attempt with the given information. Specifically, I think there may be some issues with the global variable and the import statement within your parallel function.

  1. You should use shared variables, not global variables, to share an input between processes. You can read more about shared memory in the multiprocessing documentation.

  2. I generated models from a tutorial since your models are not included.

  3. You are not joining or closing your pool, but with the following code I was able to get the code to execute in parallel successfully. You can close the pool by calling pool.close() or with the "with" syntax shown below. Note that the "with" syntax doesn't apply to Python 2.7.

import numpy as np
import multiprocessing, time, ctypes, os
import tensorflow as tf

mis = (28, 28)  # model input shape
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

def createModels(models):
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=mis),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10)
    ])

    model.compile(optimizer='adam',
                  loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    model.fit(x_train, y_train, epochs=5)

    for mod in models:
        model.save(mod)

def prediction(model_name):
    model = tf.keras.models.load_model(model_name)
    ret_val = model.predict(input).tolist()[0]
    return ret_val

if __name__ == "__main__":
    models = ['model1.h5', 'model2.h5', 'model3.h5', 'model4.h5', 'model5.h5']
    dir = os.listdir(".")
    if models[0] not in dir:
        createModels(models)
    # Shared array input
    ub = 100
    testShape = x_train[:ub].shape
    input_base = multiprocessing.Array(ctypes.c_double,
                                       int(np.prod(testShape)), lock=False)
    input = np.ctypeslib.as_array(input_base)
    input = input.reshape(testShape)
    input[:ub] = x_train[:ub]

    # with multiprocessing.Pool() as p:  # Use me for Python 3
    p = multiprocessing.Pool()  # Use me for Python 2.7
    start_time = time.time()
    res = p.map(prediction, models)
    p.close()  # Use me for Python 2.7
    print('Total time taken: {}'.format(time.time() - start_time))
    print(res)

I hope this helps.

Tensorflow and Multiprocessing: Passing Sessions

You can't use Python multiprocessing to pass a TensorFlow Session into a multiprocessing.Pool in the straightforward way because the Session object can't be pickled (it's fundamentally not serializable because it may manage GPU memory and state like that).

I'd suggest parallelizing the code using actors, which are essentially the parallel computing analog of "objects" and are used to manage state in the distributed setting.

Ray is a good framework for doing this. You can define a Python class which manages the TensorFlow Session and exposes a method for running your simulation.

import ray
import tensorflow as tf

ray.init()

@ray.remote
class Simulator(object):
    def __init__(self):
        self.sess = tf.Session()
        self.simple_model = tf.constant([1.0])

    def simulate(self):
        return self.sess.run(self.simple_model)

# Create two actors.
simulators = [Simulator.remote() for _ in range(2)]

# Run two simulations in parallel.
results = ray.get([s.simulate.remote() for s in simulators])

Here are a few more examples of parallelizing TensorFlow with Ray.

See the Ray documentation. Note that I'm one of the Ray developers.


