分享

MLOps管道中的模型自动调整

 码农9527 2021-05-06

  在本系列文章中,我们将引导您完成将CI/CD应用于AI任务的过程。最后,您将获得一个满足GoogleMLOps成熟度模型第2级要求的功能管道。我们假设您对Python,深度学习,Docker,DevOps和Flask有所了解。

    在上一篇文章中,我们为此项目设置了一个云环境。在这一部分中,我们将引导您完成持续集成,模型自动训练,自动调整和持续交付所需的代码。下图显示了我们在项目流程中的位置。

web

    我们将显示代码的精简版本。有关完整版本,请参见此存储库。我们将在该项目中使用GCRDocker映像(由TensorFlow支持)–但是随时可以使用其他映像。

    首先,我们将讨论在本地运行这些解决方案的代码。稍后,我们将看到如何为云部署做好准备。

    下图显示了我们项目的文件结构。

web

    data_utils.py

    该data_utils.py文件句柄的数据加载,转换和模型保存到GCS。此文件可能因项目而异。本质上,它在模型训练之前执行所有数据处理任务。让我们看一下代码:

import datetime
from google.cloud import storage
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import tensorflow as tf
import gc
from sklearn import preprocessing
import os
import zipfile
import cv2
import sys
 
def dataset_transformation(path):
 images = []
 for dirname, _, filenames in os.walk(path):
  for filename in filenames:
   if filename.endswith('.png'):
 image = cv2.imread(os.path.join(dirname, filename))
 image = cv2.resize(image, (128, 128))
 images.append(image)
 return images
 
def load_data(args): 
 file_1 = '/root/AutomaticTraining-Dataset/COVID_RX/normal_images.zip'
 file_2 = '/root/AutomaticTraining-Dataset/COVID_RX/covid_images.zip'
 file_3 = '/root/AutomaticTraining-Dataset/COVID_RX/viral_images.zip'
 extract_to = '/root/AutomaticTraining-Dataset/COVID_RX/'
 
 with zipfile.ZipFile(file_1, 'r') as zip_ref:
  zip_ref.extractall(extract_to)
 
 with zipfile.ZipFile(file_2, 'r') as zip_ref:
  zip_ref.extractall(extract_to)
 
 with zipfile.ZipFile(file_3, 'r') as zip_ref:
  zip_ref.extractall(extract_to)

 normal = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/normal_images')
 covid = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/covid_images')
 viral = dataset_transformation('/root/AutomaticTraining-Dataset/COVID_RX/viral_images')
 #Train and test - dataset combination
 X = normal + viral + covid
 #Transforming from list to numpy array.
 X = np.array(X)
 
 #Creating labels.
 y = []
 for i in range(len(normal)):
  y.append(0)
 for i in range(len(covid)):
  y.append(1)
 for i in range(len(viral)):
  y.append(2)
 y = np.array(y)

 #Dataset splitting
 X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0, shuffle = True)
 return X_train, X_test, y_train, y_test
 
def save_model(bucket_name, best_model):
 try:
  storage_client = storage.Client() #if running on GCP
  bucket = storage_client.bucket(bucket_name)
  blob1 = bucket.blob('{}/{}'.format('testing',best_model))
  blob1.upload_from_filename(best_model)
  return True,None
 except Exception as e:
  return False,e123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869复制代码类型:[html]

    model_assembly.py

    该model_assembly.py文件包含模型的创建和自动的调整的代码。我们希望从一个非常基本的模型开始–对其进行训练和评估。如果初始模型不能达到理想的性能,我们将进行改进直到达到目标。让我们看一下代码:

from tensorflow.keras.models import load_model
from tensorflow.keras import layers
import tensorflow as tf
import numpy as np
 
def get_base_model():
 input_img = layers.Input(shape=(128, 128, 3))
 x = layers.Conv2D(64,(3, 3), activation='relu')(input_img)
 return input_img,x

def get_additional_layer(filters,x):
 x = layers.MaxPooling2D((2, 2))(x)
 x = layers.Conv2D(filters, (3, 3), activation='relu')(x)
 return x
 
def get_final_layers(neurons,x):
 x = layers.SpatialDropout2D(0.2)(x)
 x = layers.Flatten()(x)
 x = layers.Dense(neurons)(x)
 x = layers.Dense(3)(x)
 return x123456789101112131415161718192021复制代码类型:[html]

    这些函数将在循环中调用,并且在第一次迭代中,我们将获得base_model,final_layers并将它们堆叠起来以构建一个非常简单的模型。如果训练我们发现该模型不执行不够好之后,然后我们将再次得到base_model,加additional_layers,堆栈final_layers,然后训练和一次评估。如果我们仍然无法达到良好的性能,则将在循环中重复最后一个过程,并增加更多的过程,additional_layers直到达到预定义的良好指标为止。

    email_notifications.py

    该email_notifications.py文件是负责通过本地SMTP服务器产品所有者提供的电子邮件。这些电子邮件会告诉所有者是否一切正常,如果不正常,那是什么问题。

import smtplib
import os
 
# Email variables definition
sender = 'example@gmail.com’
receiver = ['svirahonda@gmail.com'] #replace this by the owner's email address
smtp_provider = 'smtp.gmail.com' #replace this by your STMP provider
smtp_port = 587
smtp_account = 'example@gmail.com’
smtp_password = 'your_password’
 
