在ML Engine上运行Keras模型

Keras是一个很好的库,可以使用深度学习模型。它具有易于训练的最先进模型和友好的编程接口,可实现和定制具有灵活后端选择的层(Tensorflow,Theano,CNTK)。Tensorflow还实现了Keras接口tf.keras,它具有相同的功能,但具有不同的代码库(由Google支持),并且更有可能在未来更好地集成到tensorflow中。
一旦模型针对给定任务进行了预训练,就可以使用基于tensorflow服务的ML引擎轻松地为预测和在线学习进行分布式和版本化控制,该引擎提供了一种全面集成的方法来扩展机器学习模型。它支持Tensorflow,scikitlearn或xgboost模型,并提供可扩展的批处理和在线预测API,而无需担心基础架构,部署或资源使用情况。这样可以大大加快从模型化到生产阶段的时间,并迭代机器学习(ML)模型。
本文将指导您实现我们在ML Engine上训练,转换,测试,导出和使用Keras模型的过程。
为何选择ML引擎
在我们的使用案例中,我们有近十亿个图像要处理 - 主要用于分类任务 - 以及每天要处理的数百个新图像。
我们开始使用k-80 GPU在一个小型谷歌计算引擎实例上训练和测试我们的机器学习模型,并且可以在合理的时间内处理数千到一百万个用于训练和预测的图像以迭代我们的机器学习模型。
不过,我们需要扩大规模,将模型应用到整个历史数据集,并能够预测传入图像(带有延迟约束)。
因此,拥有动态,自动扩展和随需应变资源(GPU,CPU和存储)是我们的应用程序的一部分,我们选择ML引擎进行数据准备的预测和数据流。

Keras训练模式
Keras提供了许多您可以选择的最先进的模型,已经预先训练过的重量可以为您自己的应用重复使用(使用预训练体重的转移学习训练模型或从头开始训练模型)
https://github.com/keras-team/keras/tree/master/keras/applications
从现有模型开始,您通常需要调整:
- 输入和输出层,形状和中间元非图像特征注入dense层
from keras.applications import InceptionV3 from keras.layers import Dense, GlobalAveragePooling2D, Flatten, concatenate, Input from keras.models import Model base_model = InceptionV3(weights='imagenet',input_shape=input_shape, include_top=False) inputs = [base_model.input] x = base_model.output x = GlobalAveragePooling2D()(x) # meta features layers can be added here # features = Input(shape=(len(model_options.features),), name='Features') # features = Dense(20, activation='relu')(features) # x = concatenate([x, features], axis=1) x = Dense(1024, activation='relu')(x) predictions = Dense(model_options.nb_classes, activation='softmax')(x) model = Model(inputs=inputs, outputs=predictions)

