Keras + Tensorflow and Multiprocessing in Python
From my experience, the problem lies in loading Keras in one process and then spawning a new process after Keras has already been loaded into your main environment. But for some applications (e.g. training a mixture of Keras models) it's simply better to have all of these things in one process. So what I advise is the following approach (a little bit cumbersome, but it works for me):
DO NOT LOAD KERAS INTO YOUR MAIN ENVIRONMENT. If you want to load Keras / Theano / TensorFlow, do it only inside the function. E.g. don't do this:
import keras

def training_function(...):
    ...

Instead, do the following:

def training_function(...):
    import keras
    ...

Run the work connected with each model in a separate process: I usually create workers that do the job (e.g. training, tuning, scoring) and run them in separate processes. The nice thing about this is that all the memory used by a process is completely freed when the process is done. This helps with many of the memory problems you usually come across when using multiprocessing, or even when running multiple models in one process. So this looks, for example, like this:
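import multiprocessing

def _training_worker(train_params, queue):
    import keras  # Keras is loaded only inside the child process
    model = obtain_model(train_params)  # your own model-building function
    model.fit(...)  # train on the data described by train_params
    queue.put(...)  # send a result (e.g. a score or a model path) back to the main process

def train_new_model(train_params):
    queue = multiprocessing.Queue()
    # args must be a tuple, even for a single argument
    training_process = multiprocessing.Process(target=_training_worker, args=(train_params, queue))
    training_process.start()
    result = queue.get()  # read the message before join() so the child never blocks on the queue
    training_process.join()
    return result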
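If you prefer a Pool over managing Process objects by hand, the standard library's `maxtasksperchild` argument gives a similar effect: the pool replaces a worker after it has completed that many tasks, so with `maxtasksperchild=1` every job runs in a brand-new process whose memory is released when it exits. The sketch below uses a hypothetical `train_job` stand-in (it only reports its process ID) instead of real Keras training; `processes=1` additionally keeps execution strictly sequential.

```python
import multiprocessing
import os


def train_job(params):
    # Hypothetical stand-in for a real job: import keras, build, fit, save...
    # Here it only reports which process ran it.
    return {"params": params, "pid": os.getpid()}


def run_jobs_in_fresh_processes(all_params):
    # processes=1: jobs run one after another (strictly sequential).
    # maxtasksperchild=1: the worker is replaced after every task, so each
    # job gets a fresh process and its memory is freed when that process exits.
    with multiprocessing.Pool(processes=1, maxtasksperchild=1) as pool:
        # chunksize=1 so that one "task" is exactly one job
        return pool.map(train_job, all_params, chunksize=1)


if __name__ == "__main__":
    for result in run_jobs_in_fresh_processes(["model_a", "model_b", "model_c"]):
        print(result)
```

Passing `chunksize=1` matters: otherwise `Pool.map` may group several jobs into one task, and those jobs would then share a single process.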
A different approach is to prepare separate scripts for different model actions. However, this may cause memory errors, especially when your models consume a lot of memory. NOTE that for this reason it's better to make your execution strictly sequential.
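The separate-scripts approach can be driven by a small Python launcher that runs each step as its own process and waits for it to finish before starting the next one, so only one model is in memory at a time. This is a sketch; the `-c` one-liners stand in for your own scripts (e.g. a hypothetical `train.py` and `score.py`).

```python
import subprocess
import sys


def run_steps_sequentially(commands):
    """Run each command as a separate process, strictly one after another.

    A step's memory is released when its process exits, before the next
    step starts. check=True raises CalledProcessError on the first failure.
    """
    outputs = []
    for cmd in commands:
        result = subprocess.run(cmd, check=True, capture_output=True, text=True)
        outputs.append(result.stdout)
    return outputs


if __name__ == "__main__":
    # Stand-ins for e.g. [sys.executable, "train.py"] and [sys.executable, "score.py"]
    steps = [
        [sys.executable, "-c", "print('training')"],
        [sys.executable, "-c", "print('scoring')"],
    ]
    for out in run_steps_sequentially(steps):
        print(out.strip())
```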
How can I take advantage of multiprocessing and multithreading in deep learning using Keras?
It's a good thing that training one model doesn't use 100% of your CPU! Now there is room to train multiple models in parallel and speed up your overall training time.
NB: If you just want to speed up this model, look into GPUs or into changing hyperparameters like batch size and number of neurons (layer size).
Here's how you can use multiprocessing to train multiple models at the same time (using processes running in parallel, each on a separate CPU core of your machine).
A multiprocessing.Pool creates a set of worker processes and a queue of jobs that need doing. The worker processes pick up these jobs and run them. When a job is finished, the process picks up another job from the queue.
import time
import signal
import multiprocessing

def init_worker():
    ''' Make the workers ignore SIGINT, so that Ctrl+C (KeyboardInterrupt) is handled by the parent process '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def train_model(layer_size):
    '''
    This code is parallelised and runs in each worker process.
    It trains a model with the given layer size (hyperparameter),
    saves the model and returns the score (error).
    '''
    # import Keras inside the worker, not in the main process
    from keras.models import Sequential
    from keras.layers import Dense
    print(f'Training a model with layer size {layer_size}')
    # build your model here
    model = Sequential()
    model.add(Dense(layer_size))
    model.compile(loss='mean_squared_error', optimizer='adam')
    # fit the model (the bit that takes time!)
    model.fit(...)
    # let's demonstrate with a sleep timer
    time.sleep(5)
    # save the trained model to a file
    model.save(...)
    # you can also return values, e.g. the eval score
    return {'size': layer_size, 'score': model.evaluate(...)}

# the guard is required on Windows (and with any "spawn" start method)
if __name__ == '__main__':
    num_workers = 4
    hyperparams = [800, 960, 1100]
    with multiprocessing.Pool(num_workers, init_worker) as pool:
        scores = pool.map(train_model, hyperparams)
    print(scores)
Output:
Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size': 800, 'score': 1.2}, {'size': 960, 'score': 1.0}, {'size': 1100, 'score': 0.7}]
This is easily demonstrated with a time.sleep in the code. You'll see that all 3 processes start the training job and then all finish at about the same time. If this were single-process, you'd have to wait for each job to finish before starting the next (yawn!).
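The `time.sleep` demonstration can be reproduced without Keras at all. In this sketch, `fake_train` is a hypothetical stand-in for `model.fit(...)` that sleeps for one second; with three workers the three jobs overlap, so the whole `map` should take roughly one second rather than three.

```python
import multiprocessing
import time


def fake_train(layer_size):
    # Hypothetical stand-in for model.fit(...): just sleep for one second.
    time.sleep(1)
    return {"size": layer_size}


def run(hyperparams, num_workers):
    start = time.perf_counter()
    with multiprocessing.Pool(num_workers) as pool:
        # map blocks until every job is done and keeps the input order
        results = pool.map(fake_train, hyperparams)
    return results, time.perf_counter() - start


if __name__ == "__main__":
    results, elapsed = run([800, 960, 1100], num_workers=3)
    print(results)
    print(f"parallel: {elapsed:.1f} s (a sequential loop would need about 3 s)")
```

Note that `pool.map` returns results in the order of its inputs, regardless of which job finishes first.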
EDIT
The OP also wanted the full code. This is difficult on Stack Overflow because I can't test in your environment with your code. I've taken the liberty of copying and pasting your code into my template above. You may need to add some imports, but this is as close as you'll get to "runnable" and "full" code.
import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score

def init_worker():
    ''' Make the workers ignore SIGINT, so that Ctrl+C (KeyboardInterrupt) is handled by the parent process '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def train_model(model_type):
    '''
    This code is parallelised and runs in each worker process.
    It trains a model of the given type ('rnn' or 'lstm')
    and returns its train and test scores (MSE).
    '''
    # Keras is imported only inside the worker
    from keras.layers import LSTM, SimpleRNN, Dense, Activation, BatchNormalization
    from keras.models import Sequential
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau
    print(f'Training a model: {model_type}')
    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ]
    model = Sequential()
    if model_type == 'rnn':
        model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    elif model_type == 'lstm':
        model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    model.add(Dense(480))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(
        trainX,
        trainY,
        epochs=50,
        batch_size=20,
        validation_data=(testX, testY),
        verbose=1,
        callbacks=callbacks,
    )
    # predict
    Y_Train_pred = model.predict(trainX)
    Y_Test_pred = model.predict(testX)
    train_MSE = mean_squared_error(trainY, Y_Train_pred)
    test_MSE = mean_squared_error(testY, Y_Test_pred)
    # you can also return values, e.g. the eval score
    return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}

# Your code
# ---------
# This module-level code gives the workers trainX, trainY, testX and testY.
# With the "spawn" start method (Windows) it is re-run in every worker.
df = pd.read_csv(r"D:\Train.csv", header=None)
index = [i for i in list(range(1440)) if i % 3 == 2]
Y_train = df[index]
df = df.values

# making history by using look-back to predict the next step
def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    print("Len:", len(dataset) - look_back - 1)
    for i in range(len(dataset) - look_back - 1):
        a = dataset[i : (i + look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)

Y_train = np.array(Y_train)
df = np.array(df)
look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)
# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
    trainX, trainY, test_size=0.2, shuffle=False
)

# My Code
# -------
# the guard is required on Windows (and with any "spawn" start method)
if __name__ == '__main__':
    num_workers = 2
    model_types = ['rnn', 'lstm']
    with multiprocessing.Pool(num_workers, init_worker) as pool:
        scores = pool.map(train_model, model_types)
    print(scores)
Output of the program:
[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866},
{'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]
Use keras in multiprocessing
The good news is that TensorFlow sessions are thread-safe (see the question "Is it thread-safe when using tf.Session in inference service?").
To use a Keras model in multiple processes, you have to do the following:
- set up the model
- call _make_predict_function()
- set up a session and use it to get the TensorFlow graph
- finalize this graph
- every time you predict something, make this graph the default with as_default()
Here is some sample code:
# the usual imports (written for TensorFlow 1.x with standalone Keras 2.x;
# in TF 2 these TensorFlow calls only exist under tf.compat.v1)
import numpy as np
import tensorflow as tf
from keras.models import Model
from keras.layers import Input, Dense

# set up the model
i = Input(shape=(10,))
b = Dense(1)(i)
model = Model(inputs=i, outputs=b)

# now to use it in multiprocessing, the following is necessary
model._make_predict_function()
sess = tf.Session()
sess.run(tf.global_variables_initializer())
default_graph = tf.get_default_graph()
default_graph.finalize()

# now you share the model and graph between processes
# in each process you can call this:
def predict(something):
    with default_graph.as_default():
        return model.predict(something)
Parallelizing Keras Model Predict Using Multiprocessing
Load your model in the _apply_df function, so it doesn't get involved in pickling and sending to the process.
This is a simple code example, without the use of pandas, that runs a model on fashion-mnist data. I think you can adapt it to your use case.
import tensorflow as tf
import numpy as np
from multiprocessing import Pool

def _apply_df(data):
    # the model is loaded inside the worker, so it is never pickled
    model = tf.keras.models.load_model("my_fashion_mnist_model.h5")
    return model.predict(data)

def apply_by_multiprocessing(data, workers):
    pool = Pool(processes=workers)
    result = pool.map(_apply_df, np.array_split(data, workers))
    pool.close()
    pool.join()  # wait for the worker processes to exit
    return list(result)

def main():
    fashion_mnist = tf.keras.datasets.fashion_mnist
    _, (test_images, test_labels) = fashion_mnist.load_data()
    test_images = test_images / 255.0
    results = apply_by_multiprocessing(test_images, workers=3)
    print(test_images.shape)  # (10000, 28, 28)
    print(len(results))  # 3
    print([x.shape for x in results])  # [(3334, 10), (3333, 10), (3333, 10)]

if __name__ == "__main__":
    main()
Tensorflow (Keras) & Multiprocessing results in lack of GPU memory
If you are on Windows, move all your tf and keras imports into the methods.
How to avoid loading a parent module in a forked process with Python's multiprocessing
Since Windows lacks os.fork(), each new process starts a fresh interpreter and re-imports your main module, so all of its imports run again (which in your case includes importing tf).
https://docs.python.org/2/library/multiprocessing.html#windows
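You can see this on any OS by requesting the "spawn" start method explicitly with `multiprocessing.get_context("spawn")`. In this sketch, the module-level `print` runs again in every worker (each worker imports the script under the name `__mp_main__`), while the import inside `heavy_worker` is executed only where that function runs. The stdlib `json` module is just a cheap stand-in for `tensorflow`.

```python
import multiprocessing
import os

# Runs once in the parent and once more in every "spawn" worker,
# because each worker re-imports this script (as __mp_main__).
print(f"module loaded in process {os.getpid()} as {__name__}")


def heavy_worker(x):
    # Deferred import: executed only in the process that runs this function.
    # "json" stands in for a heavy library such as tensorflow.
    import json
    return json.dumps({"x": x, "pid": os.getpid()})


if __name__ == "__main__":
    # Emulate the Windows behaviour on Linux/macOS as well
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(2) as pool:
        for line in pool.map(heavy_worker, [1, 2, 3]):
            print(line)
```

Anything expensive at the top level of the script (imports, data loading, model building) is paid again in each spawned worker, which is why moving it into functions or behind `if __name__ == "__main__":` helps.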
Parallelizing model predictions in keras using multiprocessing for python
I'm unsure about some of your design choices, but I gave it my best attempt with the given information. Specifically, I think there may be issues with the global variable and with the import statement inside your parallel function.
You should use shared variables, not global variables, to share an input between processes. You can read more about shared memory in the multiprocessing documentation.
I generated models from a tutorial since your models are not included.
You are not closing or joining your pool, but with the following code I was able to run the predictions in parallel successfully. You can close the pool by calling pool.close() or by using the "with" syntax shown below. Note that the with syntax doesn't apply to Python 2.7.
import numpy as np
import multiprocessing, time, ctypes, os
import tensorflow as tf

mis = (28, 28)  # model input shape
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

def createModels(models):
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=mis),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10)
    ])
    model.compile(optimizer='adam',
                  loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])
    model.fit(x_train, y_train, epochs=5)
    for mod in models:
        model.save(mod)

def init_worker(shared_base, shape):
    # Runs once per worker: wrap the shared array (passed at process creation,
    # so it works with both fork and spawn) in a NumPy view
    global input_data
    input_data = np.ctypeslib.as_array(shared_base).reshape(shape)

def prediction(model_name):
    model = tf.keras.models.load_model(model_name)
    ret_val = model.predict(input_data).tolist()[0]
    return ret_val

if __name__ == "__main__":
    models = ['model1.h5', 'model2.h5', 'model3.h5', 'model4.h5', 'model5.h5']
    files = os.listdir(".")
    if models[0] not in files:
        createModels(models)
    # Shared array input
    ub = 100
    testShape = x_train[:ub].shape
    input_base = multiprocessing.Array(ctypes.c_double,
                                       int(np.prod(testShape)), lock=False)
    input_data = np.ctypeslib.as_array(input_base).reshape(testShape)
    input_data[:ub] = x_train[:ub]
    start_time = time.time()
    # Python 3; for Python 2.7 use p = multiprocessing.Pool(...), then p.map(...), then p.close()
    with multiprocessing.Pool(initializer=init_worker,
                              initargs=(input_base, testShape)) as p:
        res = p.map(prediction, models)
    print('Total time taken: {}'.format(time.time() - start_time))
    print(res)
I hope this helps.
Tensorflow and Multiprocessing: Passing Sessions
You can't use Python multiprocessing to pass a TensorFlow Session into a multiprocessing.Pool in the straightforward way, because the Session object can't be pickled (it's fundamentally not serializable, because it may manage GPU memory and state like that).
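`multiprocessing.Pool` sends every argument to its workers with `pickle`, so an object that wraps process-local resources fails at that step. A TensorFlow Session isn't needed to see this: in the sketch below a `threading.Lock` stands in for such an object, next to an ordinary dict that pickles fine.

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if obj survives pickle.dumps, as Pool arguments must."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError, AttributeError):
        return False
    return True


print(is_picklable({"weights": [0.1, 0.2]}))  # True
print(is_picklable(threading.Lock()))          # False: a lock is tied to this process
```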
I'd suggest parallelizing the code using actors, which are essentially the parallel computing analog of "objects" and are used to manage state in the distributed setting.
Ray is a good framework for doing this. You can define a Python class which manages the TensorFlow Session and exposes a method for running your simulation.
import ray
import tensorflow as tf  # TensorFlow 1.x API (tf.Session is tf.compat.v1.Session in TF 2)

ray.init()

@ray.remote
class Simulator(object):
    def __init__(self):
        # each actor creates and owns its own Session, so it is never pickled
        self.sess = tf.Session()
        self.simple_model = tf.constant([1.0])

    def simulate(self):
        return self.sess.run(self.simple_model)

# Create two actors.
simulators = [Simulator.remote() for _ in range(2)]
# Run two simulations in parallel.
results = ray.get([s.simulate.remote() for s in simulators])
Here are a few more examples of parallelizing TensorFlow with Ray.
See the Ray documentation. Note that I'm one of the Ray developers.