编写自定义 TensorFlow/Keras 回调的指南

介绍

假设您希望 Keras 模型在训练、评估或预测期间具有某些特定行为。 例如,您可能希望在每个训练时期保存您的模型。 一种方法是使用回调。

通常,回调是在某些事件发生时调用的函数,并作为参数传递给其他函数。 对于 Keras,它们是自定义模型行为的工具——无论是在训练、评估还是推理期间。 一些应用程序是日志记录、模型持久性、提前停止或改变学习率。 这是通过将回调列表作为参数传递给 keras.Model.fit(),keras.Model.evaluate() or keras.Model.predict().

回调的一些常见用例是修改学习率、记录、监控和提前停止训练。 Keras内置了一些回调函数,详解
在文档中
.

但是,一些更具体的应用程序可能需要自定义回调。 例如, 在保持期后使用余弦衰减实施学习率预热 目前不是内置的,但被广泛用作调度程序。

回调类及其方法

Keras 有一个特定的回调类, keras.callbacks.Callback,具有可以在全局、批量或时代级别的训练、测试和推理期间调用的方法。 为了 创建自定义回调,我们需要创建一个子类并重写这些方法。

keras.callbacks.Callback 类有XNUMX种方法:

  • 全局方法:在开始或结束时调用 fit(), evaluate()predict().
  • 批级方法:在批处理开始或结束时调用。
  • epoch-level 方法:在训练批次的开始或结束时调用。

请注意: 每个方法都可以访问一个名为 logs. 的键和值 logs 是上下文相关的——它们取决于调用该方法的事件。 此外,我们可以通过 self.model 属性。

让我们看一下三个自定义回调示例——一个用于训练,一个用于评估,一个用于预测。 每个人都会在每个阶段打印我们的模型正在做什么以及我们可以访问哪些日志。 这有助于理解在每个阶段可以使用自定义回调做什么。

让我们从定义一个玩具模型开始:

import tensorflow as tf
from tensorflow import keras
import numpy as np

model = keras.Sequential()
model.add(keras.layers.Dense(10, input_dim = 1, activation='relu'))
model.add(keras.layers.Dense(10, activation='relu'))
model.add(keras.layers.Dense(1))
model.compile(
    optimizer=keras.optimizers.RMSprop(learning_rate=0.1),
    loss = "mean_squared_error",
    metrics = ["mean_absolute_error"]
)

x = np.random.uniform(low = 0, high = 10, size = 1000)
y = x**2
x_train, x_test = (x[:900],x[900:])
y_train, y_test = (y[:900],y[900:])

自定义训练回调

我们的第一个回调将在训练期间被调用。 让我们子类化 Callback 类:

class TrainingCallback(keras.callbacks.Callback):
    def __init__(self):
        self.tabulation = {"train":"", 'batch': " "*8, 'epoch':" "*4}
    def on_train_begin(self, logs=None):
        tab = self.tabulation['train']
        print(f"{tab}Training!")
        print(f"{tab}available logs: {logs}")

    def on_train_batch_begin(self, batch, logs=None):
        tab = self.tabulation['batch']
        print(f"{tab}Batch {batch}")
        print(f"{tab}available logs: {logs}")

    def on_train_batch_end(self, batch, logs=None):
        tab = self.tabulation['batch']
        print(f"{tab}End of Batch {batch}")
        print(f"{tab}available logs: {logs}")

    def on_epoch_begin(self, epoch, logs=None):
        tab = self.tabulation['epoch']
        print(f"{tab}Epoch {epoch} of training")
        print(f"{tab}available logs: {logs}")

    def on_epoch_end(self, epoch, logs=None):
        tab = self.tabulation['epoch']
        print(f"{tab}End of Epoch {epoch} of training")
        print(f"{tab}available logs: {logs}")

    def on_train_end(self, logs=None):
        tab = self.tabulation['train']
        print(f"{tab}Finishing training!")
        print(f"{tab}available logs: {logs}")

如果这些方法中的任何一个没有被覆盖——默认行为将像以前一样继续。 在我们的例子中——我们简单地打印出可用的日志和应用回调的级别,并使用适当的缩进。

让我们看一下输出:

model.fit(
    x_train,
    y_train,
    batch_size=500,
    epochs=2,
    verbose=0,
    callbacks=[TrainingCallback()],
)
Training!
available logs: {}
    Epoch 0 of training
    available logs: {}
        Batch 0
        available logs: {}
        End of Batch 0
        available logs: {'loss': 2172.373291015625, 'mean_absolute_error': 34.79669952392578}
        Batch 1
        available logs: {}
        End of Batch 1
        available logs: {'loss': 2030.1309814453125, 'mean_absolute_error': 33.30256271362305}
    End of Epoch 0 of training
    available logs: {'loss': 2030.1309814453125, 'mean_absolute_error': 33.30256271362305}
    Epoch 1 of training
    available logs: {}
        Batch 0
        available logs: {}
        End of Batch 0
        available logs: {'loss': 1746.2772216796875, 'mean_absolute_error': 30.268001556396484}
        Batch 1
        available logs: {}
        End of Batch 1
        available logs: {'loss': 1467.36376953125, 'mean_absolute_error': 27.10252571105957}
    End of Epoch 1 of training
    available logs: {'loss': 1467.36376953125, 'mean_absolute_error': 27.10252571105957}
