分享

【生成模型】简述概率密度函数可处理流模型

 有三AI 2020-11-27

本期将介绍第二种非常优雅的生成模型—流模型,它也是一种概率密度函数可处理的生成模型。本文将对其原理进行介绍,并对nice模型的源码进行讲解。

                                          作者&编辑 | 小米粥

1 流模型

这是一种想法比较直接但实际不容易构造的生成模型,它通过可逆的非线性变换等技巧使得似然函数可以被精确计算出来。对于分布比较简单(例如高斯分布)的隐变量z,其概率分布为 pz(z) ,这时若存在一个连续、可微、可逆的非线性变换g(z),将简单的潜变量z的分布转换成关于样本x的一个复杂分布,将非线性变换g(z)的逆变换记为f(x),则可得到样本x的准确的概率密度函数 px(x)

注意,非线性变换g(z)会引起空间的变形,即px(x)不等于 pz(f(x)),且有pz(z)dz=px(x)dx。若上述模型构建成功,则生成样本时只需从简单分布pz(z)中随机采样然后将其变换为 x=g(z) 即可。

为了训练非线性独立成分估计模型,我们必须计算样本的概率密度函数px(x)。分析上式,概率密度函数px(x)的计算需要计算pz(z)和雅可比矩阵的行列式绝对值。对于前者,要构建一个 g(z) 的逆变换f(x),这样当给定一个样本 x 后,就可以通过z=f(x)计算得到其对应的隐变量,pz(z)通常设计为简单的分布,容易计算;对于后者,要将f(x)设计为某种特殊的形式,使得雅可比矩阵的行列式易于计算。另外,变换的可逆性要求样本x和隐变量z具有相同的维度。综上,需要将生成模型精心设计成一种易于处理且灵活的双射模型,使其逆变换f(x)存在,且对应的雅可比矩阵的行列式可计算。

为了对这类模型有更深刻的理解,我们对NICE模型进行详细的介绍。NICE模型的逆变换f(x)由多个加性耦合层和一个尺度变换层构成。在每个加性耦合层中,首先将 n 维样本 x 分解为两部分x1和x2,例如将x的第1,3,5...个元素划入x1部分,将第2,4,6...个元素划入x2部分,每个部分的维度均为n/2;也可以将 x 的第2,4,6...个元素划入x1部分,将第1,3,5...个元素划入 x2 部分;或者使用其他划分方式。然后对两部分其进行变换:

其中m()为任意函数,注意这里要保证m()的输出结果维度与 x2 保持一致,NICE模型使用多层全连接网络和ReLU激活函数来构建 m() 。容易发现,使用加性耦合层作为逆变换f(x)的其中一部分,它是可逆的并且雅可比矩阵的行列式也是容易计算的。当已知h1和h2时,可得其逆变换:

其雅可比矩阵为:

根据三角矩阵的性质,其行列式为对角元素的乘积,故加性耦合层雅可比矩阵的行列式绝对值为1。当将多个加性耦合层串联时,如下图所示。由于每个层的逆变换是容易计算的,则串联后的逆变换仍然是容易计算的。此时的雅可比矩阵为:

根据矩阵行列式的性质,有:

需要说明的是,必须注意在不同的加性耦合层使用不同的划分策略,使得样本不同维度的信息充分混淆。在尺度变换层,定义了包含n个非负参数的向量s=[s1,s2,...,sn],将加性耦合层的输出结果h(l)与s逐元素相乘可得到对应的隐变量z。这里s用于控制每个维度的特征变换的尺度,可以表征维度的重要性,对应维度的数值较大表明这一维度的重要性低,因为生成样本时隐变量需要先经过尺度变换层,隐变量在尺度变换层需要逐元素乘1/s。显然,尺度变换层的逆变换只需逐元素乘1/s,为了计算雅可比矩阵,将尺度变换写为对角矩阵的形式:

则其雅可比矩阵的行列式为s1s2...sn。现在,我们构造了可逆的、雅可比矩阵的行列式绝对值易于计算的逆变换f(x),对于隐变量z,NICE模型假设其n个维度彼此独立,即

若选择z为高斯分布,则样本x的似然函数为:

若选择z为logistic分布,即

则样本x的似然函数为

现在,我们可以使用极大似然法对NICE模型进行训练,训练完成后也得到了生成模型g(z)。若  z 为高斯分布,则直接从高斯分布中采样可得到z;若选择  为logistic分布,可现在0-1之间的均匀分布中采样得到\epsilon ,然后使用变换z=t(ε)得到隐变量。根据两个随机变量的映射关系