def training_result(result,model_acc):
 if result == 'ok':
  message = 'The model reached '+str(model_acc)+', It has been saved to GCS.'
 if result == 'failed':
  message = 'None of the models reached an acceptable accuracy, training execution had to be forcefully ended.’
 message = 'Subject: {}\n\n{}'.format('An automatic training job has ended recently', message)
 try:
  server = smtplib.SMTP(smtp_provider,smtp_port)
  server.starttls()
  server.login(smtp_account,smtp_password)
  server.sendmail(sender, receiver, message)   
  return
 except Exception as e:
  print('Something went wrong. Unable to send email: '+str(e),flush=True)
  return
 
def exception(e_message):
 try:
  message = 'Subject: {}\n\n{}'.format('An automatic training job has failed.', e_message)
  server = smtplib.SMTP(smtp_provider,smtp_port)
  server.starttls()
  server.login(smtp_account,smtp_password)
  server.sendmail(sender, receiver, message)   
  return
 except Exception as e:
  print('Something went wrong. Unable to send email: '+str(e),flush=True)
  return1234567891011121314151617181920212223242526272829303132333435363738复制代码类型:[html]

    task.py

    该task.py文件编排节目。它会初始化GPU(如果有),以开始模型训练,并在需要时调整模型。它还接收传递给应用程序的参数。这是代码:

import tensorflow as tf
from tensorflow.keras import Model, layers, optimizers
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import Model
from tensorflow.keras.models import load_model
import argparse
import model_assembly, data_utils, email_notifications
import sys
import os
import gc
from google.cloud import storage
import datetime
import math
 
# general variables declaration
model_name = 'best_model.hdf5'
 
def initialize_gpu():
 if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
  tf.config.set_soft_device_placement(True)
  tf.debugging.set_log_device_placement(True)
  return
 
def start_training(args):
 # Loading splitted data
 X_train, X_test, y_train, y_test = data_utils.load_data(args)
 # Initializing GPU if available
 initialize_gpu()
 train_model(X_train, X_test, y_train, y_test, args)
 
def train_model(X_train, X_test, y_train, y_test,args):
 try:
  model_loss, model_acc = [0,0]
  counter = 0
  while model_acc <= 0.85:
   input_img,x = model_assembly.get_base_model()
   if counter == 0:
 x = model_assembly.get_final_layers(64,x)
   else:
 for i in range(counter):
  x = model_assembly.get_additional_layer(int(64*(math.pow(2,counter))),x)
 x = model_assembly.get_final_layers(int(64*(math.pow(2,counter))),x)
   cnn = Model(input_img, x,name="CNN_COVID_"+str(counter))
   cnn.summary()
   cnn.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy'])
   checkpoint = ModelCheckpoint(model_name, monitor='val_loss', verbose=1,save_best_only=True, mode='auto', save_freq="epoch")
   cnn.fit(X_train, y_train, epochs=args.epochs, validation_data=(X_test, y_test),callbacks=[checkpoint])
   cnn = load_model(model_name)
   model_loss, model_acc = cnn.evaluate(X_test, y_test,verbose=2)
   if model_acc > 0.85:
 saved_ok = data_utils.save_model(args.bucket_name,model_name)
  if saved_ok[0] == True:
   email_notifications.training_result('ok',model_acc)
   sys.exit(0)
  else:
 email_notifications.exception(saved_ok[1])
 sys.exit(1)
   else:
 pass
   if counter >= 5:
 email_notifications.training_result('failed',None)
 sys.exit(1)
   counter += 1
 except Exception as e:
  email_notifications.exception('An exception when training the model has occurred: '+str(e))
  sys.exit(1)
 
def get_args():
 parser = argparse.ArgumentParser()
 parser.add_argument('--bucket-name', type=str, default = 'automatictrainingcicd-aiplatform', help = 'GCP bucket name')
 parser.add_argument('--epochs', type=int, default=3, help='Epochs number')
 args = parser.parse_args()
 return args
 
def main():
 args = get_args()
 start_training(args)
 
if __name__ == '__main__':
 main()1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980复制代码类型:[html]

    Docker文件

    我们的Dockerfile负责将指令传递给Docker守护程序以构建适当的容器。看起来是这样的:

FROM gcr.io/deeplearning-platform-release/tf2-cpu.2-0
 
WORKDIR /root
 
RUN pip install pandas numpy google-cloud-storage scikit-learn opencv-python
RUN apt-get update; apt-get install git -y; apt-get install -y libgl1-mesa-dev
 
ADD "https://www./cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-Dataset.git
ADD "https://www./cgi-bin/randbyte?nbytes=10&format=h" skipcache
RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-CodeCommit.git
 
RUN mv /root/AutomaticTraining-CodeCommit/model_assembly.py /root
RUN mv /root/AutomaticTraining-CodeCommit/task.py /root
RUN mv /root/AutomaticTraining-CodeCommit/data_utils.py /root
RUN mv /root/AutomaticTraining-CodeCommit/email_notifications.py /root
 
ENTRYPOINT ["python","task.py"]123456789101112131415161718复制代码类型:[html]

    上面的文件使用该gcr.io/deeplearning-platform-release/tf2-cpu.2-0映像,安装依赖项,克隆所需的存储库,将文件移至主目录,并设置容器执行的入口点。

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多