
[PyTorch] Deep Learning Practice, Advanced RNN: Implementing Classification


Contents

1. The RNN Classifier
2. Implementing the Classifier
   - Preparing the data
   - Preparing the model
   - Bidirectional RNN/LSTM/GRU
   - The forward pass
   - Converting names to tensors
   - Training
   - Testing
3. Complete Code
   - Code
   - Results
   - Saving the model
   - Prediction code
Exercise: Sentiment Analysis on Movie Reviews
   - Code
   - Results
Learning Resources
Series Index

1. The RNN Classifier

The dataset:

The dataset contains person names and their corresponding countries. We want to train a model that, given a new name, predicts which language the name comes from (18 different languages, so an 18-class classification problem).

The usual approach in natural language processing:

First turn each word or character into a one-hot vector. One-hot vectors are high-dimensional and very sparse, so they are normally passed through an embedding layer first to obtain low-dimensional dense vectors. These then go through the RNN layer. The hidden outputs do not necessarily match the required target, so a linear layer is used to map them to the form we need.

In this classifier we only need one overall class (which country the name belongs to), so there is no requirement on the per-step outputs at the top of the network: we do not need to apply a linear transformation to every hidden output, and we do not care what those intermediate outputs are. The network can therefore be simplified.

We only need the final hidden state; that final hidden state is fed into a linear layer, which maps it to the 18 classes. This is enough to classify names. The main thing this section teaches is the method and workflow for processing natural language.
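As a rough sketch of this simplified pipeline (the sizes, the single-name batch, and the 18-class output below are illustrative assumptions, not the article's final code), the flow is embedding, then GRU, then the final hidden state into a linear classifier:

import torch

# Minimal sketch: character indices -> embedding -> GRU -> final hidden state -> linear layer
vocab_size, hidden_size, n_classes = 128, 100, 18       # assumed sizes
embedding = torch.nn.Embedding(vocab_size, hidden_size)
gru = torch.nn.GRU(hidden_size, hidden_size)            # expects (seqLen, batchSize, hiddenSize)
fc = torch.nn.Linear(hidden_size, n_classes)

x = torch.randint(0, vocab_size, (7, 1))                # a 7-character name, batch size 1
out, hidden = gru(embedding(x))                         # hidden: (1, 1, hiddenSize)
logits = fc(hidden[-1])                                 # only the final hidden state is used
print(logits.shape)                                     # torch.Size([1, 18])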

The overall model structure:


Take the name Maclean: what we actually have is the sequence M a c l e a n, and each character is one of x1, x2, x3, and so on. So although a name looks like a single field, it is really a sequence. A further issue is that the sequences have different lengths, so we also have to think about how to handle variable-length sequences.

How the model processes the data:

2. Implementing the Classifier

Main program:

if __name__ == '__main__':
    # N_CHARS: number of input characters  HIDDEN_SIZE: hidden layer size
    # N_COUNTRY: number of country classes  N_LAYER: number of GRU layers
    classifier = RNNClassication(N_CHARS, HIDDEN_SIZE, N_COUNTRY, N_LAYER)
    # Use the GPU
    if USE_GPU:
        device = torch.device("cuda:0")
        classifier.to(device)
    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001)
    # Timing
    start = time.time()
    print("Training for {} epochs...".format(N_EPOCHS))
    acc_list = []
    for epoch in range(1, N_EPOCHS + 1):
        # Train
        trainModel()
        # Test
        acc = testModel()
        acc_list.append(acc)

# Timing helper
def time_since(since):
    s = time.time() - since
    m = math.floor(s / 60)
    s -= m * 60
    return "{}m {:.0f}s".format(m, s)

# Plotting
epoch = np.arange(1, len(acc_list) + 1, 1)
acc_list = np.array(acc_list)
plt.plot(epoch, acc_list)
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.grid()
plt.show()

Preparing the data

Name processing

A name string is first turned into a sequence: a list of characters, one entry per character of the name. Then a dictionary is built. We use the ASCII table, which has 128 characters, so the dictionary size is set to 128; each character is mapped to its ASCII value, which gives the sequence we want. In the rightmost table of the figure above, each number is not really a number but a one-hot vector: 77, for example, stands for a 128-dimensional vector whose 77th entry is 1 and all other entries are 0. For the embedding layer we only need to tell it which dimension is 1, so storing the ASCII value is enough.

The data also has to be aligned so that all sequences have the same length: find the longest one and pad the rest with zeros. After padding, the batch can be assembled into a single tensor.
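A minimal sketch of this encode-and-pad step (the names below are made up for illustration):

import torch

names = ["Maclean", "Gu", "Silva"]                      # illustrative names
seqs = [[ord(c) for c in name] for name in names]       # ASCII codes, e.g. 'M' -> 77
lengths = torch.LongTensor([len(s) for s in seqs])

padded = torch.zeros(len(seqs), lengths.max()).long()   # zero tensor sized to the longest name
for i, (s, l) in enumerate(zip(seqs, lengths)):
    padded[i, :l] = torch.LongTensor(s)
print(padded)                                           # each row is one name, zero-padded on the right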

Country processing

Each country is mapped to a class index.
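A minimal sketch of that mapping (the country names are just examples):

countries = ["Scottish", "Chinese", "Arabic", "Chinese"]              # example labels from the CSV
country_list = sorted(set(countries))                                 # deduplicate and sort
country_dict = {name: idx for idx, name in enumerate(country_list)}
print(country_dict)                                                   # {'Arabic': 0, 'Chinese': 1, 'Scottish': 2}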

# Prepare the dataset
class NameDataset(Dataset):
    def __init__(self, is_train_set=True):
        filename = 'names_train.csv.gz' if is_train_set else 'names_test.csv.gz'
        with gzip.open(filename, 'rt') as f:           # read the .gz file with gzip
            reader = csv.reader(f)                     # parse its contents with csv
            rows = list(reader)                        # list of (name, country) pairs
        self.names = [row[0] for row in rows]          # list of names
        self.len = len(self.names)                     # dataset length
        self.countries = [row[1] for row in rows]      # list of countries
        self.countries_list = list(sorted(set(self.countries)))  # deduplicate, sort, store as list
        self.countries_dict = self.getCountryDict()    # dictionary for looking up a country's index
        self.countries_num = len(self.countries_list)  # number of country classes

    def __getitem__(self, item):
        # the dict maps country name (key) to class index (value)
        return self.names[item], self.countries_dict[self.countries[item]]

    def __len__(self):
        return self.len

    def getCountryDict(self):                          # build the country lookup dictionary
        country_dict = dict()
        for idx, country_name in enumerate(self.countries_list, 0):
            country_dict[country_name] = idx
        return country_dict

    def idx2country(self, index):                      # return the country string for an index
        return self.countries_list[index]

    def getCountriesNum(self):                         # return the number of countries
        return self.countries_num

Note: why doesn't the code above use NumPy to read the dataset? Because datasets come in many formats; if the data is stored as pickle or HDF5, the corresponding package should be used instead.
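For instance, hedged sketches of what reading those formats might look like (pickle from the standard library, HDF5 via h5py; the file names are placeholders):

import pickle
# import h5py   # only needed for HDF5 files

with open('data.pkl', 'rb') as f:        # pickle file (placeholder name)
    data = pickle.load(f)

# with h5py.File('data.h5', 'r') as f:   # HDF5 file (placeholder name)
#     names = f['names'][:]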

trainset = NameDataset(is_train_set=True)
trainloader = DataLoader(dataset=trainset, batch_size=BATCH_SIZE, shuffle=True)
testset = NameDataset(is_train_set=False)
testloader = DataLoader(dataset=testset, batch_size=BATCH_SIZE, shuffle=False)
N_COUNTRY = trainset.getCountriesNum()   # determines the output dimension of the model

Preparing the model

GRU-related parameters: hidden_size and n_layers:

Input and output dimensions of the embedding layer:

Input and output dimensions of the GRU:

Here, bidirectional controls whether the recurrent network is bidirectional.
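A small shape check illustrating these dimensions before the model definition (the sizes are assumptions chosen to match the text: seqLen=10, batchSize=4, hiddenSize=100, nLayers=2, bidirectional):

import torch

seq_len, batch_size, hidden_size, n_layers = 10, 4, 100, 2
embedding = torch.nn.Embedding(128, hidden_size)
gru = torch.nn.GRU(hidden_size, hidden_size, n_layers, bidirectional=True)

x = torch.randint(0, 128, (seq_len, batch_size))           # (seqLen, batchSize)
emb = embedding(x)                                         # (seqLen, batchSize, hiddenSize)
h0 = torch.zeros(n_layers * 2, batch_size, hidden_size)    # (nLayers * nDirections, batchSize, hiddenSize)
out, hidden = gru(emb, h0)
print(emb.shape)      # torch.Size([10, 4, 100])
print(out.shape)      # torch.Size([10, 4, 200])  hiddenSize * nDirections
print(hidden.shape)   # torch.Size([4, 4, 100])   nLayers * nDirections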

# The model
class RNNClassication(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, n_layers=1, bidirectional=True):
        super(RNNClassication, self).__init__()
        self.hidden_size = hidden_size                 # GRU layer
        self.n_layers = n_layers                       # GRU layer
        self.n_directions = 2 if bidirectional else 1  # What is a bidirectional RNN/LSTM/GRU?

        # Embedding layer input shape:  (seqLen, batchSize)
        # Embedding layer output shape: (seqLen, batchSize, hiddenSize)
        self.embedding = torch.nn.Embedding(input_size, hidden_size)

        # GRU layer inputs:
        #   input:  (seqLen, batchSize, hiddenSize)
        #   hidden: (nLayers * nDirections, batchSize, hiddenSize)
        # GRU layer outputs:
        #   output: (seqLen, batchSize, hiddenSize * nDirections)
        #   hidden: (nLayers * nDirections, batchSize, hiddenSize)
        self.gru = torch.nn.GRU(hidden_size, hidden_size, n_layers,
                                bidirectional=bidirectional)  # input and output size are both hidden_size
        self.fc = torch.nn.Linear(hidden_size * self.n_directions, output_size)

    def _init_hidden(self, batch_size):                # all-zero initial hidden state
        hidden = torch.zeros(self.n_layers * self.n_directions, batch_size, self.hidden_size)
        return create_tensor(hidden)

Bidirectional RNN/LSTM/GRU

Within one layer the weight matrices are shared, so the w and b here are identical.

In the situation shown above, the state at x_(N-1) only contains information about the past, but in NLP we sometimes also need to take future context into account; this is what bidirectional recurrent networks are for.

The forward direction computes one set of hidden states, the backward direction computes another, and the two results are concatenated.

Note: backward here does not mean the backward pass that computes gradients in backpropagation; it simply means computing the hidden states in the reverse direction.

The layer outputs two things, out and hidden: out covers all time steps, while hidden contains the final hidden state of the forward direction and the final hidden state of the backward direction.

With a bidirectional recurrent network, the output dimension is twice what it was before.
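A minimal sketch of the forward/backward concatenation (the sizes are assumptions):

import torch

gru = torch.nn.GRU(input_size=8, hidden_size=8, num_layers=1, bidirectional=True)
x = torch.randn(5, 3, 8)                              # (seqLen, batchSize, inputSize)
out, hidden = gru(x)

print(out.shape)      # torch.Size([5, 3, 16])  forward and backward outputs concatenated per step
print(hidden.shape)   # torch.Size([2, 3, 8])   one final hidden state per direction
h_cat = torch.cat([hidden[-1], hidden[-2]], dim=1)    # (batchSize, 2 * hiddenSize)
print(h_cat.shape)    # torch.Size([3, 16])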

The forward pass

First, the data is transposed. After the transpose its shape becomes (seqLen, batchSize), which is what the embedding layer expects.

To improve efficiency, the GRU offers a way to speed things up, especially when the sequences have different lengths, because computing on the padded zeros is wasted work. PyTorch provides pack_padded_sequence() for this; its arguments are the embedding output and seq_lengths, where seq_lengths is a parameter of forward that has to be supplied when the model is called.

The shaded cells in the figure all come from padded zeros, so their values should all be identical, say 0.11.

Using the line gru_input = pack_padded_sequence(embedding, seq_lengths), the input and output look like this:

The non-zero columns on the left are packed together and the padded zeros are dropped. The GRU can process variable-length sequences (the lengths are preserved), but it cannot use the packing function unless the batch is ordered: to pack, the sequences must be arranged in descending order of length, as in the figure below:

Once sorted (in practice, the data should already be organized so that it is sorted by sequence length), the batch goes through the embedding layer, giving the padded, embedded data.

With this data in hand, the time steps can be laid out one by one.

After packing, the data is handed to the GRU, which can use the recorded batch sizes to decide how many rows to take at the first time step, how many at the second, how many at the third, and so on. This makes the computation much more efficient.

pack_padded_sequence(embedding, seq_lengths) is accepted by RNN, GRU and LSTM alike.
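A small standalone illustration of the packing behaviour, before the forward code (the lengths and the batch below are made up; note that the batch must already be sorted by length in descending order):

import torch
from torch.nn.utils.rnn import pack_padded_sequence

# Three padded sequences with embedding dimension 4, already sorted by length: 5, 3, 2
seq_lengths = torch.LongTensor([5, 3, 2])
embedding = torch.randn(5, 3, 4)               # (seqLen, batchSize, hiddenSize); in the real code the
                                               # tail of the shorter sequences would be zero padding

packed = pack_padded_sequence(embedding, seq_lengths)
print(packed.data.shape)    # torch.Size([10, 4])      only 5 + 3 + 2 real time steps are kept
print(packed.batch_sizes)   # tensor([3, 3, 2, 1, 1])  rows the GRU processes at each time step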

Code:

class RNNClassifier(torch.nn.Module):
    def forward(self, input, seq_lengths):
        # input shape: B x S -> S x B
        input = input.t()                          # transpose, since the embedding layer expects S x B
        batch_size = input.size(1)                 # record the batch size
        hidden = self._init_hidden(batch_size)     # initialize the hidden state
        embedding = self.embedding(input)
        # pack them up
        gru_input = pack_padded_sequence(embedding, seq_lengths)
        # For a bidirectional network, hidden holds both the forward and the backward
        # final states, so the two are concatenated.
        output, hidden = self.gru(gru_input, hidden)
        if self.n_directions == 2:
            hidden_cat = torch.cat([hidden[-1], hidden[-2]], dim=1)
        else:
            hidden_cat = hidden[-1]
        fc_output = self.fc(hidden_cat)            # linear classifier
        return fc_output

Converting names to tensors

Encode

Pad

Transpose

Sort

Code:

def make_tensors(names, countries):
    sequences_and_lengths = [name2list(name) for name in names]   # turn every name into a list of ASCII codes
    # name2list returns both the character list and the length of the name
    name_sequences = [sl[0] for sl in sequences_and_lengths]
    seq_lengths = torch.LongTensor([sl[1] for sl in sequences_and_lengths])
    countries = countries.long()   # the countries coming out of the dataset are integers; convert them to long
    # Padding: create an all-zero tensor, then copy each name tensor into it
    seq_tensor = torch.zeros(len(name_sequences), seq_lengths.max()).long()
    for idx, (seq, seq_len) in enumerate(zip(name_sequences, seq_lengths), 0):
        seq_tensor[idx, :seq_len] = torch.LongTensor(seq)
    # Sort by sequence length
    seq_lengths, perm_idx = seq_lengths.sort(dim=0, descending=True)  # sorted lengths and their indices
    seq_tensor = seq_tensor[perm_idx]
    countries = countries[perm_idx]
    return create_tensor(seq_tensor), create_tensor(seq_lengths), create_tensor(countries)

def create_tensor(tensor):
    if USE_GPU:
        device = torch.device("cuda:0")
        tensor = tensor.to(device)
    return tensor

def name2list(name):
    arr = [ord(c) for c in name]
    return arr, len(arr)

Training

# Training
def trainModel():
    total_loss = 0
    for i, (names, countries) in enumerate(trainloader, 1):
        inputs, seq_lengths, target = make_tensors(names, countries)  # input matrix, sequence lengths, labels
        # 1. forward: output of the model
        outputs = classifier(inputs, seq_lengths)
        # 2. forward: loss
        loss = criterion(outputs, target)
        # 3. zero grad
        optimizer.zero_grad()
        # 4. backward
        loss.backward()
        # 5. update
        optimizer.step()

        total_loss += loss.item()
        if i % 10 == 0:
            print("[{}] Epoch {}".format(time_since(start), epoch), end='')
            print(" {}/{}".format(i * len(inputs), len(trainset)), end='')
            print(" loss = {}".format(total_loss / (i * len(inputs))))
    return total_loss

Testing

# Testing
def testModel():
    correct = 0
    total = len(testset)
    print("evaluating trained model...")
    with torch.no_grad():
        for i, (names, countries) in enumerate(testloader, 1):
            inputs, seq_lengths, target = make_tensors(names, countries)
            output = classifier(inputs, seq_lengths)
            # dim=1 takes the max over each row; keepdim keeps the output shape;
            # [1] takes the index of the maximum value
            pred = output.max(dim=1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()
        percent = "{:.2f}".format(100 * correct / total)
        print("Test set: Accuracy {}/{} {}%".format(correct, total, percent))
    return correct / total
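A tiny illustration of how the predicted class and the number of correct predictions are extracted (the logits below are made up):

import torch

output = torch.tensor([[0.1, 2.0, -1.0],       # made-up logits for 2 samples and 3 classes
                       [1.5, 0.2,  0.3]])
target = torch.tensor([1, 0])

pred = output.max(dim=1, keepdim=True)[1]      # index of the largest logit per row -> [[1], [0]]
correct = pred.eq(target.view_as(pred)).sum().item()
print(pred.squeeze(1).tolist(), correct)       # [1, 0] 2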

3. Complete Code

Code:

# An RNN classifier: given a name, predict which of 18 countries it belongs to
import math
import time
import numpy as np
import torch
import matplotlib.pyplot as plt
import torch.nn as nn
import gzip
import csv
from torch.nn.utils.rnn import pack_padded_sequence
from torch.utils.data import Dataset, DataLoader

# Hyper-parameters
BATCH_SIZE = 256
HIDDEN_SIZE = 100   # hidden layer size
N_LAYER = 2         # number of RNN (GRU) layers
N_CHARS = 128       # number of characters, i.e. the input dimension
N_EPOCHS = 10       # number of training epochs
USE_GPU = True      # whether to use the GPU
res = []
is_train_set = True


# Prepare the dataset
class NameDataset(Dataset):
    def __init__(self, is_train_set=True):
        filename = 'names_train.csv.gz' if is_train_set else 'names_test.csv.gz'
        with gzip.open(filename, 'rt') as f:       # read the .gz file with gzip
            reader = csv.reader(f)                 # parse its contents with csv
            rows = list(reader)                    # list of (name, country) pairs
        self.names = [row[0] for row in rows]      # list of names
        self.length = len(self.names)              # dataset length
        self.countries = [row[1] for row in rows]  # list of countries
        self.country_list = list(sorted(set(self.countries)))  # deduplicate, sort, store as list
        self.country_dict = self.getCountryDict()  # dictionary for country lookup
        self.country_num = len(self.country_list)  # number of country classes

    def __getitem__(self, index):
        # the dict maps country name (key) to class index (value)
        return self.names[index], self.country_dict[self.countries[index]]

    def __len__(self):
        return self.length

    def getCountryDict(self):                      # build the country lookup dictionary
        country_dict = {}
        for idx, country_name in enumerate(self.country_list, 0):
            country_dict[country_name] = idx
        return country_dict

    def idx2country(self, index):                  # return the country string for an index
        return self.country_list[index]

    def getCountryNum(self):                       # return the number of countries
        return self.country_num


trainset = NameDataset(is_train_set=True)
trainloader = DataLoader(dataset=trainset, batch_size=BATCH_SIZE, shuffle=True)
testset = NameDataset(is_train_set=False)
testloader = DataLoader(dataset=testset, batch_size=BATCH_SIZE, shuffle=False)
N_COUNTRY = trainset.getCountryNum()   # determines the output dimension of the model


# The model
class RNNClassication(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, n_layers=1, bidirectional=True):
        super(RNNClassication, self).__init__()
        self.hidden_size = hidden_size                 # GRU layer
        self.n_layers = n_layers                       # GRU layer
        self.n_directions = 2 if bidirectional else 1  # What is a bidirectional RNN/LSTM/GRU?

        # Embedding layer input shape:  (seqLen, batchSize)
        # Embedding layer output shape: (seqLen, batchSize, hiddenSize)
        self.embedding = torch.nn.Embedding(input_size, hidden_size)

        # GRU layer inputs:
        #   input:  (seqLen, batchSize, hiddenSize)
        #   hidden: (nLayers * nDirections, batchSize, hiddenSize)
        # GRU layer outputs:
        #   output: (seqLen, batchSize, hiddenSize * nDirections)
        #   hidden: (nLayers * nDirections, batchSize, hiddenSize)
        self.gru = torch.nn.GRU(hidden_size, hidden_size, n_layers,
                                bidirectional=bidirectional)  # input and output size are both hidden_size
        self.fc = torch.nn.Linear(hidden_size * self.n_directions, output_size)

    def _init_hidden(self, batch_size):                # all-zero initial hidden state
        hidden = torch.zeros(self.n_layers * self.n_directions, batch_size, self.hidden_size)
        return create_tensor(hidden)

    def forward(self, input, seq_lengths):
        # input shape: B x S -> S x B
        input = input.t()                              # transpose, since the embedding layer expects S x B
        batch_size = input.size(1)                     # record the batch size
        hidden = self._init_hidden(batch_size)         # initialize the hidden state
        embedding = self.embedding(input)
        # pack them up
        gru_input = pack_padded_sequence(embedding, seq_lengths)
        output, hidden = self.gru(gru_input, hidden)
        if self.n_directions == 2:
            hidden_cat = torch.cat([hidden[-1], hidden[-2]], dim=1)
        else:
            hidden_cat = hidden[-1]
        fc_output = self.fc(hidden_cat)
        return fc_output


# Build the tensors needed for training
def make_tensors(names, countries):
    sequences_and_lengths = [name2list(name=name) for name in names]  # every name becomes a list of ASCII codes
    # name2list returns both the character list and the length of the name
    name_sequences = [sl[0] for sl in sequences_and_lengths]
    seq_lengths = torch.LongTensor([sl[1] for sl in sequences_and_lengths])
    countries = countries.long()   # the countries coming out of the dataset are integers; convert them to long
    # Padding: create an all-zero tensor, then copy each name tensor into it
    seq_tensor = torch.zeros(len(name_sequences), seq_lengths.max()).long()
    for idx, (seq, seq_len) in enumerate(zip(name_sequences, seq_lengths), 0):
        seq_tensor[idx, :seq_len] = torch.LongTensor(seq)
    # Sort by sequence length
    seq_lengths, perm_idx = seq_lengths.sort(dim=0, descending=True)  # sorted lengths and their indices
    seq_tensor = seq_tensor[perm_idx]
    countries = countries[perm_idx]
    return create_tensor(seq_tensor), create_tensor(seq_lengths), create_tensor(countries)


def name2list(name):   # ASCII code of every character
    arr = [ord(c) for c in name]
    return arr, len(arr)


def create_tensor(tensor):
    if USE_GPU:
        device = torch.device("cuda:0")
        tensor = tensor.to(device)
    return tensor


# Training
def trainModel():
    total_loss = 0
    for i, (names, countries) in enumerate(trainloader, 1):
        inputs, seq_lengths, target = make_tensors(names, countries)  # input matrix, sequence lengths, labels
        outputs = classifier(inputs, seq_lengths.to('cpu'))
        loss = criterion(outputs, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        if i % 10 == 0:
            print("[{}] Epoch {}".format(time_since(start), epoch), end='')
            print(" {}/{}".format(i * len(inputs), len(trainset)), end='')
            print(" loss = {}".format(total_loss / (i * len(inputs))))
    return total_loss


# Testing
def testModel():
    correct = 0
    total = len(testset)
    print("evaluating trained model...")
    with torch.no_grad():
        for i, (names, countries) in enumerate(testloader, 1):
            inputs, seq_lengths, target = make_tensors(names, countries)
            output = classifier(inputs, seq_lengths.to('cpu'))
            # dim=1 takes the max over each row; keepdim keeps the output shape;
            # [1] takes the index of the maximum value
            pred = output.max(dim=1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()
        percent = "{:.2f}".format(100 * correct / total)
        print("Test set: Accuracy {}/{} {}%".format(correct, total, percent))
    return correct / total


# Timing
def time_since(since):
    s = time.time() - since
    m = math.floor(s / 60)
    s -= m * 60
    return "{}m {:.0f}s".format(m, s)


if __name__ == '__main__':
    # N_CHARS: number of input characters  HIDDEN_SIZE: hidden layer size
    # N_COUNTRY: number of country classes  N_LAYER: number of GRU layers
    classifier = RNNClassication(N_CHARS, HIDDEN_SIZE, N_COUNTRY, N_LAYER)
    # Use the GPU
    if USE_GPU:
        device = torch.device("cuda:0")
        classifier.to(device)
    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001)
    # Timing
    start = time.time()
    print("Training for {} epochs...".format(N_EPOCHS))
    acc_list = []
    for epoch in range(1, N_EPOCHS + 1):
        # Train
        trainModel()
        # Test
        acc = testModel()
        acc_list.append(acc)
    # print('acc_list: ', acc_list)

    # Plotting
    epoch = np.arange(1, len(acc_list) + 1, 1)
    acc_list = np.array(acc_list)
    plt.plot(epoch, acc_list)
    plt.xlabel("Epoch")
    plt.ylabel("Accuracy")
    plt.grid()
    plt.show()

Results:

Saving the model

Reference: /weixin_40522801/article/details/106563354

torch.save(classifier.state_dict(), 'name_classifier_model.pt')
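To use the saved weights later, the model is rebuilt and the state dict is loaded back in; a minimal sketch (this mirrors what the prediction script below does, with eval() added as the usual inference-mode step):

classifier = RNNClassication(N_CHARS, HIDDEN_SIZE, N_COUNTRY, N_LAYER)
classifier.load_state_dict(torch.load('name_classifier_model.pt'))
classifier.eval()   # switch to evaluation mode before predicting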

Prediction code:

import math
import time
import numpy as np
import torch
import matplotlib.pyplot as plt
import torch.nn as nn
import gzip
import csv
from torch.nn.utils.rnn import pack_padded_sequence
from torch.utils.data import Dataset, DataLoader


class NameDataset(Dataset):
    def __init__(self, is_train_set=True):
        filename = 'names_train.csv.gz' if is_train_set else 'names_test.csv.gz'
        with gzip.open(filename, 'rt') as f:       # read the .gz file with gzip
            reader = csv.reader(f)                 # parse its contents with csv
            rows = list(reader)                    # list of (name, country) pairs
        self.names = [row[0] for row in rows]      # list of names
        self.length = len(self.names)              # dataset length
        self.countries = [row[1] for row in rows]  # list of countries
        self.country_list = list(sorted(set(self.countries)))  # deduplicate, sort, store as list
        self.country_dict = self.getCountryDict()  # dictionary for country lookup
        self.country_num = len(self.country_list)  # number of country classes

    def __getitem__(self, index):
        # the dict maps country name (key) to class index (value)
        return self.names[index], self.country_dict[self.countries[index]]

    def __len__(self):
        return self.length

    def getCountryDict(self):                      # build the country lookup dictionary
        country_dict = {}
        for idx, country_name in enumerate(self.country_list, 0):
            country_dict[country_name] = idx
        return country_dict

    def idx2country(self, index):                  # return the country string for an index
        return self.country_list[index]

    def getCountryNum(self):                       # return the number of countries
        return self.country_num


train_set = NameDataset(is_train_set=True)

HIDDEN_SIZE = 100
BATCH_SIZE = 256
N_LAYERS = 2                            # number of RNN (GRU) layers
N_CHARS = 128                           # number of characters, i.e. the input dimension
N_COUNTRY = train_set.getCountryNum()   # number of output classes
USE_GPU = False                         # whether to use the GPU


# Convert a name string into a list of ASCII codes
def name2list(name):
    # map every character of the name to its ASCII code and return the list plus its length
    arr = [ord(c) for c in name]
    return arr, len(arr)


# Move a tensor to the GPU if requested
def create_tensor(tensor):
    if USE_GPU:
        device = torch.device('cuda:0')
        tensor = tensor.to(device)
    return tensor


# The model
class RNNClassication(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, n_layers=1, bidirectional=True):
        super(RNNClassication, self).__init__()
        self.hidden_size = hidden_size                 # GRU layer
        self.n_layers = n_layers                       # GRU layer
        self.n_directions = 2 if bidirectional else 1  # What is a bidirectional RNN/LSTM/GRU?

        # Embedding layer input shape:  (seqLen, batchSize)
        # Embedding layer output shape: (seqLen, batchSize, hiddenSize)
        self.embedding = torch.nn.Embedding(input_size, hidden_size)

        # GRU layer inputs:
        #   input:  (seqLen, batchSize, hiddenSize)
        #   hidden: (nLayers * nDirections, batchSize, hiddenSize)
        # GRU layer outputs:
        #   output: (seqLen, batchSize, hiddenSize * nDirections)
        #   hidden: (nLayers * nDirections, batchSize, hiddenSize)
        self.gru = torch.nn.GRU(hidden_size, hidden_size, n_layers,
                                bidirectional=bidirectional)  # input and output size are both hidden_size
        self.fc = torch.nn.Linear(hidden_size * self.n_directions, output_size)

    def _init_hidden(self, batch_size):                # all-zero initial hidden state
        hidden = torch.zeros(self.n_layers * self.n_directions, batch_size, self.hidden_size)
        return create_tensor(hidden)

    def forward(self, input, seq_lengths):
        # input shape: B x S -> S x B
        input = input.t()                              # transpose, since the embedding layer expects S x B
        batch_size = input.size(1)                     # record the batch size
        hidden = self._init_hidden(batch_size)         # initialize the hidden state
        embedding = self.embedding(input)
        # pack them up
        gru_input = pack_padded_sequence(embedding, seq_lengths)
        output, hidden = self.gru(gru_input, hidden)
        if self.n_directions == 2:
            hidden_cat = torch.cat([hidden[-1], hidden[-2]], dim=1)
        else:
            hidden_cat = hidden[-1]
        fc_output = self.fc(hidden_cat)
        return fc_output


classifier = RNNClassication(N_CHARS, HIDDEN_SIZE, N_COUNTRY, N_LAYERS)
classifier.load_state_dict(torch.load('name_classifier_model.pt'))  # load the saved weights


def predict_country(name):
    # As above: name sequence and length; here the batch size is 1 because a single name is passed in
    sequences_and_lengths = [name2list(name=name)]
    # the name mapped to its ASCII sequence
    name_sequences = [sequences_and_lengths[0][0]]
    # the sequence length as a tensor
    seq_lengths = torch.LongTensor([sequences_and_lengths[0][1]])
    print("sequences_and_lengths:", sequences_and_lengths)
    # build the sequence tensor
    seq_tensor = torch.zeros(len(name_sequences), seq_lengths.max()).long()
    for idx, (seq, seq_len) in enumerate(zip(name_sequences, seq_lengths), 0):
        seq_tensor[idx, :seq_len] = torch.LongTensor(seq)
    # name tensor
    inputs = create_tensor(seq_tensor)
    # seq_lengths tensor
    seq_lengths = create_tensor(seq_lengths)
    # run the model to get the output tensor
    output = classifier(inputs, seq_lengths)
    # take the largest entry of the linear layer's output as the prediction
    pred = output.max(dim=1, keepdim=True)[1]
    # return the predicted index
    return pred.item()


scanf_name = "Putin"   # the input name can be changed
print("Enter the name to be predicted: {}".format(scanf_name))
print("predict country:", train_set.idx2country(predict_country(scanf_name)))

Exercise: Sentiment Analysis on Movie Reviews

Dataset: /c/sentiment-analysis-on-movie-reviews/data

The input is the text of a movie review and the output is one of five sentiment classes.

Code:

'''Sentiment Analysis on Movie Reviews'''
import math
import torch
from itertools import chain
import pandas as pd
from torch.nn.utils.rnn import pack_padded_sequence
from torch.utils.data import Dataset, DataLoader
import time
import matplotlib.pyplot as plt


class SAData(Dataset):
    def __init__(self, train):
        # Build the data samples
        self.train = train
        self.data = pd.read_csv('./SentimentOnMovieReview/train.tsv', sep='\t')
        if self.train:
            # Randomly take 80% as the training set; taking rows in index order would not be representative
            self.data = self.data.sample(frac=0.8, replace=False, random_state=1, axis=0)
            # self.data = self.data[:int(self.data.shape[0] * 0.8)]
            self.data = self.data.reset_index(drop=True)   # rebuild the index
            ### For the final training run, train on all of the data ###
            # self.data = self.data
            self.len = self.data.shape[0]
        else:
            # The remaining 20% is the validation set
            self.data = self.data.sample(frac=0.2, replace=False, random_state=1, axis=0)
            # self.data = self.data[int(self.data.shape[0] * 0.8):]
            self.data = self.data.reset_index(drop=True)   # rebuild the index
            self.len = self.data.shape[0]
        self.x_data, self.y_data = self.data['Phrase'], self.data['Sentiment']

    def __getitem__(self, index):
        # return one sample by index
        return self.x_data[index], self.y_data[index]

    def __len__(self):
        # return the dataset length
        return self.len


# Training and validation dataset objects
train_set = SAData(train=True)
validation_set = SAData(train=False)

# Hyper Parameters
N_CHARS = 128        # number of ASCII characters
HIDDEN_SIZE = 128
N_LAYER = 2
BATCH_SIZE = 128
N_EPOCHS = 10
USE_GPU = True
N_CLASS = len(set(train_set.y_data))

# Training and validation data loaders
train_loader = DataLoader(dataset=train_set,
                          batch_size=BATCH_SIZE,
                          shuffle=True,
                          # num_workers=2
                          )
validation_loader = DataLoader(dataset=validation_set,
                               batch_size=BATCH_SIZE,
                               shuffle=False,   # not shuffling the validation set makes results easier to inspect
                               # num_workers=2
                               )


def time_since(since):
    s = time.time() - since
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)


def phrase2list(phrase):
    arr = [ord(c) for c in phrase]   # ord() returns the ASCII code
    return arr, len(arr)


def create_tensor(tensor):
    if USE_GPU:
        device = torch.device('cuda:0')
        tensor = tensor.to(device)
    return tensor


def make_tensor(phrase, sentiment):
    sequences_and_lengths = [phrase2list(phrase) for phrase in phrase]   # string -> character array -> ASCII codes
    phrase_sequences = [sl[0] for sl in sequences_and_lengths]
    seq_lengths = torch.LongTensor([sl[1] for sl in sequences_and_lengths])
    sentiment = sentiment.long()
    # make tensor of phrases, batchSize x seqLen
    seq_tensor = torch.zeros(len(phrase_sequences), seq_lengths.max()).long()
    for idx, (seq, seq_len) in enumerate(zip(phrase_sequences, seq_lengths)):   # zero padding
        seq_tensor[idx, :seq_len] = torch.LongTensor(seq)   # positions beyond the phrase length stay zero
    # Sort by length to use pack_padded_sequence
    seq_lengths, perm_idx = seq_lengths.sort(dim=0, descending=True)   # perm_idx holds the original indices
    seq_tensor = seq_tensor[perm_idx]   # reorder the padded sequences by length
    sentiment = sentiment[perm_idx]
    return create_tensor(seq_tensor), create_tensor(seq_lengths), create_tensor(sentiment)


class RNNClassifier(torch.nn.Module):
    def __init__(self, input_size, hidden_size, output_size, n_layers=1, bidirection=True):
        super(RNNClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.n_layers = n_layers
        self.n_directions = 2 if bidirection else 1
        self.embedding = torch.nn.Embedding(input_size, hidden_size)
        self.gru = torch.nn.GRU(hidden_size, hidden_size, n_layers, bidirectional=bidirection)
        self.fc = torch.nn.Linear(hidden_size * self.n_directions, output_size)

    def _init_hidden(self, batch_size):
        hidden = torch.zeros(self.n_layers * self.n_directions, batch_size, self.hidden_size)
        return create_tensor(hidden)

    def forward(self, input, seq_lengths):
        input = input.t()   # transpose B x S -> S x B
        batch_size = input.size(1)
        hidden = self._init_hidden(batch_size)
        embedding = self.embedding(input)
        # "pack" is best understood as compressing: the padded variable-length batch is packed
        # (padding adds redundancy, so it is squeezed out here)
        gru_input = pack_padded_sequence(embedding, seq_lengths)   # pack them up
        output, hidden = self.gru(gru_input, hidden)
        if self.n_directions == 2:
            hidden_cat = torch.cat([hidden[-1], hidden[-2]], dim=1)
        else:
            hidden_cat = hidden[-1]
        fc_output = self.fc(hidden_cat)
        return fc_output


def trainModel():
    total_loss = 0
    for i, (phrase, sentiment) in enumerate(train_loader, 1):
        inputs, seq_lengths, target = make_tensor(phrase, sentiment)
        output = classifier(inputs, seq_lengths.to('cpu'))
        loss = criterion(output, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        if i % 10 == 0:
            print(f'[{time_since(start)}] Epoch {epoch}', end='')
            print(f'[{i * len(inputs)}/{len(train_set)}]', end='')
            print(f'loss={total_loss / (i * len(inputs))}')


def evalModel():
    correct = 0
    total = len(validation_set)
    print("Evaluating trained model...")
    with torch.no_grad():
        for i, (phrase, sentiment) in enumerate(validation_loader, 1):
            inputs, seq_lengths, target = make_tensor(phrase, sentiment)
            output = classifier(inputs, seq_lengths.to('cpu'))
            pred = output.max(dim=1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()
        percent = '%.2f' % (100 * correct / total)
        print(f'Test set: Accuracy {correct}/{total} {percent}%')
    return correct / total


# Load the test set
def get_test_set():
    test_set = pd.read_csv('./SentimentOnMovieReview/test.tsv', sep='\t')
    PhraseId = test_set['PhraseId']
    Phrase = test_set['Phrase']
    return PhraseId, Phrase


# Text preprocessing written for the test set (no labels)
def make_tensor_test(phrase):
    sequences_and_lengths = [phrase2list(phrase) for phrase in phrase]   # string -> character array -> ASCII codes
    phrase_sequences = [sl[0] for sl in sequences_and_lengths]
    seq_lengths = torch.LongTensor([sl[1] for sl in sequences_and_lengths])
    # make tensor of phrases, batchSize x seqLen
    seq_tensor = torch.zeros(len(phrase_sequences), seq_lengths.max()).long()
    for idx, (seq, seq_len) in enumerate(zip(phrase_sequences, seq_lengths)):   # zero padding
        seq_tensor[idx, :seq_len] = torch.LongTensor(seq)   # positions beyond the phrase length stay zero
    # Sort by length to use pack_padded_sequence
    seq_lengths, perm_idx = seq_lengths.sort(dim=0, descending=True)   # perm_idx holds the original indices
    seq_tensor = seq_tensor[perm_idx]   # reorder the padded sequences by length
    # Because each test batch is reordered, record the original order org_idx
    # so the predictions can be put back into the original order
    _, org_idx = perm_idx.sort(descending=False)
    return create_tensor(seq_tensor), create_tensor(seq_lengths), org_idx


def predict():
    # Run the model on the test set
    PhraseId, Phrase = get_test_set()                       # load the test set
    sentiment_list = []                                     # list of predictions
    batchNum = math.ceil(PhraseId.shape[0] / BATCH_SIZE)    # total number of batches
    classifier = torch.load('./model/sentimentAnalyst_best.pkl')
    if USE_GPU:
        device = torch.device("cuda:0")
        classifier.to(device)
    with torch.no_grad():
        for i in range(batchNum):
            print(i)
            if i == batchNum - 1:
                phraseBatch = Phrase[BATCH_SIZE * i:]       # handle the last, possibly smaller, batch
            else:
                phraseBatch = Phrase[BATCH_SIZE * i:BATCH_SIZE * (i + 1)]
            inputs, seq_lengths, org_idx = make_tensor_test(phraseBatch)
            output = classifier(inputs, seq_lengths.to('cpu'))
            sentiment = output.max(dim=1, keepdim=True)[1]
            sentiment = sentiment[org_idx].squeeze(1)
            sentiment_list.append(sentiment.cpu().numpy().tolist())
    sentiment_list = list(chain.from_iterable(sentiment_list))   # flatten into a one-dimensional list
    result = pd.DataFrame({'PhraseId': PhraseId, 'Sentiment': sentiment_list})
    result.to_csv('./SentimentOnMovieReview/SA_predict.csv', index=False)   # save the predictions


# Main Cycle
if __name__ == '__main__':
    classifier = RNNClassifier(N_CHARS, HIDDEN_SIZE, N_CLASS, N_LAYER)
    if USE_GPU:
        device = torch.device("cuda:0")
        classifier.to(device)
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(classifier.parameters(), lr=0.001)

    start = time.time()
    print("Training for %d epochs..." % N_EPOCHS)
    acc_list = []
    for epoch in range(1, N_EPOCHS + 1):
        trainModel()
        acc = evalModel()
        acc_list.append(acc)
        # Save the model whenever it reaches the best accuracy so far
        if acc >= max(acc_list):
            torch.save(classifier, './model/sentimentAnalyst_best.pkl')
            print('Save Model!')
            predict()   # predict on the test set

    # Plot Accuracy
    epoch = [epoch + 1 for epoch in range(len(acc_list))]
    plt.plot(epoch, acc_list)
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.grid()
    plt.show()

# A problem that can appear after training for a while:
# RuntimeError: cuDNN error: CUDNN_STATUS_INTERNAL_ERROR
# which is usually caused by running out of GPU memory

Results:

Learning Resources

/lizhuangabby/article/details/125866957
/weixin_46047643/article/details/115398171?utm_medium=distribute.pc_relevant.none-task-blog-2defaultbaidujs_title~default-1-115398171-blog-125866957.pc_relevant_multi_platform_whitelistv4&spm=1001.2101.3001.4242.2&utm_relevant_index=4

Series Index

Course link: [PyTorch Deep Learning Practice, complete series] /video/BV1Y7411d7Ys?share_source=copy_web&vd_source=3d4224b4fa4af57813fe954f52f8fbe7

1. Linear Model
2. Gradient Descent
3. Back Propagation
4. Linear Regression with PyTorch
5. Logistic Regression
6. Multiple Dimension Input
7. Dataset and DataLoader
8. Multi-class classification with Softmax and CrossEntropyLoss (MNIST dataset)
9. CNN Basics: a convolutional network on MNIST
10. Advanced CNN: implementing complex networks
11. RNN Basics: implementing an RNN
12. Advanced RNN: implementing classification