则有

将隐变量z使用非线性变换g(z),即经过尺度变换层的逆变换、多个加性耦合层的逆变换可得到生成样本x。

2 NICE 代码

接下来我们将提供一份完整的NICE的代码讲解,其中训练集为mnist数据集。

首先读取相关python库:

import argparse 

import torch 

import torchvision 

import numpy as np 

import niceimport utils

定义主函数,设置相关参数,并进行模型的训练和验证

定义二维掩膜卷积核,其中有A与B两种类型,区别之处在于中心位置是否被卷积计算:

def main(args):

    device = torch.device("cuda:0") 

    # 训练相关参数设置

    dataset = args.dataset

    batch_size = args.batch_size

    latent = args.latent

    max_iter = args.max_iter

    sample_size = args.sample_size

    coupling = 4    mask_config = 1.

    # 优化器设置

    lr = args.lr

    momentum = args.momentum

    decay = args.decay

    zca = None

    mean = None

    # 选择相应的训练数据集

    if dataset == 'mnist':

        mean = torch.load('./statistics/mnist_mean.pt')        (full_dim, mid_dim, hidden) = (1 * 28 * 28, 1000, 5)        transform = torchvision.transforms.ToTensor()        trainset = torchvision.datasets.MNIST(root='~/torch/data/MNIST',            train=True, download=True, transform=transform)        trainloader = torch.utils.data.DataLoader(trainset,            batch_size=batch_size, shuffle=True, num_workers=2)         

  # 使用正态分布或逻辑回归分布

   if latent == 'normal':

        prior = torch.distributions.Normal(

            torch.tensor(0.).to(device), torch.tensor(1.).to(device))

    elif latent == 'logistic':

        prior = utils.StandardLogistic()

    filename = '%s_' % dataset \

             + 'bs%d_' % batch_size \

             + '%s_' % latent \

             + 'cp%d_' % coupling \

             + 'md%d_' % mid_dim \

             + 'hd%d_' % hidden

    # 实例化流模型

    flow = nice.NICE(prior=prior,

                coupling=coupling,

                in_out_dim=full_dim,

                mid_dim=mid_dim,

                hidden=hidden,

                mask_config=mask_config).to(device)

    optimizer = torch.optim.Adam(

        flow.parameters(), lr=lr, betas=(momentum, decay), eps=1e-4)

    total_iter = 0

    train = True

    running_loss = 0

    # 训练模型

    while train:

        for _, data in enumerate(trainloader, 1):

            flow.train()

    # set to training mode

            if total_iter == max_iter:

                train = False

                break

            total_iter += 1

            optimizer.zero_grad()

    # clear gradient tensors

            inputs, _ = data

            inputs = utils.prepare_data(

                inputs, dataset, zca=zca,

 mean=mean).to(device)

            # log-likelihood of input minibatch

            loss = -flow(inputs).mean()

            running_loss += float(loss)

            # backprop and update parameters

            loss.backward()

            optimizer.step()

            if total_iter % 10 == 0:

                mean_loss = running_loss / 2000

                bit_per_dim = (mean_loss + np.log(256.) * full_dim) \

                            / (full_dim * np.log(2.)) 

                print('iter %s:' % total_iter,

                    'loss = %.3f' % mean_loss,

                    'bits/dim = %.3f' % bit_per_dim)

                running_loss = 0.0

            if total_iter % 1000 == 0:

                flow.eval()

        # set to inference mode

                with torch.no_grad():

                    samples = flow.sample(sample_size).cpu()

                    samples = utils.prepare_data(                        samples, dataset, zca=zca, mean=mean, reverse=True)

                torchvision.utils.save_image(torchvision.utils.make_grid(samples),

'./samples/' + filename +'iter%d.png' % total_iter)

    # 保存模型

    torch.save({

        'total_iter': total_iter,

        'model_state_dict': flow.state_dict(),

        'optimizer_state_dict': optimizer.state_dict(),

        'dataset': dataset,

        'batch_size': batch_size,

        'latent': latent,

        'coupling': coupling,

        'mid_dim': mid_dim,

        'hidden': hidden,

        'mask_config': mask_config},

        './models/mnist/' + filename +'iter%d.tar' % total_iter)

    print('Checkpoint Saved')

 if __name__ == '__main__':

    parser = argparse.ArgumentParser('MNIST NICE PyTorch implementation')

    parser.add_argument('--dataset',

                        help='dataset to be modeled.',

                        type=str,

                        default='mnist')    parser.add_argument('--batch_size', 

                        help='number of images in a mini-batch.',

                        type=int,

                        default=256)

    parser.add_argument('--latent',

                        help='latent distribution.',

                        type=str,

                        default='logistic')

    parser.add_argument('--max_iter',

                        help='maximum number of iterations.',

                        type=int,

                        default=20000)

    parser.add_argument('--sample_size',

                        help='number of images to generate.',

                        type=int,

                        default=64)

    parser.add_argument('--lr',

                        help='initial learning rate.',

                        type=float,

                        default=1e-3)

    parser.add_argument('--momentum', 

                        help='beta1 in Adam optimizer.',

                        type=float,

                        default=0.9)

    parser.add_argument('--decay',

                        help='beta2 in Adam optimizer.',

                        type=float,

                        default=0.999)

    args = parser.parse_args()

    main(args)