Finishing training!
available logs: {'loss': 1467.36376953125, 'mean_absolute_error': 27.10252571105957}


请注意,我们可以在每个步骤中跟踪模型在做什么,以及我们可以访问哪些指标。 在每个批次和时期结束时,我们可以访问样本内损失函数和模型的指标。

自定义评估回调

现在,让我们调用 Model.evaluate() 方法。 我们可以看到,在批处理结束时,我们可以获得当时的损失函数和指标,而在评估结束时,我们可以获得整体损失和指标:

class TestingCallback(keras.callbacks.Callback):
    def __init__(self):
          self.tabulation = {"test":"", 'batch': " "*8}
      
    def on_test_begin(self, logs=None):
        tab = self.tabulation['test']
        print(f'{tab}Evaluating!')
        print(f'{tab}available logs: {logs}')

    def on_test_end(self, logs=None):
        tab = self.tabulation['test']
        print(f'{tab}Finishing evaluation!')
        print(f'{tab}available logs: {logs}')

    def on_test_batch_begin(self, batch, logs=None):
        tab = self.tabulation['batch']
        print(f"{tab}Batch {batch}")
        print(f"{tab}available logs: {logs}")

    def on_test_batch_end(self, batch, logs=None):
        tab = self.tabulation['batch']
        print(f"{tab}End of batch {batch}")
        print(f"{tab}available logs: {logs}")
res = model.evaluate(
    x_test, y_test, batch_size=100, verbose=0, callbacks=[TestingCallback()]
)
Evaluating!
available logs: {}
        Batch 0
        available logs: {}
        End of batch 0
        available logs: {'loss': 382.2723083496094, 'mean_absolute_error': 14.069927215576172}
Finishing evaluation!
available logs: {'loss': 382.2723083496094, 'mean_absolute_error': 14.069927215576172}

自定义预测回调

最后,让我们调用 Model.predict() 方法。 请注意,在每批结束时,我们可以访问模型的预测输出:

class PredictionCallback(keras.callbacks.Callback):
    def __init__(self):
        self.tabulation = {"prediction":"", 'batch': " "*8}

    def on_predict_begin(self, logs=None):
        tab = self.tabulation['prediction']
        print(f"{tab}Predicting!")
        print(f"{tab}available logs: {logs}")

    def on_predict_end(self, logs=None):
        tab = self.tabulation['prediction']
        print(f"{tab}End of Prediction!")
        print(f"{tab}available logs: {logs}")

    def on_predict_batch_begin(self, batch, logs=None):
        tab = self.tabulation['batch']
        print(f"{tab}batch {batch}")
        print(f"{tab}available logs: {logs}")

    def on_predict_batch_end(self, batch, logs=None):
        tab = self.tabulation['batch']
        print(f"{tab}End of batch {batch}")
        print(f"{tab}available logs:n {logs}")
res = model.predict(x_test[:10],
                    verbose = 0, 
                    callbacks=[PredictionCallback()])

查看我们的 Git 学习实践指南,其中包含最佳实践、行业认可的标准以及随附的备忘单。 停止谷歌搜索 Git 命令,实际上 学习 它!

Predicting!
available logs: {}
        batch 0
        available logs: {}
        End of batch 0
        available logs:
 {'outputs': array([[ 7.743822],
       [27.748264],
       [33.082104],
       [26.530678],
       [27.939169],
       [18.414223],
       [42.610645],
       [36.69335 ],
       [13.096557],
       [37.120853]], dtype=float32)}
End of Prediction!
available logs: {}

有了这些——您可以自定义行为、设置监控或以其他方式改变训练、评估或推理的过程。 子类化的替代方法是使用 LambdaCallback.

使用 LambaCallback

Keras 中的内置回调之一是 LambdaCallback 班级。 此回调接受一个定义其行为方式和作用的函数! 从某种意义上说,它允许您使用任意函数作为回调,从而允许您创建自定义回调。

该类具有可选参数:
on_epoch_begin

  • on_epoch_end
  • on_batch_begin
  • on_batch_end
  • on_train_begin
  • on_train_end

每个参数接受 一个函数 在各自的模型事件中调用。 例如,让我们进行回调以在模型完成训练时发送电子邮件:

import smtplib
from email.message import EmailMessage