- 训练图像预处理和增强以在训练阶段应用
from imgaug import augmenters as iaa
from keras.preprocessing.image import ImageDataGenerator
aug1 = iaa.GaussianBlur(sigma=(0, 2.0)) # adding random blur to training images
def additional_augmenation(image):
image = aug1.augment_image(image)
return image
generator_options = {
'featurewise_center' : True,
'featurewise_std_normalization': True,
'rescale': 1. / 255, # this rescale RGB 255 images to 0-1
'horizontal_flip': True,
'vertical_flip': True,
'zoom_range': 0.2,
'rotation_range': 90.,
'preprocessing_function': additional_augmenation
}
generator = ImageDataGenerator(
**generator_options)
flow_options = {
'target_size': (input_size, input_size),
'batch_size': batch_size,
'class_mode': 'categorical',
'shuffle': True,
'seed': seed,
'classes': class_names
}
image_generator = generator.flow_from_directory(
image_folder_dir, **flow_options
)
- 训练参数:
- 损失函数最有可能取决于您想要实现的(分类、回归等),在CNN模型的最后一层使用的激活函数
- batch_size和主要影响收敛速度的epochs数; 较大的批量大小将减少执行的步骤数,但代价是更多的内存
- 优化器也将主要影响收敛,具体取决于您的应用程序
下面是简单图像分类的示例参数
loss_function = 'categorical_crossentropy' steps_per_epoch = int(nb_training_samples // batch_size) optimizer = keras.optimizers.Adam(lr=init_lr) metrics = ['accuracy'] validation_steps = int( nb_validation_samples // batch_size) model.compile(optimizer, loss_function, metrics=metrics) model.fit_generator( image_generator, steps_per_epoch=steps_per_epoch, epochs=nb_epochs, callbacks=callbacks, verbose=2, workers=1, use_multiprocessing=False )

将Keras模型导出为Tensorflow推理图
如果您使用Keras本机实现,您首先需要将Keras模型转换为Tensorflow推理图并将权重转换为二进制protobuf文件。
由于Keras在幕后使用了Tensorflow后端,所以很容易转换使用过的模型,这些模型可以在Tensorflow的主会话中找到:
import tensorflow as tf from keras import backend as K keras.models.load_model(input_model_path) session = K.get_session() tf.train.write_graph(session.graph.as_graph_def(), graph, logdir, path_to_model_file_pb, as_text=True)

如果您想冻结图以进行优化(许多操作包括加载和保存变量),您可能需要将变量节点转换为常量:
graph = graph_util.convert_variables_to_constants(session, session.graph.as_graph_def(), [n.op.name for n in model.outputs])
自定义输入/输出以添加服务API的元数据
您需要自定义模型提供的输入/输出的主要原因有两个:
- 模型训练通常涉及一些你想要嵌入主图的预处理步骤(解码图像,调整大小,重新缩放......)
- 您可能希望向API输入/输出添加一些元数据(标签,标识符,特征或预测图像的特定功能......)
预处理步骤的示例包括jpeg图像解码和调整大小
def build_serving_inputs(): def decode_and_resize(image_str_tensor): """Decodes jpeg string, resizes it and returns a float32 tensor.""" image = tf.image.decode_jpeg(image_str_tensor, channels=3) image = tf.expand_dims(image, 0) image = tf.image.resize_bilinear( image, [height, width], align_corners=False) image = tf.squeeze(image, squeeze_dims=[0]) image = tf.cast(image, dtype=tf.float32) return image # define a variable size input tensor that takes images encoded as jpeg in base64 string inputs = tf.placeholder(tf.string, name="input_image", shape=[None]) decoded_images = tf.map_fn(decode_and_resize, inputs, back_prop=False) output = tf.identity(decoded_images, name="output") return input, output

要将预处理注入主图,您需要在输入之前添加一些步骤,并使用input_map将它们分支到模型输入占位符:
with tf.gfile.GFile(existing_model_path_pb, "rb") as f:
graph_def = tf.GraphDef()
graph_def.ParseFromString(f.read())
with tf.Graph().as_default() as graph:
# The name var will prefix every op/nodes in your graph
tf.import_graph_def(graph_def, name="my-model-name")
gdef_main_graph = graph.as_graph_def()
with tf.Graph().as_default() as preprocessing_graph:
input, output = build_serving_inputs()
gdef_preprocessing = preprocessing_graph.as_graph_def()
# prepare builder to save serving model with preprocessing stepsbuilder = tf.saved_model.builder.SavedModelBuilder(os.path.join(EXPORT_DIR, 'serve'))
# merge preprocessing graph and main model graph into a single graph
with tf.Graph().as_default() as graph_with_preprocessing:
image_input = tf.placeholder(tf.string, name="image_as_jpeg")
input_metadata = tf.placeholder(tf.string, name="object_id")
metadata = tf.identity(input_metadata, name="metadata_out")
prepared_input, = tf.import_graph_def(gdef_preprocessing,
input_map={"input:0": image_input}, return_elements=["output:0"])
# feed preprocessed inputs into main model graph
output, = tf.import_graph_def(
gdef_main_graph,
input_map={"my-model-name/input_1:0": prepared_input},
return_elements=["my-model-name/dense_2/Softmax:0"],
name=”full_graph”)
保存的模型签名
保存的模型定义了将tf图序列化为元图的方法,元图还将变量、assets和签名(模型的输入和输出,即将由服务模型提供的输入)嵌入到protobuf模式中。
有一些签名助手提供了方便的方法来为保存的模型构建签名。
image_info = tf.saved_model.utils.build_tensor_info(image_input)
in_metadata_info = tf.saved_model.utils.build_tensor_info(input_metadata)
metadata_info = tf.saved_model.utils.build_tensor_info(metadata)
output_info = tf.saved_model.utils.build_tensor_info(output)
# building serving model signature with default tf labels
signature = (tf.saved_model.signature_def_utils.build_signature_def(
inputs={'input': image_info, 'metadata': in_metadata_info},
outputs={tf.saved_model.signature_constants.CLASSIFY_OUTPUT_CLASSES: outputs_classes, 'metadata': metadata_info},
method_name=
tf.saved_model.signature_constants.CLASSIFY_METHOD_NAME))
# adding serving tag and serving signature within main graph
builder.add_meta_graph_and_variables(
session, [tf.saved_model.tag_constants.SERVING], signature_def_map={tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature})
builder.save()
一个更完整的功能预处理示例可以在优秀的google的cloud ml 示例存储库中找到(https://github.com/GoogleCloudPlatform/cloudml-samples/blob/master/flowers/trainer/model.py)。
在本地测试服务模型
保存(已保存)模型后,就可以在本地进行测试了。
您可以使用gcloud客户端或已保存的模型来测试模型。
首先,您可以使用saved_model_cli检查您是否拥有正确的模型签名。
saved_model_cli show --dir . --tag_set serve --all MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs: signature_def[serving_default]: The given SavedModel SignatureDef contains the following input(s): inputs['input'] tensor_info: dtype: DT_STRING shape: (-1) name: image_as_jpeg:0 inputs['metadata'] tensor_info: dtype: DT_STRING shape: (-1) name: object_id:0 The given SavedModel SignatureDef contains the following output(s): outputs['classes'] tensor_info: dtype: DT_FLOAT shape: (-1, 2) name: import_1/inception/dense_2/Softmax:0 outputs['metadata'] tensor_info: dtype: DT_STRING shape: (-1) name: metadata_out:0 Method name is: tensorflow/serving/classify

然后,您需要实现数据序列化以提供构建的服务模型。还有多种选择:
对于gcloud ml-engine客户端:
- 作为基于协议缓冲区的格式的tf记录,可以并且应该用于批量预测(也可以使用压缩的tf记录)
- 作为json行序列化文件,也可用于API调用和批量预测
对于saved_model_cli:
- 作为序列化的numpy数组
- 作为json行序列化文件
换行符分隔json文件
这是开始测试服务部分最直接的选择。基本上你需要提供文件,每行都有一个与元图签名规范相匹配的输入字典:
import base64
import json
from PIL import Image
from io import BytesIO
def convert_image_to_bytes(image_uri, format='JPEG'):
im = Image.open(image_uri)
jpeg_im = BytesIO()
im.save(jpeg_im, format=format)
return jpeg_im
def convert_to_input_dict(image_uri):
return {'metadata': image_uri, 'input': {"b64": base64.b64encode(convert_image_to_bytes(image_uri).getvalue()).decode()}}
def convert_to_json_file(image_uris, filename):
with open(filename, 'w+') as of:
for image_uri in image_uris:
of.write(
json.dumps(convert_to_input_dict(image_uri)) +'
')
%%bash
gcloud ml-engine local predict --model-dir=models/inception-15/test/serve/ --json-instances data/json/test.json
Numpy数组
Numpy数组可以以与json非常相似的方式生成(除了你需要将每个输入导出为它自己的numpy数组,这有点......不方便)
import pandas as pd
import numpy as np
def convert_to_numpy_array(image_uris, filename):
images_records = [{'input': convert_image_to_bytes(image_uri).getvalue()} for image_uri in image_uris]
images_df = pd.DataFrame(images_records).values.flatten().astype(bytes)
meta_records = [{'metadata': image_uri} for image_uri in image_uris]
meta_df = pd.DataFrame([meta_records]).values.flatten().astype(bytes)
np.save(filename, images_df)
np.save('meta_{}'.format(filename), meta_df)
%%bash
saved_model_cli run --dir . --tag_set serve --signature_def serving_default --inputs 'input=inputs.npy;metadata=metadata.npy'
TFRecords
TFRecords基于protobuf(二进制存储格式),并且很好地集成在tensorflow data api中,它提供了方便的方法来处理批处理,迭代以及基于并行/由文件名馈送的队列重新采样。
虽然,因为它涉及protobuf模式,它需要在tf图中的预处理之前注入一些解码步骤。因此,我们需要事先改变服务签名。
首先,我们将开始定义我们的模式(与上面定义的签名基本匹配)和方便的编码和解码方法。
import tensorflow as tf
def _bytes_feature(value):
return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
def encode(image_uri, filename):
writer = tf.python_io.TFRecordWriter(filename)
jpeg_im = convert_image_to_bytes(image_uri)
example = tf.train.Example(features=tf.train.Features(feature={
'input': _bytes_feature(jpeg_im.getvalue()), 'metadata': _bytes_feature(image_uri.encode())}))
writer.write(example.SerializeToString())
def decode(serialized_example):
features = tf.parse_single_example(serialized_example,
features={
'input': tf.FixedLenFeature([], tf.string),
'metadata': tf.FixedLenFeature([], tf.string),
})
return features['input'], features['metadata']
然后我们将重新定义主图中的解码步骤:
builder = tf.saved_model.builder.SavedModelBuilder(os.path.join(EXPORT_DIR, 'serve-tf-records'))
with tf.Graph().as_default() as graph_with_tf_decode:
serialized_tf_records = tf.placeholder(tf.string,
name="tf_records", shape=None)
inputs, metadata = tf.map_fn(decode,
serialized_tf_records, back_prop=False, dtype=(tf.string, tf.string))
outputs, metadata, = tf.import_graph_def(
graph_with_preprocessing,
input_map={"images_as_jpeg:0": inputs, "object_ids:0": metadata},
return_elements=["full_graph/my-model-name/dense_2/Softmax:0", "metadata_out:0"], name='')
records_info = tf.saved_model.utils.build_tensor_info(serialized_tf_records)
metadata_info = tf.saved_model.utils.build_tensor_info(metadata)
output_info = tf.saved_model.utils.build_tensor_info(outputs)
signature = (tf.saved_model.signature_def_utils.build_signature_def(
inputs={'input': records_info},
outputs={
tf.saved_model.signature_constants.CLASSIFY_OUTPUT_CLASSES: output_info, 'metadata': metadata_info},
method_name=
tf.saved_model.signature_constants.CLASSIFY_METHOD_NAME))
builder.add_meta_graph_and_variables(
sess,
[tf.saved_model.tag_constants.SERVING],
signature_def_map
{tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature})
builder.save()
最后,您可以通过将tf记录数据集提供给此图来测试它
;with tf.Session(graph=graph_with_tf_decode) as session:
filenames = tf.placeholder(tf.string, shape=[None])
dataset = tf.data.TFRecordDataset(filenames)
dataset = dataset.batch(1)
iterator = dataset.make_initializable_iterator()
session.run(iterator.initializer, feed_dict={filenames: [test_tf_file]})
records = session.run(iterator.get_next())
inputs = graph_with_tf_decode.get_tensor_by_name('tf_records:0')
model = graph_with_tf_decode.get_tensor_by_name('full_graph/my-model-name/dense_2/Softmax:0')
res = session.run(model, feed_dict={inputs: records})
没有本地模式可以提供tf记录,但是一旦你的模型被推到具有预测/训练工作的机器学习(ML)引擎上,你将能够测试它
gcloud ml-engine jobs submit prediction test --version v2 --data-format tf-record --input-paths gs://my-bucket/data/tf_records/tf-records-file --output-path gs://my-bucket/predictions/model/out --model test --region europe-west1
在这里,您将找到另一个使用tf记录进行ML引擎培训的示例:
https://cloud.google.com/blog/products/gcp/performing-prediction-with-tensorflow-object-detection-models-on-google-cloud-machine-learning-engine
将模型导出到机器学习(ML)引擎并从API调用
在本地测试模型后,将其导出到Google云端存储中然后在ML引擎中提供它非常简单。
您首先需要创建一个ml-engine模型,然后从云控制台界面或gcloud ml-engine客户端添加新的模型版本:
gsutil cp local_saved_model_path gs://my-bucket/model/serve gcloud ml-engine versions create v2 --model test --origin gs://my-bucket/model/serve --python-version 3.5 --runtime-version 1.9 --framework tensorflow

在tf记录上使用批量预测
从那里你可以使用数据流或spark来准备你的tf记录数据集以进行训练或预测,你会在cloud ml样本中找到一些完整的tf记录准备实例 (https://github.com/GoogleCloudPlatform/cloudml-samples/blob/master/flowers/trainer/preprocess.py)]。
准备好数据后,推送预测作业可以如上所示与客户端或从API进行交互(在我们的用例中,我们在airflow使用ML引擎预定作业)。
Google最近在tensorflow api中发布了数据流集成,因此我们希望将来能够将更加一致和统一的API结合起来进行数据准备和数据建模。
从API调用
当您必须执行在线学习和/或预测时,您还可以直接使用api预测记录,将输入作为json序列化对象传递:
def predict_json(project, model, instances, credentials, version=None):
"""Send json data to a deployed model for prediction.
Args:
project (str): project where the Cloud ML Engine Model is deployed.
model (str): model name.
instances json properly formatted
version: str, version of the model to target.
Returns:
Mapping[str: any]: dictionary of prediction results defined by the
model.
"""
service = build('ml', 'v1', credentials=credentials)
name = 'projects/{}/models/{}'.format(project, model)
if version is not None:
name += '/versions/{}'.format(version)
response = service.projects().predict(
name=name,
body=instances
).execute()
if 'error' in response:
raise RuntimeError(response['error'])
return response['predictions']
resp = predict_json(
project, 'model',
{'instances':[convert_to_input_dict(element) for element in element]}, gcs_credentials
)
最后
Tensorflow增长非常快。如果它的核心API庞大,快速变化并且具有难以掌握的概念,它集成了更易于访问的接口,例如Keras,已经丰富了数据预处理工具,旨在实现更一致的API整体。