NICE流模型的核心代码:

# 加性耦合层class Coupling(nn.Module):

    def __init__(self, in_out_dim, mid_dim, hidden, mask_config):

        super(Coupling, self).__init__()

        self.mask_config = mask_config

        self.in_block = nn.Sequential(

            nn.Linear(in_out_dim//2, mid_dim),

            nn.ReLU())

        self.mid_block = nn.ModuleList([

            nn.Sequential(

                nn.Linear(mid_dim, mid_dim),

                nn.ReLU()) for _ in range(hidden - 1)])

        self.out_block = nn.Linear(mid_dim, in_out_dim//2)

    def forward(self, x, reverse=False):

        [B, W] = list(x.size())

        x = x.reshape((B, W//2, 2))

        if self.mask_config:

            on, off = x[:, :, 0], x[:, :, 1]

        else:

            off, on = x[:, :, 0], x[:, :, 1]

        off_ = self.in_block(off)

        for i in range(len(self.mid_block)):

            off_ = self.mid_block[i](off_)

        shift = self.out_block(off_)

        if reverse:

            on = on - shift

        else:

            on = on + shift

        if self.mask_config:

            x = torch.stack((on, off), dim=2)

        else:

            x = torch.stack((off, on), dim=2)

        return x.reshape((B, W))

 # 尺度变换层

 class Scaling(nn.Module):

    def __init__(self, dim):

        super(Scaling, self).__init__()

        self.scale = nn.Parameter(

            torch.zeros((1, dim)), requires_grad=True)

    def forward(self, x, reverse=False):

        log_det_J = torch.sum(self.scale)

        if reverse:

            x = x * torch.exp(-self.scale)

        else:

            x = x * torch.exp(self.scale)

        return x, log_det_J

 # NICE模型

 class NICE(nn.Module):

    def __init__(self, prior, coupling,

        in_out_dim, mid_dim, hidden, mask_config):

        self.prior = prior

        self.in_out_dim = in_out_dim

        self.coupling = nn.ModuleList([

            Coupling(in_out_dim=in_out_dim,

                     mid_dim=mid_dim,

                     hidden=hidden,

                     mask_config=(mask_config+i)%2) \            for i in range(coupling)])

        self.scaling = Scaling(in_out_dim)

    def g(self, z):

        x, _ = self.scaling(z, reverse=True)

        for i in reversed(range(len(self.coupling))):

            x = self.coupling[i](x, reverse=True)

        return x

    def f(self, x):

        for i in range(len(self.coupling)):

            x = self.coupling[i](x)

        return self.scaling(x)

    def log_prob(self, x):

        z, log_det_J = self.f(x)

        log_ll = torch.sum(self.prior.log_prob(z), dim=1)

        return log_ll + log_det_J

    def sample(self, size):

        z = self.prior.sample((size, self.in_out_dim)).cuda()        return self.g(z)

    def forward(self, x):

        return self.log_prob(x)

[1] Dinh L , Krueger D , Bengio Y . NICE: Non-linear Independent Components Estimation[J]. Computer ence, 2014.

[2]Diederik P. Kingma*†, Prafulla Dhariwal⇤. Glow: Generative Flow with Invertible 1x1 Convolutions[J]. 2018.

总结

本期带大家学习了流模型,流模型的生成效果也非常卓越,但是其网络设计比较复杂,训练难度比较大。下一期将为大家继续讲解其他生成模型。

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多