【翻译自 :Adding A Custom Attention Layer To Recurrent Neural Network In Keras】
【说明:Jason BrownleePhD大神的文章个人很喜欢,所以闲暇时间里会做一点翻译和学习实践的工作,这里是相应工作的实践记录,希望能帮到有需要的人!】
深度学习网络在过去几年中广受欢迎。 “注意力机制”与深度学习网络相结合以提高其性能。 向网络添加注意力组件在机器翻译、图像识别、文本摘要和类似应用等任务中显示出显着改进。
本教程展示了如何向使用循环神经网络构建的网络添加自定义注意层。 我们将使用一个非常简单的数据集来说明时间序列预测的端到端应用。 本教程专为希望基本了解如何将用户定义的层添加到深度学习网络并使用这个简单示例构建更复杂的应用程序的任何人而设计。
完成本教程后,您将了解:
在 Keras 中创建自定义注意力层需要哪些方法如何将新层合并到用 SimpleRNN 构建的网络中
教程概述
本教程分为三个部分; 他们是:
为时间序列预测准备一个简单的数据集如何使用通过 SimpleRNN 构建的网络进行时间序列预测向 SimpleRNN 网络添加自定义注意力层
先决条件
假设您熟悉以下主题。 您可以单击下面的链接进行概述。
什么是注意力?从零开始的注意力机制RNN 简介以及为它们提供动力的数学了解 Keras 中的简单循环神经网络
数据集
本文的重点是基本了解如何为深度学习网络构建自定义注意力层。 为此,我们将使用一个非常简单的斐波那契数列示例,其中一个数字由前两个数字构成。 序列的前 10 个数字如下所示:
0, 1, 1, 2, 3, 5, 8, 13, 21, 34, ...
当给定之前的“t”数字时,我们能否让机器准确地重建下一个数字? 这意味着丢弃除最后两个之外的所有先前输入并对最后两个数字执行正确的操作。
在本教程中,我们将从 t 个时间步构建训练示例,并使用 t+1 处的值作为目标。 例如,如果 t=3,则训练示例和相应的目标值将如下所示:
SimpleRNN 网络
在本节中,我们将编写基本代码来生成数据集并使用 SimpleRNN 网络来预测斐波那契数列的下一个数字。
输入部分
我们先写导入部分:
from pandas import read_csvimport numpy as npfrom keras import Modelfrom keras.layers import Layerimport keras.backend as Kfrom keras.layers import Input, Dense, SimpleRNNfrom sklearn.preprocessing import MinMaxScalerfrom keras.models import Sequentialfrom keras.metrics import mean_squared_error
准备数据集
下面的函数生成一个由 n 个斐波那契数组成的序列(不计算起始的两个值)。 如果 scale_data 设置为 True,那么它还会使用 scikit-learn 中的 MinMaxScaler 来缩放 0 和 1 之间的值。让我们看看 n=10 时的输出。
def get_fib_seq(n, scale_data=True):# Get the Fibonacci sequenceseq = np.zeros(n)fib_n1 = 0.0fib_n = 1.0 for i in range(n):seq[i] = fib_n1 + fib_nfib_n1 = fib_nfib_n = seq[i] scaler = []if scale_data:scaler = MinMaxScaler(feature_range=(0, 1))seq = np.reshape(seq, (n, 1))seq = scaler.fit_transform(seq).flatten() return seq, scalerfib_seq = get_fib_seq(10, False)[0]print(fib_seq)[ 1. 2. 3. 5. 8. 13. 21. 34. 55. 89.]
接下来,我们需要一个函数 get_fib_XY() 将序列重新格式化为训练示例和 Keras 输入层使用的目标值。 当给定 time_steps 作为参数时,get_fib_XY() 用 time_steps 列数构造数据集的每一行。 该函数不仅从斐波那契数列构建训练集和测试集,还将训练样本打乱并重塑为所需的 TensorFlow 格式,即 total_samples x time_steps x features。 此外,如果 scale_data 设置为 True,该函数将返回缩放值的缩放器对象。
让我们生成一个小的训练集,看看它是什么样的。 我们已经设置了 time_steps=3,total_fib_numbers=12,大约 70% 的例子都朝向测试点。 请注意,训练和测试示例已被 permutation() 函数打乱。
def get_fib_XY(total_fib_numbers, time_steps, train_percent, scale_data=True):dat, scaler = get_fib_seq(total_fib_numbers, scale_data) Y_ind = np.arange(time_steps, len(dat), 1)Y = dat[Y_ind]rows_x = len(Y)X = dat[0:rows_x]for i in range(time_steps-1):temp = dat[i+1:rows_x+i+1]X = np.column_stack((X, temp))# random permutation with fixed seed rand = np.random.RandomState(seed=13)idx = rand.permutation(rows_x)split = int(train_percent*rows_x)train_ind = idx[0:split]test_ind = idx[split:]trainX = X[train_ind]trainY = Y[train_ind]testX = X[test_ind]testY = Y[test_ind]trainX = np.reshape(trainX, (len(trainX), time_steps, 1)) testX = np.reshape(testX, (len(testX), time_steps, 1))return trainX, trainY, testX, testY, scalertrainX, trainY, testX, testY, scaler = get_fib_XY(12, 3, 0.7, False)print('trainX = ', trainX)print('trainY = ', trainY)trainX = [[[ 8.][13.][21.]][[ 5.][ 8.][13.]][[ 2.][ 3.][ 5.]][[13.][21.][34.]][[21.][34.][55.]][[34.][55.][89.]]]trainY = [ 34. 21. 8. 55. 89. 144.]
设置网络
现在让我们建立一个两层的小型网络。 第一个是 SimpleRNN 层,第二个是 Dense 层。 以下是该模型的摘要。
# Set up parameterstime_steps = 20hidden_units = 2epochs = 30# Create a traditional RNN networkdef create_RNN(hidden_units, dense_units, input_shape, activation):model = Sequential()model.add(SimpleRNN(hidden_units, input_shape=input_shape, activation=activation[0]))model.add(Dense(units=dense_units, activation=activation[1]))pile(loss='mse', optimizer='adam')return modelmodel_RNN = create_RNN(hidden_units=hidden_units, dense_units=1, input_shape=(time_steps,1), activation=['tanh', 'tanh'])model_RNN.summary()Model: "sequential_1"_________________________________________________________________Layer (type) Output Shape Param # =================================================================simple_rnn_3 (SimpleRNN)(None, 2) 8 _________________________________________________________________dense_3 (Dense) (None, 1) 3 =================================================================Total params: 11Trainable params: 11Non-trainable params: 0
训练网络并评估
下一步是添加生成数据集、训练网络并对其进行评估的代码。 这一次,我们将在 0 和 1 之间缩放数据。我们不需要传递 scale_data 参数,因为它的默认值为 True。
# Generate the datasettrainX, trainY, testX, testY, scaler = get_fib_XY(1200, time_steps, 0.7)model_RNN.fit(trainX, trainY, epochs=epochs, batch_size=1, verbose=2)# Evalute modeltrain_mse = model_RNN.evaluate(trainX, trainY)test_mse = model_RNN.evaluate(testX, testY)# Print errorprint("Train set MSE = ", train_mse)print("Test set MSE = ", test_mse)
作为输出,您将看到训练进度和以下均方误差值:
Train set MSE = 5.631405292660929e-05Test set MSE = 2.62349731388e-05
向网络添加自定义注意力层
在 Keras 中,通过继承 Layer 类很容易创建一个实现注意力的自定义层。 Keras 指南列出了通过子类创建新层的清晰步骤。我们将在这里使用这些指南。单个层对应的所有权重和偏差都被这个类封装。我们需要编写 __init__ 方法并覆盖以下方法:
build():一旦知道输入的大小,Keras 指南建议在此方法中添加权重。这种方法“懒惰”地创建权重。内置函数 add_weight() 可用于添加注意力层的权重和偏差。
call():call() 方法实现了输入到输出的映射。它应该在训练期间实施前向传递。
注意层的调用方法
注意层的调用方法必须计算对齐分数、权重和上下文。您可以在 Stefania 关于从零开始的注意力机制的优秀文章中详细了解这些参数。我们将在 call() 方法中实现 Bahdanau attention。从 Keras Layer 类继承层并通过 add_weights() 方法添加权重的好处是权重会自动调整。 Keras 对 call() 方法的操作/计算进行了等效的“逆向工程”,并在训练期间计算梯度。添加权重时指定 trainable=True 很重要。您还可以将 train_step() 方法添加到您的自定义层,并根据需要指定您自己的重量训练方法。
下面的代码实现了我们自定义的注意力层。
# Add attention layer to the deep learning networkclass attention(Layer):def __init__(self,**kwargs):super(attention,self).__init__(**kwargs)def build(self,input_shape):self.W=self.add_weight(name='attention_weight', shape=(input_shape[-1],1), initializer='random_normal', trainable=True)self.b=self.add_weight(name='attention_bias', shape=(input_shape[1],1), initializer='zeros', trainable=True) super(attention, self).build(input_shape)def call(self,x):# Alignment scores. Pass them through tanh functione = K.tanh(K.dot(x,self.W)+self.b)# Remove dimension of size 1e = K.squeeze(e, axis=-1) # Compute the weightsalpha = K.softmax(e)# Reshape to tensorFlow formatalpha = K.expand_dims(alpha, axis=-1)# Compute the context vectorcontext = x * alphacontext = K.sum(context, axis=1)return context
带有注意力层的 RNN 网络
现在让我们为之前创建的 RNN 网络添加一个注意力层。 函数 create_RNN_with_attention() 现在指定网络中的 RNN 层、注意力层和密集层。 确保在指定 SimpleRNN 时设置 return_sequences=True。 这将返回所有先前时间步的隐藏单元的输出。
让我们仔细看一下我们模型的摘要。
def create_RNN_with_attention(hidden_units, dense_units, input_shape, activation):x=Input(shape=input_shape)RNN_layer = SimpleRNN(hidden_units, return_sequences=True, activation=activation)(x)attention_layer = attention()(RNN_layer)outputs=Dense(dense_units, trainable=True, activation=activation)(attention_layer)model=Model(x,outputs)pile(loss='mse', optimizer='adam') return model model_attention = create_RNN_with_attention(hidden_units=hidden_units, dense_units=1, input_shape=(time_steps,1), activation='tanh')model_attention.summary()Model: "model_1"_________________________________________________________________Layer (type) Output Shape Param # =================================================================input_2 (InputLayer) [(None, 20, 1)] 0 _________________________________________________________________simple_rnn_2 (SimpleRNN)(None, 20, 2) 8 _________________________________________________________________attention_1 (attention)(None, 2) 22 _________________________________________________________________dense_2 (Dense) (None, 1) 3 =================================================================Total params: 33Trainable params: 33Non-trainable params: 0_________________________________________________________________
用注意力训练和评估深度学习网络
是时候训练和测试我们的模型了,看看它在预测序列的下一个斐波那契数时表现如何。
model_attention.fit(trainX, trainY, epochs=epochs, batch_size=1, verbose=2)# Evalute modeltrain_mse_attn = model_attention.evaluate(trainX, trainY)test_mse_attn = model_attention.evaluate(testX, testY)# Print errorprint("Train set MSE with attention = ", train_mse_attn)print("Test set MSE with attention = ", test_mse_attn)Train set MSE with attention = 5.3511179430643097e-05Test set MSE with attention = 9.053358553501312e-06
我们可以看到,即使对于这个简单的例子,测试集上的均方误差随着注意力层的增加而降低。 您可以通过超参数调整和模型选择获得更好的结果。 一定要在更复杂的问题上尝试一下,并向网络添加更多层。 您还可以使用缩放器对象将数字缩放回其原始值。
您可以通过使用 LSTM 而不是 SimpleRNN 将这个示例更进一步,或者您可以通过卷积和池化层构建网络。 如果您愿意,您也可以将其更改为编码器解码器网络。
完整代码如下:
# Prepare datadef get_fib_seq(n, scale_data=True):# Get the Fibonacci sequenceseq = np.zeros(n)fib_n1 = 0.0fib_n = 1.0 for i in range(n):seq[i] = fib_n1 + fib_nfib_n1 = fib_nfib_n = seq[i] scaler = []if scale_data:scaler = MinMaxScaler(feature_range=(0, 1))seq = np.reshape(seq, (n, 1))seq = scaler.fit_transform(seq).flatten() return seq, scalerdef get_fib_XY(total_fib_numbers, time_steps, train_percent, scale_data=True):dat, scaler = get_fib_seq(total_fib_numbers, scale_data) Y_ind = np.arange(time_steps, len(dat), 1)Y = dat[Y_ind]rows_x = len(Y)X = dat[0:rows_x]for i in range(time_steps-1):temp = dat[i+1:rows_x+i+1]X = np.column_stack((X, temp))# random permutation with fixed seed rand = np.random.RandomState(seed=13)idx = rand.permutation(rows_x)split = int(train_percent*rows_x)train_ind = idx[0:split]test_ind = idx[split:]trainX = X[train_ind]trainY = Y[train_ind]testX = X[test_ind]testY = Y[test_ind]trainX = np.reshape(trainX, (len(trainX), time_steps, 1)) testX = np.reshape(testX, (len(testX), time_steps, 1))return trainX, trainY, testX, testY, scaler# Set up parameterstime_steps = 20hidden_units = 2epochs = 30# Create a traditional RNN networkdef create_RNN(hidden_units, dense_units, input_shape, activation):model = Sequential()model.add(SimpleRNN(hidden_units, input_shape=input_shape, activation=activation[0]))model.add(Dense(units=dense_units, activation=activation[1]))pile(loss='mse', optimizer='adam')return modelmodel_RNN = create_RNN(hidden_units=hidden_units, dense_units=1, input_shape=(time_steps,1), activation=['tanh', 'tanh'])# Generate the dataset for the networktrainX, trainY, testX, testY, scaler = get_fib_XY(1200, time_steps, 0.7)# Train the networkmodel_RNN.fit(trainX, trainY, epochs=epochs, batch_size=1, verbose=2)# Evalute modeltrain_mse = model_RNN.evaluate(trainX, trainY)test_mse = model_RNN.evaluate(testX, testY)# Print errorprint("Train set MSE = ", train_mse)print("Test set MSE = ", test_mse)# Add attention layer to the deep learning networkclass attention(Layer):def __init__(self,**kwargs):super(attention,self).__init__(**kwargs)def build(self,input_shape):self.W=self.add_weight(name='attention_weight', shape=(input_shape[-1],1), initializer='random_normal', trainable=True)self.b=self.add_weight(name='attention_bias', shape=(input_shape[1],1), initializer='zeros', trainable=True) super(attention, self).build(input_shape)def call(self,x):# Alignment scores. Pass them through tanh functione = K.tanh(K.dot(x,self.W)+self.b)# Remove dimension of size 1e = K.squeeze(e, axis=-1) # Compute the weightsalpha = K.softmax(e)# Reshape to tensorFlow formatalpha = K.expand_dims(alpha, axis=-1)# Compute the context vectorcontext = x * alphacontext = K.sum(context, axis=1)return contextdef create_RNN_with_attention(hidden_units, dense_units, input_shape, activation):x=Input(shape=input_shape)RNN_layer = SimpleRNN(hidden_units, return_sequences=True, activation=activation)(x)attention_layer = attention()(RNN_layer)outputs=Dense(dense_units, trainable=True, activation=activation)(attention_layer)model=Model(x,outputs)pile(loss='mse', optimizer='adam') return model # Create the model with attention, train and evaluatemodel_attention = create_RNN_with_attention(hidden_units=hidden_units, dense_units=1, input_shape=(time_steps,1), activation='tanh')model_attention.summary() model_attention.fit(trainX, trainY, epochs=epochs, batch_size=1, verbose=2)# Evalute modeltrain_mse_attn = model_attention.evaluate(trainX, trainY)test_mse_attn = model_attention.evaluate(testX, testY)# Print errorprint("Train set MSE with attention = ", train_mse_attn)print("Test set MSE with attention = ", test_mse_attn)