本期将介绍第二种非常优雅的生成模型—流模型,它也是一种概率密度函数可处理的生成模型。本文将对其原理进行介绍,并对nice模型的源码进行讲解。 作者&编辑 | 小米粥 1 流模型 这是一种想法比较直接但实际不容易构造的生成模型,它通过可逆的非线性变换等技巧使得似然函数可以被精确计算出来。对于分布比较简单(例如高斯分布)的隐变量z,其概率分布为 pz(z) ,这时若存在一个连续、可微、可逆的非线性变换g(z),将简单的潜变量z的分布转换成关于样本x的一个复杂分布,将非线性变换g(z)的逆变换记为f(x),则可得到样本x的准确的概率密度函数 px(x) 注意,非线性变换g(z)会引起空间的变形,即px(x)不等于 pz(f(x)),且有pz(z)dz=px(x)dx。若上述模型构建成功,则生成样本时只需从简单分布pz(z)中随机采样然后将其变换为 x=g(z) 即可。 为了训练非线性独立成分估计模型,我们必须计算样本的概率密度函数px(x)。分析上式,概率密度函数px(x)的计算需要计算pz(z)和雅可比矩阵的行列式绝对值。对于前者,要构建一个 g(z) 的逆变换f(x),这样当给定一个样本 x 后,就可以通过z=f(x)计算得到其对应的隐变量,pz(z)通常设计为简单的分布,容易计算;对于后者,要将f(x)设计为某种特殊的形式,使得雅可比矩阵的行列式易于计算。另外,变换的可逆性要求样本x和隐变量z具有相同的维度。综上,需要将生成模型精心设计成一种易于处理且灵活的双射模型,使其逆变换f(x)存在,且对应的雅可比矩阵的行列式可计算。 为了对这类模型有更深刻的理解,我们对NICE模型进行详细的介绍。NICE模型的逆变换f(x)由多个加性耦合层和一个尺度变换层构成。在每个加性耦合层中,首先将 n 维样本 x 分解为两部分x1和x2,例如将x的第1,3,5...个元素划入x1部分,将第2,4,6...个元素划入x2部分,每个部分的维度均为n/2;也可以将 x 的第2,4,6...个元素划入x1部分,将第1,3,5...个元素划入 x2 部分;或者使用其他划分方式。然后对两部分其进行变换: 其中m()为任意函数,注意这里要保证m()的输出结果维度与 x2 保持一致,NICE模型使用多层全连接网络和ReLU激活函数来构建 m() 。容易发现,使用加性耦合层作为逆变换f(x)的其中一部分,它是可逆的并且雅可比矩阵的行列式也是容易计算的。当已知h1和h2时,可得其逆变换: 其雅可比矩阵为: 根据三角矩阵的性质,其行列式为对角元素的乘积,故加性耦合层雅可比矩阵的行列式绝对值为1。当将多个加性耦合层串联时,如下图所示。由于每个层的逆变换是容易计算的,则串联后的逆变换仍然是容易计算的。此时的雅可比矩阵为: 根据矩阵行列式的性质,有: 需要说明的是,必须注意在不同的加性耦合层使用不同的划分策略,使得样本不同维度的信息充分混淆。在尺度变换层,定义了包含n个非负参数的向量s=[s1,s2,...,sn],将加性耦合层的输出结果h(l)与s逐元素相乘可得到对应的隐变量z。这里s用于控制每个维度的特征变换的尺度,可以表征维度的重要性,对应维度的数值较大表明这一维度的重要性低,因为生成样本时隐变量需要先经过尺度变换层,隐变量在尺度变换层需要逐元素乘1/s。显然,尺度变换层的逆变换只需逐元素乘1/s,为了计算雅可比矩阵,将尺度变换写为对角矩阵的形式: 则其雅可比矩阵的行列式为s1s2...sn。现在,我们构造了可逆的、雅可比矩阵的行列式绝对值易于计算的逆变换f(x),对于隐变量z,NICE模型假设其n个维度彼此独立,即 若选择z为高斯分布,则样本x的似然函数为: 若选择z为logistic分布,即 则样本x的似然函数为 现在,我们可以使用极大似然法对NICE模型进行训练,训练完成后也得到了生成模型g(z)。若 z 为高斯分布,则直接从高斯分布中采样可得到z;若选择 为logistic分布,可现在0-1之间的均匀分布中采样得到\epsilon ,然后使用变换z=t(ε)得到隐变量。根据两个随机变量的映射关系 则有 将隐变量z使用非线性变换g(z),即经过尺度变换层的逆变换、多个加性耦合层的逆变换可得到生成样本x。 2 NICE 代码 接下来我们将提供一份完整的NICE的代码讲解,其中训练集为mnist数据集。 首先读取相关python库: import argparse import torch import torchvision import numpy as np import niceimport utils 定义主函数,设置相关参数,并进行模型的训练和验证 定义二维掩膜卷积核,其中有A与B两种类型,区别之处在于中心位置是否被卷积计算: def main(args): device = torch.device("cuda:0") # 训练相关参数设置 dataset = args.dataset batch_size = args.batch_size latent = args.latent max_iter = args.max_iter sample_size = args.sample_size coupling = 4 mask_config = 1. # 优化器设置 lr = args.lr momentum = args.momentum decay = args.decay zca = None mean = None # 选择相应的训练数据集 if dataset == 'mnist': mean = torch.load('./statistics/mnist_mean.pt') (full_dim, mid_dim, hidden) = (1 * 28 * 28, 1000, 5) transform = torchvision.transforms.ToTensor() trainset = torchvision.datasets.MNIST(root='~/torch/data/MNIST', train=True, download=True, transform=transform) trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2) # 使用正态分布或逻辑回归分布 if latent == 'normal': prior = torch.distributions.Normal( torch.tensor(0.).to(device), torch.tensor(1.).to(device)) elif latent == 'logistic': prior = utils.StandardLogistic() filename = '%s_' % dataset \ + 'bs%d_' % batch_size \ + '%s_' % latent \ + 'cp%d_' % coupling \ + 'md%d_' % mid_dim \ + 'hd%d_' % hidden # 实例化流模型 flow = nice.NICE(prior=prior, coupling=coupling, in_out_dim=full_dim, mid_dim=mid_dim, hidden=hidden, mask_config=mask_config).to(device) optimizer = torch.optim.Adam( flow.parameters(), lr=lr, betas=(momentum, decay), eps=1e-4) total_iter = 0 train = True running_loss = 0 # 训练模型 while train: for _, data in enumerate(trainloader, 1): flow.train() # set to training mode if total_iter == max_iter: train = False break total_iter += 1 optimizer.zero_grad() # clear gradient tensors inputs, _ = data inputs = utils.prepare_data( inputs, dataset, zca=zca, mean=mean).to(device) # log-likelihood of input minibatch loss = -flow(inputs).mean() running_loss += float(loss) # backprop and update parameters loss.backward() optimizer.step() if total_iter % 10 == 0: mean_loss = running_loss / 2000 bit_per_dim = (mean_loss + np.log(256.) * full_dim) \ / (full_dim * np.log(2.)) print('iter %s:' % total_iter, 'loss = %.3f' % mean_loss, 'bits/dim = %.3f' % bit_per_dim) running_loss = 0.0 if total_iter % 1000 == 0: flow.eval() # set to inference mode with torch.no_grad(): samples = flow.sample(sample_size).cpu() samples = utils.prepare_data( samples, dataset, zca=zca, mean=mean, reverse=True) torchvision.utils.save_image(torchvision.utils.make_grid(samples), './samples/' + filename +'iter%d.png' % total_iter) # 保存模型 torch.save({ 'total_iter': total_iter, 'model_state_dict': flow.state_dict(), 'optimizer_state_dict': optimizer.state_dict(), 'dataset': dataset, 'batch_size': batch_size, 'latent': latent, 'coupling': coupling, 'mid_dim': mid_dim, 'hidden': hidden, 'mask_config': mask_config}, './models/mnist/' + filename +'iter%d.tar' % total_iter) print('Checkpoint Saved') if __name__ == '__main__': parser = argparse.ArgumentParser('MNIST NICE PyTorch implementation') parser.add_argument('--dataset', help='dataset to be modeled.', type=str, default='mnist') parser.add_argument('--batch_size', help='number of images in a mini-batch.', type=int, default=256) parser.add_argument('--latent', help='latent distribution.', type=str, default='logistic') parser.add_argument('--max_iter', help='maximum number of iterations.', type=int, default=20000) parser.add_argument('--sample_size', help='number of images to generate.', type=int, default=64) parser.add_argument('--lr', help='initial learning rate.', type=float, default=1e-3) parser.add_argument('--momentum', help='beta1 in Adam optimizer.', type=float, default=0.9) parser.add_argument('--decay', help='beta2 in Adam optimizer.', type=float, default=0.999) args = parser.parse_args() main(args) NICE流模型的核心代码: # 加性耦合层class Coupling(nn.Module): def __init__(self, in_out_dim, mid_dim, hidden, mask_config): super(Coupling, self).__init__() self.mask_config = mask_config self.in_block = nn.Sequential( nn.Linear(in_out_dim//2, mid_dim), nn.ReLU()) self.mid_block = nn.ModuleList([ nn.Sequential( nn.Linear(mid_dim, mid_dim), nn.ReLU()) for _ in range(hidden - 1)]) self.out_block = nn.Linear(mid_dim, in_out_dim//2) def forward(self, x, reverse=False): [B, W] = list(x.size()) x = x.reshape((B, W//2, 2)) if self.mask_config: on, off = x[:, :, 0], x[:, :, 1] else: off, on = x[:, :, 0], x[:, :, 1] off_ = self.in_block(off) for i in range(len(self.mid_block)): off_ = self.mid_block[i](off_) shift = self.out_block(off_) if reverse: on = on - shift else: on = on + shift if self.mask_config: x = torch.stack((on, off), dim=2) else: x = torch.stack((off, on), dim=2) return x.reshape((B, W)) # 尺度变换层 class Scaling(nn.Module): def __init__(self, dim): super(Scaling, self).__init__() self.scale = nn.Parameter( torch.zeros((1, dim)), requires_grad=True) def forward(self, x, reverse=False): log_det_J = torch.sum(self.scale) if reverse: x = x * torch.exp(-self.scale) else: x = x * torch.exp(self.scale) return x, log_det_J # NICE模型 class NICE(nn.Module): def __init__(self, prior, coupling, in_out_dim, mid_dim, hidden, mask_config): self.prior = prior self.in_out_dim = in_out_dim self.coupling = nn.ModuleList([ Coupling(in_out_dim=in_out_dim, mid_dim=mid_dim, hidden=hidden, mask_config=(mask_config+i)%2) \ for i in range(coupling)]) self.scaling = Scaling(in_out_dim) def g(self, z): x, _ = self.scaling(z, reverse=True) for i in reversed(range(len(self.coupling))): x = self.coupling[i](x, reverse=True) return x def f(self, x): for i in range(len(self.coupling)): x = self.coupling[i](x) return self.scaling(x) def log_prob(self, x): z, log_det_J = self.f(x) log_ll = torch.sum(self.prior.log_prob(z), dim=1) return log_ll + log_det_J def sample(self, size): z = self.prior.sample((size, self.in_out_dim)).cuda() return self.g(z) def forward(self, x): return self.log_prob(x) [1] Dinh L , Krueger D , Bengio Y . NICE: Non-linear Independent Components Estimation[J]. Computer ence, 2014. [2]Diederik P. Kingma*†, Prafulla Dhariwal⇤. Glow: Generative Flow with Invertible 1x1 Convolutions[J]. 2018. 本期带大家学习了流模型,流模型的生成效果也非常卓越,但是其网络设计比较复杂,训练难度比较大。下一期将为大家继续讲解其他生成模型。 |
|