分享

Transformer与看图说话

 新用户79878317 2023-01-03 发布于河南

🏅🏅🏅🏅🏅🏅🏅🏅🏅🏅🏅🏅🏅🏅🏅🏅

一年一度的【博客之星】评选活动已开始啦

作为第一次且有幸能够参加评选的小博主

我诚惶诚恐

还请各位花费宝贵的几秒钟时间为我投上五星:

2022年「博客之星」参赛博主:老师我作业忘带了

✨✨✨✨✨谢谢各位✨✨✨✨✨

本项目来使用Transformer实现看图说话,即Image Caption任务。相关涉及的知识点有:迁移学习、EfficientNet、Transformer Encoder、Transformer Decoder、Self-attention。

项目效果如下:

文章末尾也展示了预测失败的时候 

Image Caption:

  • 让机器在图片中生成一段描述性的文字。
  • 机器需要检测出图中的物体、还需要了解物体中相互的关系,最后生成合理的序言描述。
  • 像这种既需要 CV(计算机视觉) 又需要 NLP(自然语言处理) 的称之为多模态

Image Caption论文推荐:

  • MAOJH,XU W,YANG Y,etal.Deep captioning with multi-modal recurrent neural networks (m-RNN)
  • VINYALSO,TOSHEV A,BENGIOS,etal.Showandtell:A neural image caption generator

自然语言及注意力等论文推荐(新手):

  • Efficient Estimation of Word Representations inVector Space
  • Vaswani, Shazeer, Parmar, et al. (2017) Attention Is All You Need NeurIPS 
  • Devlin, Chang, Lee, Toutanova (2019) BERT: Pre-training of Deep Bidirectional Transformers for Language  Understanding NAACL
  • XU K,BA JL,KIROS R,etal.Show,attendandtell:Neural image caption generation with visual attention
  • LU JS,XIONG C M,DEVIP,etal.Knowing whentolook: Adaptive attention via a visual sentinel for image captioning(自适应注意力机制)

项目流程如下:

图片输入CNN进行特征提取后,输入Encoder形成序列,将token信号和Encoder的输出传递给Decoder,经过全连接和Softmax,得到输出结果。

详细网络架构:

其中左下角为CNN特征提取,右下角为自然语言中的文本embedding,上方则为transformer经典网络架构。

数据文件内容如下:

下载链接: 点击此处

其中文件夹中存放本次训练使用的图片集,下方json文件则写有对应图片的标注,如:

代码流程:

本次项目代码比较多,均已写在下方,且注释我已经努力写得很详细了:

一、前期配置

导入相关包 

  1. # 导入相关包
  2. import os
  3. import re
  4. import cv2
  5. import random
  6. import numpy as np
  7. import matplotlib.pyplot as plt
  8. import tensorflow as tf
  9. from tensorflow import keras
  10. from tensorflow.keras import layers
  11. # 导入预训练CNN
  12. from tensorflow.keras.applications import efficientnet
  13. from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
  14. import json
  15. import jieba
  16. import tqdm

设置GPU训练

  1. gpus = tf.config.list_physical_devices('GPU')
  2. if gpus:
  3. gpu0 = gpus[0] #如果有多个GPU,仅使用第0个GPU
  4. tf.config.experimental.set_memory_growth(gpu0, True) #设置GPU显存用量按需使用
  5. tf.config.set_visible_devices([gpu0],'GPU')
  6. gpus