def send_email(logs): 
    msg = EmailMessage()
    content = f"""The model has finished training."""
    for key, value in logs.items():
      content = content + f"n{key}:{value:.2f}"
    msg.set_content(content)
    msg['Subject'] = f'Training report'
    msg['From'] = '[email protected]'
    msg['To'] = 'receiver-email'

    s = smtplib.SMTP('smtp.gmail.com', 587)
    s.starttls()
    s.login("[email protected]", "your-gmail-app-password")
    s.send_message(msg)
    s.quit()

lambda_send_email = lambda logs : send_email(logs)

email_callback = keras.callbacks.LambdaCallback(on_train_end = lambda_send_email)

model.fit(
    x_train,
    y_train,
    batch_size=100,
    epochs=1,
    verbose=0,
    callbacks=[email_callback],
)

使用我们的自定义回调 LambdaCallback,我们只需要实现我们想要调用的函数,把它包装成 lambda 函数并将其传递给
LambdaCallback 类作为参数。

可视化模型训练的回调

在本节中,我们将给出一个自定义回调的示例,该回调使我们的模型在训练过程中的性能得到改善。 为了做到这一点,我们将日志的值存储在每个批次的末尾。 然后,在训练循环结束时,我们使用 matplotlib.

为了增强可视化,损失和指标将以对数比例绘制:

import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation
from IPython import display

class TrainingAnimationCallback(keras.callbacks.Callback):
    def __init__(self, duration = 40, fps = 1000/25):
        self.duration = duration
        self.fps = fps
        self.logs_history = []

    def set_plot(self):   
        self.figure = plt.figure()
        
        plt.xticks(
            range(0,self.params['steps']*self.params['epochs'], self.params['steps']),
            range(0,self.params['epochs']))
        plt.xlabel('Epoch')
        plt.ylabel('Loss & Metrics ($Log_{10}$ scale)')

        self.plot = {}
        for metric in self.model.metrics_names:
          self.plot[metric], = plt.plot([],[], label = metric)
          
        max_y = [max(log.values()) for log in self.logs_history]
        
        self.title = plt.title(f'batches:0')
        plt.xlim(0,len(self.logs_history)) 
        plt.ylim(0,max(max_y))

           
        plt.legend(loc='upper right')
  
    def animation_function(self,frame):
        batch = frame % self.params['steps']
        self.title.set_text(f'batch:{batch}')
        x = list(range(frame))
        
        for metric in self.model.metrics_names:
            y = [log[metric] for log in self.logs_history[:frame]]
            self.plot[metric].set_data(x,y)
        
    def on_train_batch_end(self, batch, logs=None):
        logarithm_transform = lambda item: (item[0], np.log(item[1]))
        logs = dict(map(logarithm_transform,logs.items()))
        self.logs_history.append(logs)
       
    def on_train_end(self, logs=None):
        self.set_plot()
        num_frames = int(self.duration*self.fps)
        num_batches = self.params['steps']*self.params['epochs']
        selected_batches = range(0, num_batches , num_batches//num_frames )
        interval = 1000*(1/self.fps)
        anim_created = FuncAnimation(self.figure, 
                                     self.animation_function,
                                     frames=selected_batches,
                                     interval=interval)
        video = anim_created.to_html5_video()
        
        html = display.HTML(video)
        display.display(html)
        plt.close()

我们将使用与之前相同的模型,但训练样本更多:

import tensorflow as tf
from tensorflow import keras
import numpy as np

model = keras.Sequential()
model.add(keras.layers.Dense(10, input_dim = 1, activation='relu'))
model.add(keras.layers.Dense(10, activation='relu'))
model.add(keras.layers.Dense(1))
model.compile(
    optimizer=keras.optimizers.RMSprop(learning_rate=0.1),
    loss = "mean_squared_error",
    metrics = ["mean_absolute_error"]
)

def create_sample(sample_size, train_test_proportion = 0.9):
    x = np.random.uniform(low = 0, high = 10, size = sample_size)
    y = x**2
    train_test_split = int(sample_size*train_test_proportion)
    x_train, x_test = (x[:train_test_split],x[train_test_split:])
    y_train, y_test = (y[:train_test_split],y[train_test_split:])
    return (x_train,x_test,y_train,y_test)

x_train,x_test,y_train,y_test = create_sample(35200)


model.fit(
    x_train,
    y_train,
    batch_size=32,
    epochs=2,
    verbose=0,
    callbacks=[TrainingAnimationCallback()],
)

我们的输出是指标和损失函数在训练过程中发生变化的动画:

您的浏览器不支持HTML视频。

结论

在本指南中,我们了解了 Keras 中自定义回调的实现。
实现自定义回调有两种选择——通过子类化 keras.callbacks.Callback 类,或通过使用 keras.callbacks.LambdaCallback 类。

我们已经看到一个实际的例子使用 LambdaCallback用于在训练循环结束时发送一封电子邮件,以及一个子类化的示例 Callback 创建训练循环动画的类。

尽管 Keras 有许多内置回调,但了解如何实现自定义回调对于更具体的应用程序可能很有用。

时间戳记:

更多来自 堆栈滥用