[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

设计任务基本参数

  1. # 设置基本参数
  2. # 图片地址
  3. IMAGES_PATH = './ai_challenger_caption_validation_20170910/caption_validation_images_20170910/'
  4. # 目标大小
  5. IMAGE_SIZE = (299, 299)
  6. # 词汇量大小
  7. VOCAB_SIZE = 10000
  8. # 输出句子单词长度
  9. SEQ_LENGTH = 25
  10. # 特征向量长度
  11. EMBED_DIM = 512
  12. # 输出层维度大小
  13. FF_DIM = 512
  14. # 训练参数
  15. BATCH_SIZE = 64
  16. EPOCHS = 30
  17. AUTOTUNE = tf.data.AUTOTUNE

二、数据预处理

2.1 将图片和它的“label”对应起来

这一步我们把图片的相对路径和其5个caption以字典的形式对应起来,过程中要使用jieba进行中文分割,加上start和end,并且去掉带有不合格caption的例子,保证我们的输入在其中一个维度大小是5。

  1. token_len = [] # 用于后面统计句子中词的长度
  2. def load_captions_json(filename):
  3. caption_mapping = {} # 映射字典 image1(path):[caption1,caption2...]
  4. text_data = [] # 把合格的处理好的caption放到这里 后面用来向量化
  5. images_to_skip = set() # 用来保存不合格的标注,后面在caption_mapping去掉对应的图片及其注释
  6. # 打开并读取json文件
  7. with open(filename) as f:
  8. json_data = json.load(f)
  9. # 遍历3W个json数据
  10. for item in tqdm.tqdm(json_data):
  11. # 图片的名字“id”
  12. img_name = item['image_id']
  13. # 图片的路径
  14. img_path = os.path.join(IMAGES_PATH, img_name.strip())
  15. # 遍历属于每个图片的5个标注
  16. for caption in item['caption']:
  17. # 分词
  18. tokens =[word for word in jieba.cut(caption)]
  19. # 根据tokens构造caption(空格分隔的字符串)
  20. caption = ' '.join(tokens)
  21. # 存入句子中词的长度
  22. token_len.append(len(tokens))
  23. if len(tokens) < 3 or len(tokens) > SEQ_LENGTH:
  24. images_to_skip.add(img_path)
  25. continue
  26. # 如果文件名以jpg结尾,且标注不在images_to_skip中
  27. if img_path.endswith('jpg') and img_path not in images_to_skip:
  28. # 增加开始和结束token
  29. caption = '<start> ' + caption.strip() + ' <end>'
  30. text_data.append(caption)
  31. if img_path in caption_mapping:
  32. # 追加
  33. caption_mapping[img_path].append(caption)
  34. else:
  35. # 初始化
  36. caption_mapping[img_path] = [caption]
  37. # 如果文件名在images_to_skip中,则将caption_mapping中的元素删除掉
  38. # 即这里可能有的caption不是5个
  39. for img_path in images_to_skip:
  40. if img_path in caption_mapping:
  41. del caption_mapping[img_path]
  42. return caption_mapping, text_data
  1. # 加载数据
  2. captions_mapping, text_data = load_captions_json('./ai_challenger_caption_validation_20170910/caption_validation_annotations_20170910.json')
  1. # 可见句子中词的平均长度为13 符合我们上方所设置的参数 如果不符合可以进行微调
  2. np.array(token_len).mean()
13.015133333333333

此时返回的:

captions_mapping则是一个字典,键为图片的相对路径,值是一个列表,里面是其5个caption。 text_data是一个列表,里面是全部的caption,和captions_mapping.values()结果应该是一样的。

2.2 设置训练集和测试集

  1. train_size=0.8
  2. # all_images列表里是所有图片的文件路径
  3. all_images = list(captions_mapping.keys())
  4. # 打乱顺序
  5. np.random.shuffle(all_images)
  6. # 获取训练集数量
  7. train_size = int(len(captions_mapping) * train_size)
  8. train_data = {
  9. img_name: captions_mapping[img_name] for img_name in all_images[:train_size]
  10. }
  11. valid_data = {
  12. img_name: captions_mapping[img_name] for img_name in all_images[train_size:]
  13. }
len(train_data),len(valid_data)
(23896, 5975)

2.3 文本向量化

  1. # 去除句子中的特殊符号
  2. strip_chars = '!\'#$%&'()*+,-./:;<=>?@[\]^_`{|}~'
  3. strip_chars = strip_chars.replace('<', '') # 因为我们句子中有'<start>' '<end>'
  4. strip_chars = strip_chars.replace('>', '')
  5. def custom_standardization(input_string):
  6. # 全部转为小写
  7. lowercase = tf.strings.lower(input_string)
  8. return tf.strings.regex_replace(lowercase, '[%s]' % re.escape(strip_chars), '')
  9. vectorization = TextVectorization(
  10. max_tokens=VOCAB_SIZE, # 词汇量大小 最上方设置10000
  11. output_mode='int',
  12. output_sequence_length=SEQ_LENGTH, # 输出句子长度 最上方设置25
  13. standardize=custom_standardization,
  14. )
  15. vectorization.adapt(text_data)
  1. # 查看所有词汇
  2. vectorization.get_vocabulary()

上方则是出现过的所有词,按照频率排序,索引0为空,1为未登入进来的词。

假如我们自己随便写句话:一个男人在打游戏

可以看到 '一个 男人 在 打 游戏' 这五个词的索引分别是 5,8,6,687 其中0代表补白(当然也可以看作索引0的空),如果句子短不到25就用0补充,超过25了就截断。

解码操作,vocab[向量] 如 vocab[[1,2,3]] 会得根据索引到相应的话

  1. vocab = np.array(vectorization.get_vocabulary())
  2. vocab[[5,8,6,67,687]]
array(['一个', '男人', '在', '打', '游戏'], dtype='<U7')
vocab[[7853,6,967,1]]
array(['我', '在', '准备', '[UNK]'], dtype='<U7')

2.4 制作数据集

这一步 我们要把train_data和valid_data这两个字典中的图片进行压缩resize 生成准备使用的数据集格式

  • tf.data.Dataset.from_tensor_slices() 该函数的作用是接收tensor,对tensor的第一维度进行切分,并返回一个表示该tensor的切片数据集
  1. def decode_and_resize(img_path):
  2. # 读取图片,并缩放
  3. img = tf.io.read_file(img_path)
  4. img = tf.image.decode_jpeg(img, channels=3)
  5. img = tf.image.resize(img, IMAGE_SIZE)
  6. img = tf.image.convert_image_dtype(img, tf.float32)
  7. return img
  8. def process_input(img_path, captions):
  9. return decode_and_resize(img_path), vectorization(captions)
  10. def make_dataset(images, captions):
  11. dataset = tf.data.Dataset.from_tensor_slices((images, captions))
  12. dataset = dataset.shuffle(len(images))
  13. dataset = dataset.map(process_input, num_parallel_calls=AUTOTUNE)
  14. dataset = dataset.batch(BATCH_SIZE).prefetch(AUTOTUNE)
  15. return dataset
  16. # 制作数据集
  17. train_dataset = make_dataset(list(train_data.keys()), list(train_data.values()))
  18. valid_dataset = make_dataset(list(valid_data.keys()), list(valid_data.values()))

接下来我们从train_dataset中拿出数据来看一下到底是什么,因为它是一个可迭代对象,一批有64个(batch个),一共23896/64=374批只看一个就可以,所以记得加上break。

可以看到输入图片的大小是 299x299x3 后方词向量caption有5个,每个长25

  1. for i in train_dataset:
  2. print(i[0].shape)
  3. print(i[1].shape)
  4. break
(64, 299, 299, 3)
(64, 5, 25)
  1. for i in train_dataset:
  2. # 获取图片
  3. img = i[0][0].numpy().astype('int')
  4. # 获取标注(词向量)
  5. caption = i[1][0].numpy()
  6. # 显示
  7. plt.imshow(img)
  8. # 解码
  9. print(vocab[caption])
  10. break

2.5 数据增强

增的太强了会抑制过拟合 但会降低准确率(当然了0.0) 

  1. # 数据增强
  2. image_augmentation = keras.Sequential(
  3. [
  4. layers.experimental.preprocessing.RandomFlip('horizontal'),
  5. layers.experimental.preprocessing.RandomRotation(0.2),
  6. layers.experimental.preprocessing.RandomContrast(0.3),
  7. ]
  8. )

三、构建模型

3.1 构建CNN提取图片特征

方便起见,这里选择使用 EfficientNet 和 迁移学习 的方式来完成

其中 EfficientNet由16个移动翻转瓶颈卷积模块,2个卷积层,1个全局平均池化层和1个分类层构成。

  1. def get_cnn_model():
  2. # CNN模型
  3. base_model = efficientnet.EfficientNetB0(
  4. input_shape=(*IMAGE_SIZE, 3), include_top=False, weights='imagenet',
  5. )
  6. # 冻住特征提取层
  7. base_model.trainable = False
  8. base_model_out = base_model.output
  9. # 我们要修改输出层,(n,100,1280)
  10. base_model_out = layers.Reshape((-1, base_model_out.shape[-1]))(base_model_out)
  11. cnn_model = keras.models.Model(base_model.input, base_model_out)
  12. return cnn_model
  1. cnn_model = get_cnn_model()
  2. cnn_model.summary()

......

reshape (Reshape)               (None, 100, 1280)    0           top_activation[0][0]             
==================================================================================================
Total params: 4,049,571
Trainable params: 0
Non-trainable params: 4,049,571
__________________________________________________________________________________________________

这样 当我们每输入一批/一个batch个数据时,就会输出一批/一个batch个数据:

即输入 64x299x299x3的图片 输出64个图片的特征,维度是100x1280

测试一下CNN

  1. # 模拟图片测试一下
  2. cnn_test_input = tf.random.normal([64, 299,299,3]) # 随机正态分布64张299x299x3的图片
  3. # 输入网络
  4. cnn_test_output = cnn_model(cnn_test_input, training=False)
  5. cnn_test_output.shape
TensorShape([64, 100, 1280])

3.2 构建编码器transformer encoder

  1. class TransformerEncoderBlock(layers.Layer):
  2. def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
  3. super().__init__()
  4. self.embed_dim = embed_dim
  5. self.dense_dim = dense_dim
  6. self.num_heads = num_heads
  7. self.attention_1 = layers.MultiHeadAttention( # multi head attention
  8. num_heads=num_heads, key_dim=embed_dim, dropout=0.0
  9. ) # 头的数量 输出维度的大小 dropout
  10. self.layernorm_1 = layers.LayerNormalization()
  11. self.layernorm_2 = layers.LayerNormalization()
  12. self.dense_1 = layers.Dense(embed_dim, activation='relu')
  13. def call(self, inputs, training, mask=None):
  14. # layer norm
  15. inputs = self.layernorm_1(inputs)
  16. inputs = self.dense_1(inputs)
  17. # 传入 q k v
  18. attention_output_1 = self.attention_1(
  19. query=inputs,
  20. value=inputs,
  21. key=inputs,
  22. attention_mask=None,
  23. training=training, # training:布尔值,表示推理还是训练(是否使用 dropout)
  24. )
  25. # residual然后再layer norm
  26. out_1 = self.layernorm_2(inputs + attention_output_1) # 残差链接
  27. return out_1

测试一下transformer encoder

  1. # 测试一下 我们把CNN的特征值给它
  2. encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1)
  3. # 输入网络
  4. encoder_test_output = encoder(cnn_test_output, training=False)
  5. encoder_test_output.shape
TensorShape([64, 100, 512])

3.3 位置编码Positional Embedding

这一步做的是: 每张图片对应5个词向量,我们选出一个去掉最后的''(此时长为24了),之前比如词向量是[1,6,4,2,0,0,...,0] 现在对其进行升维,比如用一个二维(该项目升维512)坐标分别表示1,6,4,...,

如: 1->[0.1, 4.2] 6->[4.1, 2.0] ...

  1. class PositionalEmbedding(layers.Layer):
  2. # 位置编码
  3. def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
  4. super().__init__()
  5. '''
  6. embedding用法:https://stats./questions/270546/how-does-keras-embedding-layer-work
  7. input_dim:词汇数量;output_dim:特征向量大小
  8. '''
  9. # token embedding:长度为vocab_size,特征向量为:embed_dim
  10. self.token_embeddings = layers.Embedding(
  11. input_dim=vocab_size, output_dim=embed_dim
  12. )
  13. # position_embeddings:
  14. self.position_embeddings = layers.Embedding(
  15. input_dim=sequence_length, output_dim=embed_dim
  16. )
  17. self.sequence_length = sequence_length
  18. self.vocab_size = vocab_size
  19. self.embed_dim = embed_dim
  20. # 512开根号:22.627416998:https://jalammar./illustrated-transformer/
  21. self.embed_scale = tf.math.sqrt(tf.cast(embed_dim, tf.float32))
  22. def call(self, inputs):
  23. # 获取caption长度,这里是24个(前24个单词,去掉<end>)
  24. length = tf.shape(inputs)[-1]
  25. # 生成0~length(即24)的数字
  26. positions = tf.range(start=0, limit=length, delta=1)
  27. # 输入的句子index转为embedding特征,大小:(N, 24, 512)
  28. embedded_tokens = self.token_embeddings(inputs)
  29. # 乘以22.62 上面开根号了 这里乘过去 反向传播好算
  30. embedded_tokens = embedded_tokens * self.embed_scale
  31. # 位置编码,大小:(24, 512)
  32. embedded_positions = self.position_embeddings(positions)
  33. # 加和 返回
  34. return embedded_tokens + embedded_positions

测试一下PositionalEmbedding

  1. # 测试模型
  2. test_embedding_model = PositionalEmbedding(embed_dim=EMBED_DIM, sequence_length=SEQ_LENGTH, vocab_size=VOCAB_SIZE)
  3. # 测试输入,选择一个batch中的第一个句子(一共有5个)
  4. for i in train_dataset:
  5. # 获取测试标签中的一个的前24个词 大小(64, 24)
  6. caption = i[1][:,0,:-1]
  7. print(caption.shape)
  8. # 传入模型
  9. positional_output = test_embedding_model(caption)
  10. # 打印结果的大小
  11. print(positional_output.shape)
  12. break
(64, 24)
(64, 24, 512)

强制教学,原来是64x24x1 现在是(64, 24, 512) 简单来说就是词向量映射到高维了

长度为24的单词变为512的embedding向量

3.4 构建解码器transformer decoder

这里不懂的自己查一下吧 三言两语说不完

总之 这一步最后输出为VOCAB_SIZE(这里为1000)大小的向量,对应位置的大小为概率,可以查索引来获取相应原单词,比如变成(64, 24, 10000)

  1. class TransformerDecoderBlock(layers.Layer):
  2. def __init__(self, embed_dim, ff_dim, num_heads, **kwargs):
  3. super().__init__()
  4. self.embed_dim = embed_dim
  5. self.ff_dim = ff_dim
  6. self.num_heads = num_heads
  7. self.attention_1 = layers.MultiHeadAttention(
  8. num_heads=num_heads, key_dim=embed_dim, dropout=0.1
  9. )
  10. self.attention_2 = layers.MultiHeadAttention(
  11. num_heads=num_heads, key_dim=embed_dim, dropout=0.1
  12. )
  13. self.ffn_layer_1 = layers.Dense(ff_dim, activation='relu')
  14. self.ffn_layer_2 = layers.Dense(embed_dim)
  15. self.layernorm_1 = layers.LayerNormalization()
  16. self.layernorm_2 = layers.LayerNormalization()
  17. self.layernorm_3 = layers.LayerNormalization()
  18. # 位置编码
  19. self.embedding = PositionalEmbedding(
  20. embed_dim=EMBED_DIM, sequence_length=SEQ_LENGTH, vocab_size=VOCAB_SIZE
  21. )
  22. self.out = layers.Dense(VOCAB_SIZE, activation='softmax')
  23. self.dropout_1 = layers.Dropout(0.3)
  24. self.dropout_2 = layers.Dropout(0.5)
  25. self.supports_masking = True
  26. def call(self, inputs, encoder_outputs, training, mask=None):
  27. # 获取位置编码,(N,24) --> (N,24,512)
  28. inputs = self.embedding(inputs)
  29. '''
  30. causal_mask 的 shape:(64,24,24)
  31. 64个一模一样,大小为(24, 24)的mask
  32. '''
  33. causal_mask = self.get_causal_attention_mask(inputs)
  34. '''
  35. mask (64,24) --> padding_mask (64, 24, 1)
  36. padding_mask:64个大小为(24, 1)的mask
  37. [[1][1][1]...[0][0][0][0][0]]
  38. '''
  39. padding_mask = tf.cast(mask[:, :, tf.newaxis], dtype=tf.int32)
  40. '''
  41. mask (64,24) --> combined_mask (64, 1, 24)
  42. combined_mask:64个大小为(1, 24)的mask
  43. [[1 1 1 1 1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]]
  44. '''
  45. combined_mask = tf.cast(mask[:, tf.newaxis, :], dtype=tf.int32)
  46. '''
  47. 在combined_mask与causal_mask选择最小值,大小(64, 24, 24)
  48. 64个不再一模一样,大小为(24, 24)的mask
  49. '''
  50. combined_mask = tf.minimum(combined_mask, causal_mask)
  51. # 第一个masked self attention,QKV都是inputs, mask是causal mask,强制训练时只关注输出位置左侧的token,以便模型可以自回归地推断
  52. attention_output_1 = self.attention_1(
  53. query=inputs,
  54. value=inputs,
  55. key=inputs,
  56. attention_mask=combined_mask,
  57. training=training,
  58. )
  59. out_1 = self.layernorm_1(inputs + attention_output_1)
  60. # cross attention,其中K、V来自encoder,Q来自decoder前一个的attention输出,mask是padding mask,用来遮挡25个单词中补白的部分
  61. attention_output_2 = self.attention_2(
  62. query=out_1,
  63. value=encoder_outputs,
  64. key=encoder_outputs,
  65. attention_mask=padding_mask,
  66. training=training,
  67. )
  68. out_2 = self.layernorm_2(out_1 + attention_output_2)
  69. ffn_out = self.ffn_layer_1(out_2)
  70. ffn_out = self.dropout_1(ffn_out, training=training)
  71. ffn_out = self.ffn_layer_2(ffn_out)
  72. ffn_out = self.layernorm_3(ffn_out + out_2, training=training)
  73. ffn_out = self.dropout_2(ffn_out, training=training)
  74. # 最后输出为VOCAB_SIZE大小的向量,对应位置的大小为概率,可以查索引来获取相应原单词
  75. preds = self.out(ffn_out)
  76. return preds
  77. def get_causal_attention_mask(self, inputs):
  78. '''
  79. causal: 因果关系mask
  80. '''
  81. # (N,24,512)
  82. input_shape = tf.shape(inputs)
  83. # 分别为N,24
  84. batch_size, sequence_length = input_shape[0], input_shape[1]
  85. #范围0~24的列表,变成大小(24, 1)的数组
  86. i = tf.range(sequence_length)[:, tf.newaxis]
  87. #范围0~24的列表
  88. j = tf.range(sequence_length)
  89. mask = tf.cast(i >= j, dtype='int32')
  90. # 大小为(1, 24, 24)
  91. mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
  92. scale = tf.concat(
  93. [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
  94. axis=0,
  95. )
  96. # (1, 24, 24)铺成(64, 24, 24)
  97. result = tf.tile(mask, scale)
  98. return result

测试一下transformer decoder

  1. # 测试模型
  2. decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2)
  3. # decoder.summary()
  1. # 测试输入
  2. for i in train_dataset:
  3. # 前0~ -1(24)个单词(去尾)
  4. batch_seq_inp = i[1][:,0,:-1]
  5. # print(batch_seq_inp.shape)
  6. # 前1~ 个(24)个单词(掐头),用做ground truth标注
  7. batch_seq_true = i[1][:,0,1:]
  8. # print(batch_seq_true.shape)
  9. # 将batch_seq_true中的每一个元素和0作对比,返回类似[true,true,false]形式的mask,遇到0,则会变成false,0表示字符串中长度不够25的补白部分(padding)
  10. mask = tf.math.not_equal(batch_seq_true, 0)
  11. # print(mask.shape)
  12. # 输入decoder预测的序列
  13. batch_seq_pred = decoder(
  14. batch_seq_inp, encoder_test_output, training=False, mask=mask
  15. )
  16. print(batch_seq_pred.shape)
  17. break
(64, 24, 10000)

3.5 构建ImageCaption任务模型

这里调用到上方CNN、encoder和decoder  顺序为该项目的模型流程

  • 获取图片CNN特征--》
  • 传给encoder--》
  • 1.对于decoder先提供<start>--》
  • 2.传给decoder推理--》
  • 3.不断投喂给模型,直到遇到停止.
  • 4.如果循环次数超出句子长度,也停止.
  1. class ImageCaptioningModel(keras.Model):
  2. def __init__(
  3. self, cnn_model, encoder, decoder, num_captions_per_image=5, image_aug=None,
  4. ):
  5. super().__init__()
  6. self.cnn_model = cnn_model
  7. self.encoder = encoder
  8. self.decoder = decoder
  9. self.loss_tracker = keras.metrics.Mean(name='loss')
  10. self.acc_tracker = keras.metrics.Mean(name='accuracy')
  11. self.num_captions_per_image = num_captions_per_image
  12. self.image_aug = image_aug
  13. def calculate_loss(self, y_true, y_pred, mask):
  14. loss = self.loss(y_true, y_pred)
  15. mask = tf.cast(mask, dtype=loss.dtype)
  16. loss *= mask
  17. return tf.reduce_sum(loss) / tf.reduce_sum(mask)
  18. def calculate_accuracy(self, y_true, y_pred, mask):
  19. accuracy = tf.equal(y_true, tf.argmax(y_pred, axis=2))
  20. accuracy = tf.math.logical_and(mask, accuracy)
  21. accuracy = tf.cast(accuracy, dtype=tf.float32)
  22. mask = tf.cast(mask, dtype=tf.float32)
  23. return tf.reduce_sum(accuracy) / tf.reduce_sum(mask)
  24. def _compute_caption_loss_and_acc(self, img_embed, batch_seq, training=True):
  25. '''
  26. 计算loss
  27. '''
  28. # 图片的embedding特征输入encoder,得到新的seq,大小(N,100,512)
  29. encoder_out = self.encoder(img_embed, training=training)
  30. # batch_seq的shape:(64, 25)
  31. # 前24个单词(去尾)
  32. batch_seq_inp = batch_seq[:, :-1]
  33. # 后24个单词(掐头),用做ground truth标注
  34. batch_seq_true = batch_seq[:, 1:]
  35. # mask掩码,将batch_seq_true中的每一个元素和0作对比,返回类似[true,true,false]形式的mask,遇到0,则会变成false,0表示字符串中长度不够25的补白部分(padding)
  36. mask = tf.math.not_equal(batch_seq_true, 0)
  37. # 输入decoder预测的序列
  38. batch_seq_pred = self.decoder(
  39. batch_seq_inp, encoder_out, training=training, mask=mask
  40. )
  41. # 计算loss和acc
  42. loss = self.calculate_loss(batch_seq_true, batch_seq_pred, mask)
  43. acc = self.calculate_accuracy(batch_seq_true, batch_seq_pred, mask)
  44. return loss, acc
  45. def train_step(self, batch_data):
  46. '''
  47. 训练步骤
  48. '''
  49. # 获取图片和标注
  50. batch_img, batch_seq = batch_data
  51. # 初始化
  52. batch_loss = 0
  53. batch_acc = 0
  54. # 是否使用数据增强
  55. if self.image_aug:
  56. batch_img = self.image_aug(batch_img)
  57. # 获取图片embedding特征
  58. img_embed = self.cnn_model(batch_img)
  59. # 遍历5个文本标注
  60. for i in range(self.num_captions_per_image):
  61. with tf.GradientTape() as tape:
  62. # 计算loss和acc
  63. # batch_seq的shape:(64, 5, 25)
  64. loss, acc = self._compute_caption_loss_and_acc(
  65. img_embed, batch_seq[:, i, :], training=True
  66. )
  67. # 更新loss和acc
  68. batch_loss += loss
  69. batch_acc += acc
  70. # 获取所有可训练参数
  71. train_vars = (
  72. self.encoder.trainable_variables + self.decoder.trainable_variables
  73. )
  74. # 获取梯度
  75. grads = tape.gradient(loss, train_vars)
  76. # 更新参数
  77. self.optimizer.apply_gradients(zip(grads, train_vars))
  78. # 更新
  79. batch_acc /= float(self.num_captions_per_image)
  80. self.loss_tracker.update_state(batch_loss)
  81. self.acc_tracker.update_state(batch_acc)
  82. return {'loss': self.loss_tracker.result(), 'acc': self.acc_tracker.result()}
  83. def test_step(self, batch_data):
  84. batch_img, batch_seq = batch_data
  85. batch_loss = 0
  86. batch_acc = 0
  87. # 获取图片embedding特征
  88. img_embed = self.cnn_model(batch_img)
  89. # 遍历5个文本标注
  90. for i in range(self.num_captions_per_image):
  91. loss, acc = self._compute_caption_loss_and_acc(
  92. img_embed, batch_seq[:, i, :], training=False
  93. )
  94. batch_loss += loss
  95. batch_acc += acc
  96. batch_acc /= float(self.num_captions_per_image)
  97. self.loss_tracker.update_state(batch_loss)
  98. self.acc_tracker.update_state(batch_acc)
  99. return {'loss': self.loss_tracker.result(), 'acc': self.acc_tracker.result()}
  100. @property
  101. def metrics(self):
  102. return [self.loss_tracker, self.acc_tracker]

四、编译模型

4.1 模型实例化

  1. cnn_model = get_cnn_model()
  2. encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1)
  3. decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2)
  4. caption_model = ImageCaptioningModel(
  5. cnn_model=cnn_model, encoder=encoder, decoder=decoder, image_aug=image_augmentation,
  6. )

4.2 设置loss、早停参数

  1. # loss
  2. cross_entropy = keras.losses.SparseCategoricalCrossentropy(
  3. from_logits=False, reduction='none'
  4. )
  5. # 提前终止
  6. early_stopping = keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)
  7. class LRSchedule(keras.optimizers.schedules.LearningRateSchedule):
  8. def __init__(self, post_warmup_learning_rate, warmup_steps):
  9. super().__init__()
  10. self.post_warmup_learning_rate = post_warmup_learning_rate
  11. self.warmup_steps = warmup_steps
  12. def __call__(self, step):
  13. global_step = tf.cast(step, tf.float32)
  14. warmup_steps = tf.cast(self.warmup_steps, tf.float32)
  15. warmup_progress = global_step / warmup_steps
  16. warmup_learning_rate = self.post_warmup_learning_rate * warmup_progress
  17. return tf.cond(
  18. global_step < warmup_steps,
  19. lambda: warmup_learning_rate,
  20. lambda: self.post_warmup_learning_rate,
  21. )
  22. # LR调节
  23. num_train_steps = len(train_dataset) * EPOCHS
  24. num_warmup_steps = num_train_steps // 15
  25. lr_schedule = LRSchedule(post_warmup_learning_rate=1e-4, warmup_steps=num_warmup_steps)

4.3 编译并训练

  1. # 编译
  2. caption_model.compile(optimizer=keras.optimizers.Adam(lr_schedule), loss=cross_entropy)
  3. # 训练
  4. caption_model.fit(
  5. train_dataset,
  6. epochs=EPOCHS,
  7. validation_data=valid_dataset,
  8. callbacks=[early_stopping],
  9. )

4.4 保存权重

caption_model.save_weights('./my_model/checkpoint')

4.5 加载权重进行测试

load_status = caption_model.load_weights('./my_model/checkpoint')
  1. vocab = vectorization.get_vocabulary()
  2. index_lookup = dict(zip(range(len(vocab)), vocab))
  3. max_decoded_sentence_length = SEQ_LENGTH - 1
  4. valid_images = list(valid_data.keys())
  5. valid_caption = list(valid_data.values())
  6. valid_len = len(valid_images)
  • 获取图片CNN特征--》
  • 传给encoder--》
  • 1.对于decoder先提供<start>--》
  • 2.传给decoder推理--》
  • 3.不断投喂给模型,直到遇到停止.
  • 4.如果循环次数超出句子长度,也停止.
  1. def generate_caption():
  2. # 在测试集中随机取一张图片
  3. random_index = random.randrange(0,valid_len)
  4. sample_img = valid_images[random_index]
  5. sample_caption = valid_caption[random_index][0]
  6. # 读取图片
  7. sample_img = decode_and_resize(sample_img)
  8. img_show = sample_img.numpy().clip(0, 255).astype(np.uint8)
  9. plt.imshow(img_show)
  10. plt.axis('off')
  11. plt.show()
  12. # 保存
  13. cv2.imwrite('./img/raw.jpg',cv2.cvtColor(img_show,cv2.COLOR_RGB2BGR))
  14. # 获取CNN特征
  15. img = tf.expand_dims(sample_img, 0)
  16. img = caption_model.cnn_model(img)
  17. # 传给encoder
  18. encoded_img = caption_model.encoder(img, training=False)
  19. # 1.先提供'<start> '
  20. # 2.传给decoder推理,
  21. # 3.不断投喂给模型,直到遇到<end>停止
  22. # 4.如果循环次数超出句子长度,也停止
  23. decoded_caption = '<start> '
  24. for i in range(max_decoded_sentence_length): # 24
  25. tokenized_caption = vectorization([decoded_caption])[:, :-1]
  26. mask = tf.math.not_equal(tokenized_caption, 0)
  27. # 预测
  28. predictions = caption_model.decoder(
  29. tokenized_caption, encoded_img, training=False, mask=mask
  30. )
  31. sampled_token_index = np.argmax(predictions[0, i, :])
  32. sampled_token = index_lookup[sampled_token_index]
  33. if sampled_token == ' <end>':
  34. break
  35. decoded_caption += ' ' + sampled_token
  36. decoded_caption = decoded_caption.replace('<start> ', '')
  37. decoded_caption = decoded_caption.replace(' <end>', '').strip()
  38. sample_caption = sample_caption.replace('<start> ', '')
  39. sample_caption = sample_caption.replace(' <end>', '').strip()
  40. print('预测: ', decoded_caption)
  41. print('真实:',sample_caption)
generate_caption()

generate_caption()

4.6 测试自己的图片

  1. def predict_imgs(path):
  2. input_img = decode_and_resize(path).numpy().clip(0, 255).astype(np.uint8)
  3. plt.imshow(input_img)
  4. plt.axis('off')
  5. plt.show()
  6. # 获取CNN特征
  7. img = tf.expand_dims(input_img, 0)
  8. img = caption_model.cnn_model(img)
  9. # 传给encoder
  10. encoded_img = caption_model.encoder(img, training=False)
  11. # 1.先提供'<start> '
  12. # 2.传给decoder推理,
  13. # 3.不断投喂给模型,直到遇到<end>停止
  14. # 4.如果循环次数超出句子长度,也停止
  15. decoded_caption = '<start> '
  16. for i in range(max_decoded_sentence_length): # 24
  17. tokenized_caption = vectorization([decoded_caption])[:, :-1]
  18. mask = tf.math.not_equal(tokenized_caption, 0)
  19. # 预测
  20. predictions = caption_model.decoder(
  21. tokenized_caption, encoded_img, training=False, mask=mask
  22. )
  23. sampled_token_index = np.argmax(predictions[0, i, :])
  24. sampled_token = index_lookup[sampled_token_index]
  25. if sampled_token == ' <end>':
  26. break
  27. decoded_caption += ' ' + sampled_token
  28. decoded_caption = decoded_caption.replace('<start> ', '')
  29. decoded_caption = decoded_caption.replace(' <end>', '').strip()
  30. print('预测: ', decoded_caption)
  1. path = './a01.jpg'
  2. predict_imgs(path)

主要原因是我们的数据集见识比较少,导致预测错误。 

  1. path = './a02.jpg'
  2. predict_imgs(path)

这次模型将功补过,成功悟出了一代巨星的动作! 

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多