Featured image of post 大模型基础知识补习&GPT2代码分析

大模型基础知识补习&GPT2代码分析

实操一下GPT2的代码,深刻理解transformer模型的结构

大模型基础知识

代码来源:https://github.com/openai/gpt-2

GPT2源代码提供了以下几个文件:

encoder.py

generate_unconditional_samples.py

interactive_conditional_samples.py

model.py

sample.py

model

1
2
3
4
5
6
7
8
def default_hparams():
    return HParams(
        n_vocab=0,        # 词汇表大小(通常是词汇表中包含的词语或符号数量)
        n_ctx=1024,       # 输入序列的最大长度(例如,一个输入文本序列的最大 token 数量)
        n_embd=768,       # 嵌入层的维度大小(每个词或符号在嵌入空间中的表示长度)
        n_head=12,        # 多头自注意力机制中的头数(即注意力头的数量)
        n_layer=12,       # Transformer 模型中的层数(即堆叠的 transformer 层数)
    )

词汇表

词汇表(Vocabulary)是自然语言处理(NLP)模型中用于将语言转换为数字表示的一组词或符号的集合。在深度学习模型中,词汇表用于将文本数据中的每个单词或符号映射为一个唯一的数字(通常是一个整数),以便计算机能够理解和处理。

词汇表的作用:

  1. 映射文本到数字:由于机器学习模型无法直接处理文本数据,必须将文本中的每个词或符号转换为数字。词汇表就是这个映射的核心工具。例如,对于句子“我爱自然语言处理”,我们需要将“我”、“爱”、“自然”、“语言”、“处理”这些词转化为模型可以理解的数字。
  2. 限制模型输入的规模:在实际操作中,词汇表的大小有限。通常,我们不会为每个可能的字词创建一个索引,而是根据训练数据中出现的词汇频率来限制词汇表的大小。这样,词汇表的规模可以控制模型的输入维度,避免过大的计算和内存开销。
  3. 处理未知词汇:词汇表通常会为所有未出现在训练数据中的词汇或符号保留一个特殊的“未知词”标记(如 [UNK])。当模型遇到这些未见过的词时,会用这个特殊标记来代替。

词汇表的组成:

  1. :如果你使用的是基于单词的词汇表,那么词汇表中的元素就是单词。例如,假设你有一个很小的词汇表:

    1
    2
    3
    4
    5
    
    css
    
    
    复制代码
    ['我', '爱', '自然', '语言', '处理']
    
  2. 子词(Subword):对于一些较为复杂的语言模型,尤其是像 GPT-2 和 BERT 这样的模型,它们通常使用 子词 作为词汇表的基本单元。子词是一个比单词更小的语言单位,它可以是词的一部分或者是一个常见的词根、前缀、后缀等。例如,词汇表中可能包含 ##ingun 等子词单元,而不是整个单词 runningundo。这有助于模型更好地处理未见过的词汇,并减少词汇表的大小。

  3. 特殊标记:除了实际的词或子词,词汇表通常还会包含一些特殊标记:

    • [PAD]:填充标记,用于将输入文本填充到相同的长度。
    • [UNK]:未知标记,用于处理那些在训练集中没有出现过的词。
    • [CLS]:分类标记,通常用于标记输入的开始(特别是在 BERT 等模型中)。
    • [SEP]:分隔标记,通常用于分隔不同句子或文本段落(在 BERT 等模型中也有使用)。

向量空间(Vector Space)

向量空间是一个数学概念,指的是所有向量的集合,可以在其中进行加法和标量乘法等操作。在机器学习和自然语言处理中,我们通常将每个词、子词或符号表示为一个高维空间中的向量。例如,一个简单的二维向量空间可能包含了这样的一组向量:

  • (1, 0)
  • (0, 1)
  • (1, 1)

在 NLP 中,这些向量并没有具体的几何含义,而是用来表示不同的语义特征。每个向量的元素(即各维度的值)代表了某种抽象的语义属性。

词向量空间意味着每个词都被映射为一个向量,且这些向量之间有一定的关系。例如,“king” 和 “queen” 可能在向量空间中非常接近,因为它们的语义相似,尽管它们的拼写不同。嵌入的目标就是将语言中的词语、句子或其他语言单位映射到这样一个向量空间中。

嵌入(Embedding)

**嵌入(Embedding)**是将一个离散的词汇项(如单词、子词)转换为连续的向量表示的过程。这些向量通常具有固定的维度(例如 300 维、768 维等),并且能够捕捉该词的语义信息。

换句话说,嵌入是对词汇的数字表示,它通过将每个词或符号映射到一个向量空间中,使得模型能够理解和处理语言中的复杂结构。

例如,假设你有以下三个词汇:

  • “猫”(cat)
  • “狗”(dog)
  • “汽车”(car)

通过嵌入,这些词语会被映射为固定长度的向量,例如:

  • “猫” → [0.45, 0.87, 0.34, ...]
  • “狗” → [0.42, 0.89, 0.31, ...]
  • “汽车” → [0.21, 0.08, 0.76, ...]

这些向量的维度(例如 300 维或 768 维)反映了嵌入空间的复杂度。模型通过训练来优化这些向量,使得语义相似的词在嵌入空间中距离更近,语义不同的词距离较远。

向量(Vector)

向量是在数学和计算机科学中用来表示方向和大小的对象。每个词或符号被嵌入为一个向量,这个向量通常是一个浮动的数字数组。向量的每个元素都表示一个特定的特征维度。

在 NLP 中,词的向量通常有以下几个特点:

  • 稠密表示:每个词的向量是一个低维的稠密向量,不像传统的“one-hot”编码(仅使用0和1来表示词汇)。稠密向量包含了丰富的语义信息。
  • 语义关系:通过训练,词向量可以捕捉到词汇之间的语义关系。例如,“king” 和 “queen” 可能有相似的向量,因为它们共享许多语义特征。

例如,假设我们训练了一个模型,得到以下的词向量:

  • “苹果” → [0.5, -0.2, 0.1, 0.7]
  • “香蕉” → [0.4, -0.1, 0.2, 0.6]

这些向量的数值表示了每个词的特征和语义信息。在训练过程中,模型会通过调整这些向量,使得语义相似的词的向量在空间中彼此接近。

为什么要使用嵌入?

  1. 捕捉语义相似性:通过嵌入,模型可以理解哪些词是相似的,哪些是不同的。比如“猫”和“狗”虽然拼写不同,但它们都是“动物”,因此它们的向量会很接近。
  2. 降维:将词语从“one-hot”编码的高维空间(例如,假设有 10,000 个不同的词,one-hot 向量就是 10,000 维的)映射到一个低维空间(如 300 维)。这样可以减少计算的复杂度和内存消耗。
  3. 增强模型理解:通过将词转换为向量,模型可以更加有效地理解文本中的语义、句法关系和上下文信息。这对于复杂的任务(如文本分类、情感分析、翻译等)非常重要。
1
2
3
4
5
def shape_list(x):
    """Deal with dynamic shape in tensorflow cleanly."""
    static = x.shape.as_list()  # 获取静态形状
    dynamic = tf.shape(x)  # 获取动态形状
    return [dynamic[i] if s is None else s for i, s in enumerate(static)]  # 返回合并后的形状
  1. static = x.shape.as_list()

    • x.shape 是 TensorFlow 张量 x 的静态形状。
    • as_list() 将这个形状从 TensorShape 对象转换为 Python 列表。静态形状的值如果是已知的,会直接作为数值存在。如果某个维度的值是 None(表示该维度在静态形状中不确定),那么会是 None
    • 例如,如果 x 的形状是 [None, 32, 32, 3],则 static 会是 [None, 32, 32, 3]
  2. dynamic = tf.shape(x)

    • tf.shape(x) 返回张量 x 的动态形状,它是一个 TensorFlow 张量,表示运行时该张量的形状。与静态形状不同,动态形状会根据实际运行时的输入数据进行计算。
    • 返回值是一个形状为 [d0, d1, ..., dn] 的张量(d0d1 等是张量的各个维度的动态大小)。
  3. return [dynamic[i] if s is None else s for i, s in enumerate(static)]

    • 这个列表推导式的目的是将静态形状和动态形状合并成一个形状列表。如果某个静态形状的维度是 None(即在静态形状中未指定),那么就使用对应的动态形状。
    • enumerate(static) 会遍历静态形状中的每个维度,如果该维度是 None,就从动态形状中取对应的维度值;如果该维度不是 None,则保留静态形状中的值。

    例如:

    • 假设 static = [None, 32, 32, 3](第一维度 None,表示它是动态的)和 dynamic = [64, 32, 32, 3](运行时该张量的实际形状是 [64, 32, 32, 3])。
    • 那么,函数的返回值将是 [64, 32, 32, 3]
  4. 为什么需要这个函数?

在 TensorFlow 中,许多操作要求处理的张量形状是已知的(静态形状),但有些情况下形状在执行时才会动态确定,特别是在处理输入数据的批量大小、图像尺寸等变量时。shape_list 函数就是用来统一处理这两种情况的,使得无论是在静态还是动态形状下,代码都能够正常运行而不会出错。

  1. 举个例子:

假设我们有一个张量 x,它的形状是 [None, 64, 64, 3](批量大小未知):

1
2
3
4
5
6
import tensorflow as tf

x = tf.placeholder(tf.float32, shape=[None, 64, 64, 3])  # 动态批量大小
shape = shape_list(x)

print(shape)  # 输出形状

在这个例子中,如果你传递一个大小为 [32, 64, 64, 3] 的输入,shape_list 函数会返回 [32, 64, 64, 3],即将 None 替换为实际的批量大小 32

1
2
3
4
def softmax(x, axis=-1): 
    x = x - tf.reduce_max(x, axis=axis, keepdims=True)
    ex = tf.exp(x)
    return ex / tf.reduce_sum(ex, axis=axis, keepdims=True)

这个函数实现了 Softmax 操作,将输入 x 转换为概率分布,常用于多分类模型的输出层。通过减去最大值避免指数计算中的数值溢出,确保计算稳定性。

1
2
def gelu(x):
    return 0.5*x*(1+tf.tanh(np.sqrt(2/np.pi)*(x+0.044715*tf.pow(x, 3))))

GELU 是一种平滑的、概率性的激活函数,能够更自然地通过高斯分布近似非线性激活。相比于 ReLU,GELU 在训练时通常能更好地保留梯度,并且能够减少死神经元的问题。

代码中使用了 tf.tanhtf.pow(TensorFlow 的函数)与 NumPy 的常量进行计算,确保了在 TensorFlow 中的高效计算。

Layer Normalization 是一种在深度学习中常用的技术,旨在对每个样本的特征进行归一化,使得特征的均值为0,方差为1,从而加速训练并提高模型的性能。

代码解释:

1
2
3
4
5
6
def norm(x, scope, *, axis=-1, epsilon=1e-5):
    """Normalize to mean = 0, std = 1, then do a diagonal affine transform."""
    with tf.variable_scope(scope):
        n_state = x.shape[-1].value
        g = tf.get_variable('g', [n_state], initializer=tf.constant_initializer(1))
        b = tf.get_variable('b', [n_state], initializer=tf.constant_initializer(0))
  1. x: 输入张量,通常是经过激活函数后的某一层的输出。
  2. scope: 变量作用域,用于创建和管理变量。
  3. axis=-1: 归一化时的维度,通常在 Layer Normalization 中是最后一维(即特征维度)。axis=-1 表示归一化最后一维,通常是一个样本的所有特征。
  4. epsilon=1e-5: 为了避免在除法中出现除零错误,epsilon 是一个很小的常数(默认值为 1e−51e-51e−5)。
  5. n_state = x.shape[-1].value: 获取输入张量 x 在最后一维的大小,代表特征数目。

接下来,定义了两个变量 gb,分别用于对归一化后的结果进行线性变换:

  • g 是缩放因子(通常称为 Gamma)。
  • b 是平移因子(通常称为 Beta)。
1
2
       g = tf.get_variable('g', [n_state], initializer=tf.constant_initializer(1))
        b = tf.get_variable('b', [n_state], initializer=tf.constant_initializer(0))
  • gb 都是与输入 x 的最后一维特征数目相同的向量。g 初始化为 1,b 初始化为 0。

归一化步骤:

1
2
        u = tf.reduce_mean(x, axis=axis, keepdims=True)
        s = tf.reduce_mean(tf.square(x - u), axis=axis, keepdims=True)
  • u = tf.reduce_mean(x, axis=axis, keepdims=True): 计算输入 x 在指定 axis 维度上的均值(平均值)。这里,axis 默认为 -1,即沿着最后一维计算均值。
  • s = tf.reduce_mean(tf.square(x - u), axis=axis, keepdims=True): 计算输入 x 与均值 u 的差的平方的平均值,即方差。keepdims=True 保证输出维度与输入相同,便于后续操作。
1
        x = (x - u) * tf.rsqrt(s + epsilon)
  • x = (x - u) * tf.rsqrt(s + epsilon)

    对输入

    1
    
    x
    

    进行标准化(即归一化处理),使得均值为 0,方差为 1。

    • x - u:减去均值,使得每个元素的均值为 0。
    • tf.rsqrt(s + epsilon):计算标准差的倒数并进行缩放,rsqrt1 / sqrt(x)epsilon 用于防止除 0 错误。
1
2
        x = x * g + b
        return x
  • x = x \* g + b: 最后,对标准化后的 x 进行线性变换,使用参数 g(缩放因子)和 b(平移因子)。这种变换使得归一化的结果可以恢复一定的表达能力,类似于 Batch Normalization 中的 affine 变换。
  • return x: 返回归一化并经过变换的结果。
1
2
3
4
def split_states(x, n):
    """Reshape the last dimension of x into [n, x.shape[-1]/n]."""
    *start, m = shape_list(x)
    return tf.reshape(x, start + [n, m//n])

这个函数 split_states 的作用是将输入张量 x 的最后一维拆分成两个部分,其中一个部分的大小是 n,另一个部分的大小是 m // nm 是输入张量的最后一维大小)。

\*start, m = shape_list(x)

  • shape_list(x) 会返回张量 x 的形状(作为一个列表)。*start 会将除了最后一维之外的所有维度(即张量的前几维)存入 start 变量中,m 则保存张量最后一维的大小。
  • 例如,如果 x 的形状是 [batch_size, time_steps, features],那么 start 将保存 [batch_size, time_steps]m 将保存 features

tf.reshape(x, start + [n, m // n])

  • startx 的前几个维度的列表,n 是传入的参数,表示你想将最后一维拆分成 n 部分,m // n 是每一部分的大小。
  • tf.reshape 函数将输入张量 x 重新调整形状,形状变为 [start, n, m // n]
  • 例如,如果 x 的形状是 [batch_size, time_steps, features],并且传入 n = 2,那么新的形状将是 [batch_size, time_steps, 2, features // 2]
1
2
3
4
def merge_states(x):
    """Smash the last two dimensions of x into a single dimension."""
    *start, a, b = shape_list(x)
    return tf.reshape(x, start + [a*b])

这个函数 merge_states 的作用是将输入张量 x 的最后两个维度合并成一个维度。也就是说,它通过将最后两个维度相乘,将它们“压扁”为一个新的维度。

\*start, a, b = shape_list(x):

  • shape_list(x) 返回张量 x 的形状,作为一个列表。
  • *start 会将 x 的所有维度(除了最后两个维度)存入 startab 分别保存 x 的倒数第二维和最后一维的大小。

举个例子:

  • 假设 x 的形状是 [batch_size, time_steps, features, channels],那么 start = [batch_size, time_steps, features]a = featuresb = channels

tf.reshape(x, start + [a \* b]):

  • tf.reshape(x, start + [a * b]) 会将 x 重新调整形状,新的形状由 start(原先的所有维度,除了最后两个)和 a * b(将最后两个维度相乘的结果)组成。
  • a * b 是将原本的最后两个维度合并成一个新的维度。

这个 conv1d 函数实现了一个 1D 卷积 操作。虽然它的名字和传统的 1D 卷积类似,但它实际上通过矩阵乘法和适当的形状变换来模拟 1D 卷积的操作。以下是代码逐行解释:

代码解释:

1
2
3
4
5
6
7
def conv1d(x, scope, nf, *, w_init_stdev=0.02):
    with tf.variable_scope(scope):
        *start, nx = shape_list(x)
        w = tf.get_variable('w', [1, nx, nf], initializer=tf.random_normal_initializer(stddev=w_init_stdev))
        b = tf.get_variable('b', [nf], initializer=tf.constant_initializer(0))
        c = tf.reshape(tf.matmul(tf.reshape(x, [-1, nx]), tf.reshape(w, [-1, nf])) + b, start + [nf])
        return c
  1. \*start, nx = shape_list(x):
    • shape_list(x) 返回张量 x 的形状,作为一个列表。*start 会将除了最后一维之外的所有维度(即张量的前几维)存入 startnx 保存 x 的最后一维大小。
    • 比如,如果 x 的形状是 [batch_size, length, in_channels],那么 start 将是 [batch_size, length]nx 就是 in_channels
  2. w = tf.get_variable('w', [1, nx, nf], initializer=tf.random_normal_initializer(stddev=w_init_stdev)):
    • 创建一个卷积核(权重) w,其形状为 [1, nx, nf]。这里的 1 是卷积核的大小(即卷积操作是 1D 的),nx 是输入的最后一维大小(通常是输入通道数),nf 是卷积操作后输出通道的数量。
    • initializer=tf.random_normal_initializer(stddev=w_init_stdev) 初始化权重值为从正态分布中采样,标准差为 w_init_stdev,默认为 0.02
  3. b = tf.get_variable('b', [nf], initializer=tf.constant_initializer(0)):
    • 创建偏置 b,其形状为 [nf],即与输出通道数相同,初始化为零。
  4. c = tf.reshape(tf.matmul(tf.reshape(x, [-1, nx]), tf.reshape(w, [-1, nf])) + b, start + [nf]):
    • tf.reshape(x, [-1, nx]): 将输入张量 x 重塑为形状 [-1, nx],即将所有前面的维度展平,保留最后一维 nx。这一步相当于将输入的多维张量展平成二维矩阵,每一行代表一个数据样本的输入通道。
    • tf.reshape(w, [-1, nf]): 将卷积核 w 重塑为形状 [-1, nf],这意味着卷积核的权重被展平为一个矩阵,其中每个输入通道的权重被分配到输出通道。
    • tf.matmul(...): 执行矩阵乘法。此处是将展平后的输入张量 x 与展平后的卷积核 w 相乘,类似于卷积操作中的加权求和过程。
    • + b: 将偏置加到结果中。
    • tf.reshape(..., start + [nf]): 将矩阵乘法后的结果再重塑回原来的形状,保留除了最后一维以外的所有维度,最后一维变为 nf,即输出通道数。
  5. return c:
    • 返回卷积操作后的结果 c,它的形状是 [batch_size, length, nf],即输出的批次大小、长度和输出通道数。

总结:

这个 conv1d 函数实现的是一种通过矩阵乘法模拟 1D 卷积操作的方式。传统的 1D 卷积通过滑动窗口对输入进行局部加权求和,而这个实现通过将输入展平成矩阵后与卷积核进行矩阵乘法,从而实现类似的效果。

主要特点:

  • 权重形状[1, nx, nf],表示卷积核的大小为 1xnx,即卷积操作是对输入的每个位置进行加权求和,输出通道数为 nf
  • 矩阵乘法实现卷积:通过 tf.matmul 将输入张量和卷积核做矩阵乘法,而不是传统的滑动窗口卷积。
  • 展平操作:输入张量和卷积核都通过 tf.reshape 进行了展平处理,以便进行矩阵乘法。

这种方法虽然在实现上与传统的卷积不同,但效果是相同的,并且可以通过矩阵乘法的方式提高计算效率或适应某些特定的任务需求。

这段代码定义了一个函数 attention_mask,用于生成注意力掩码(attention mask)

1
2
3
4
5
6
7
8
9
def attention_mask(nd, ns, *, dtype):
    """1's in the lower triangle, counting from the lower right corner.

    Same as tf.matrix_band_part(tf.ones([nd, ns]), -1, ns-nd), but doesn't produce garbage on TPUs.
    """
    i = tf.range(nd)[:,None]
    j = tf.range(ns)
    m = i >= j - ns + nd
    return tf.cast(m, dtype)

这个函数实现了一个多头注意力(Multi-head Attention)机制:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def attn(x, scope, n_state, *, past, hparams):
    assert x.shape.ndims == 3  # Should be [batch, sequence, features]
    assert n_state % hparams.n_head == 0
    if past is not None:
        assert past.shape.ndims == 5  # Should be [batch, 2, heads, sequence, features], where 2 is [k, v]

    def split_heads(x):
        # From [batch, sequence, features] to [batch, heads, sequence, features]
        return tf.transpose(split_states(x, hparams.n_head), [0, 2, 1, 3])

    def merge_heads(x):
        # Reverse of split_heads
        return merge_states(tf.transpose(x, [0, 2, 1, 3]))

    def mask_attn_weights(w):
        # w has shape [batch, heads, dst_sequence, src_sequence], where information flows from src to dst.
        _, _, nd, ns = shape_list(w)
        b = attention_mask(nd, ns, dtype=w.dtype)
        b = tf.reshape(b, [1, 1, nd, ns])
        w = w*b - tf.cast(1e10, w.dtype)*(1-b)
        return w

    def multihead_attn(q, k, v):
        # q, k, v have shape [batch, heads, sequence, features]
        w = tf.matmul(q, k, transpose_b=True)
        w = w * tf.rsqrt(tf.cast(v.shape[-1].value, w.dtype))

        w = mask_attn_weights(w)
        w = softmax(w)
        a = tf.matmul(w, v)
        return a

    with tf.variable_scope(scope):
        c = conv1d(x, 'c_attn', n_state*3)
        q, k, v = map(split_heads, tf.split(c, 3, axis=2))
        present = tf.stack([k, v], axis=1)
        if past is not None:
            pk, pv = tf.unstack(past, axis=1)
            k = tf.concat([pk, k], axis=-2)
            v = tf.concat([pv, v], axis=-2)
        a = multihead_attn(q, k, v)
        a = merge_heads(a)
        a = conv1d(a, 'c_proj', n_state)
        return a, present

这段代码定义了一个多层感知机 (MLP) 模块,通常在深度学习模型中用于非线性变换:

1
2
3
4
5
6
def mlp(x, scope, n_state, *, hparams):
    with tf.variable_scope(scope):
        nx = x.shape[-1].value
        h = gelu(conv1d(x, 'c_fc', n_state))
        h2 = conv1d(h, 'c_proj', nx)
        return h2

这段代码实现了一个标准的 Transformer 块,其中包括以下步骤:

  1. 层归一化 + 自注意力机制 + 残差连接
  2. 层归一化 + 多层感知机(MLP) + 残差连接
  3. 返回更新后的输出 x 和注意力层的缓存 present

这个设计符合 Transformer 模型的标准结构,适用于自然语言处理任务中的序列建模。

1
2
3
4
5
6
7
8
def block(x, scope, *, past, hparams):
    with tf.variable_scope(scope):
        nx = x.shape[-1].value
        a, present = attn(norm(x, 'ln_1'), 'attn', nx, past=past, hparams=hparams)
        x = x + a
        m = mlp(norm(x, 'ln_2'), 'mlp', nx*4, hparams=hparams)
        x = x + m
        return x, present

past_shape: 定义缓存张量的形状,用于存储注意力的 Key 和 Value。

expand_tile: 扩展输入张量的维度,并复制以适配批量处理。

positions_for: 生成位置索引序列,并考虑历史时间步偏移。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
def past_shape(*, hparams, batch_size=None, sequence=None):
    return [batch_size, hparams.n_layer, 2, hparams.n_head, sequence, hparams.n_embd // hparams.n_head]

def expand_tile(value, size):
    """Add a new axis of given size."""
    value = tf.convert_to_tensor(value, name='value')
    ndims = value.shape.ndims
    return tf.tile(tf.expand_dims(value, axis=0), [size] + [1]*ndims)

def positions_for(tokens, past_length):
    batch_size = tf.shape(tokens)[0]
    nsteps = tf.shape(tokens)[1]
    return expand_tile(past_length + tf.range(nsteps), batch_size)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def model(hparams, X, past=None, scope='model', reuse=False):
    with tf.variable_scope(scope, reuse=reuse):
        results = {}
        batch, sequence = shape_list(X)

        wpe = tf.get_variable('wpe', [hparams.n_ctx, hparams.n_embd],
                             initializer=tf.random_normal_initializer(stddev=0.01))
        wte = tf.get_variable('wte', [hparams.n_vocab, hparams.n_embd],
                             initializer=tf.random_normal_initializer(stddev=0.02))
        past_length = 0 if past is None else tf.shape(past)[-2]
        h = tf.gather(wte, X) + tf.gather(wpe, positions_for(X, past_length))

        # Transformer
        presents = []
        pasts = tf.unstack(past, axis=1) if past is not None else [None] * hparams.n_layer
        assert len(pasts) == hparams.n_layer
        for layer, past in enumerate(pasts):
            h, present = block(h, 'h%d' % layer, past=past, hparams=hparams)
            presents.append(present)
        results['present'] = tf.stack(presents, axis=1)
        h = norm(h, 'ln_f')

        # Language model loss.  Do tokens <n predict token n?
        h_flat = tf.reshape(h, [batch*sequence, hparams.n_embd])
        logits = tf.matmul(h_flat, wte, transpose_b=True)
        logits = tf.reshape(logits, [batch, sequence, hparams.n_vocab])
        results['logits'] = logits
        return results

这段代码定义了一个深度学习模型 model,它实现了一个基于 Transformer 的语言模型(如 GPT)。以下是代码的详细解释:


函数签名

1
def model(hparams, X, past=None, scope='model', reuse=False):

参数:

  • hparams: 超参数对象,包含模型的配置(如嵌入维度、上下文长度、词汇表大小等)。

  • X: 输入张量,形状为 [batch_size, sequence_length],表示输入的 token 序列(通常是词的索引)。

  • past

    缓存张量,存储来自先前时间步的注意力状态(Key 和 Value)。形状为:

    1
    
    [batch_size, n_layer, 2, n_head, past_length, n_embd//n_head]
    

    如果为空,则表示模型从头开始推理。

  • scope: 变量作用域的名称。

  • reuse: 是否重用作用域内的变量(用于变量共享)。

返回值:

一个字典 results,包含:

  • present: 当前层的注意力缓存状态。
  • logits: 模型的输出(未归一化的概率分布)。

模型实现

1. 定义变量作用域

1
with tf.variable_scope(scope, reuse=reuse):
  • 使用 TensorFlow 的变量作用域,确保变量命名唯一,并支持变量共享。

2. 初始化变量

1
batch, sequence = shape_list(X)
  • 使用 shape_list 获取输入 X 的批次大小(batch)和序列长度(sequence)。

3. 定义嵌入矩阵

1
2
3
4
wpe = tf.get_variable('wpe', [hparams.n_ctx, hparams.n_embd],
                      initializer=tf.random_normal_initializer(stddev=0.01))
wte = tf.get_variable('wte', [hparams.n_vocab, hparams.n_embd],
                      initializer=tf.random_normal_initializer(stddev=0.02))
  • wpe: 位置嵌入矩阵,形状为 [上下文长度, 嵌入维度]
  • wte: 词嵌入矩阵,形状为 [词汇表大小, 嵌入维度]
  • 使用随机正态分布初始化权重。

4. 计算位置偏移

1
2
past_length = 0 if past is None else tf.shape(past)[-2]
h = tf.gather(wte, X) + tf.gather(wpe, positions_for(X, past_length))
  • past_length: 如果没有缓存 past,则 past_length=0,否则为缓存的时间步长度。
  • tf.gather(wte, X): 通过索引 X 获取词嵌入。
  • tf.gather(wpe, positions_for(X, past_length)): 获取位置嵌入(利用 positions_for 生成位置索引)。
  • 将词嵌入和位置嵌入相加,得到初始的输入表示 h

5. Transformer 层的前向计算

1
2
3
presents = []
pasts = tf.unstack(past, axis=1) if past is not None else [None] * hparams.n_layer
assert len(pasts) == hparams.n_layer
  • presents: 存储当前层的注意力状态(Key 和 Value)。
  • pasts: 如果存在缓存,则按层解包缓存张量;否则初始化为空。
1
2
3
4
for layer, past in enumerate(pasts):
    h, present = block(h, 'h%d' % layer, past=past, hparams=hparams)
    presents.append(present)
results['present'] = tf.stack(presents, axis=1)
  • 遍历每一层的 Transformer 块 :

    1
    
    block
    
    • 传入输入表示 h 和当前层的缓存状态 past
    • 计算输出 h 和当前层的注意力状态 present
  • 将所有层的 present 堆叠,形状为 [batch_size, n_layer, 2, n_head, sequence, n_embd//n_head],并存入结果字典。


6. 输出层归一化

1
h = norm(h, 'ln_f')
  • 对最后一层的输出 h 进行层归一化(Layer Normalization)。

7. 输出层计算

1
2
3
4
h_flat = tf.reshape(h, [batch*sequence, hparams.n_embd])
logits = tf.matmul(h_flat, wte, transpose_b=True)
logits = tf.reshape(logits, [batch, sequence, hparams.n_vocab])
results['logits'] = logits
  • h_flat: 将 h 重塑为二维张量 [batch*sequence, 嵌入维度],方便后续矩阵乘法操作。
  • logits:
    • 与词嵌入矩阵 wte 的转置相乘,计算每个 token 的得分(未归一化概率)。
    • 将结果重塑为 [batch, sequence, vocab_size]
  • logits 存入结果字典。

总结

  • 输入:词序列 X 和缓存状态 past
  • 词和位置嵌入:通过词嵌入和位置嵌入初始化输入表示。
  • Transformer 层:
    • 通过多层 Transformer 块(包含注意力机制和 MLP)计算表示。
    • 保存当前时间步的注意力状态。
  • 输出层:通过词嵌入矩阵的转置计算每个 token 的 logits。
  • 返回值:
    • present:当前层的注意力缓存。
    • logits:预测下一个 token 的分布。

这是一个标准的基于 Transformer 的语言模型,常用于生成任务或语言建模任务中(如 GPT)。

encoder

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""Byte pair encoding utilities"""

import os
import json
import regex as re
from functools import lru_cache

@lru_cache()
def bytes_to_unicode():
    """
    Returns list of utf-8 byte and a corresponding list of unicode strings.
    The reversible bpe codes work on unicode strings.
    This means you need a large # of unicode characters in your vocab if you want to avoid UNKs.
    When you're at something like a 10B token dataset you end up needing around 5K for decent coverage.
    This is a signficant percentage of your normal, say, 32K bpe vocab.
    To avoid that, we want lookup tables between utf-8 bytes and unicode strings.
    And avoids mapping to whitespace/control characters the bpe code barfs on.
    """
    bs = list(range(ord("!"), ord("~")+1))+list(range(ord("¡"), ord("¬")+1))+list(range(ord("®"), ord("ÿ")+1))
    cs = bs[:]
    n = 0
    for b in range(2**8):
        if b not in bs:
            bs.append(b)
            cs.append(2**8+n)
            n += 1
    cs = [chr(n) for n in cs]
    return dict(zip(bs, cs))

def get_pairs(word):
    """Return set of symbol pairs in a word.

    Word is represented as tuple of symbols (symbols being variable-length strings).
    """
    pairs = set()
    prev_char = word[0]
    for char in word[1:]:
        pairs.add((prev_char, char))
        prev_char = char
    return pairs

class Encoder:
    def __init__(self, encoder, bpe_merges, errors='replace'):
        self.encoder = encoder
        self.decoder = {v:k for k,v in self.encoder.items()}
        self.errors = errors # how to handle errors in decoding
        self.byte_encoder = bytes_to_unicode()
        self.byte_decoder = {v:k for k, v in self.byte_encoder.items()}
        self.bpe_ranks = dict(zip(bpe_merges, range(len(bpe_merges))))
        self.cache = {}

        # Should haved added re.IGNORECASE so BPE merges can happen for capitalized versions of contractions
        self.pat = re.compile(r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+""")

    def bpe(self, token):
        if token in self.cache:
            return self.cache[token]
        word = tuple(token)
        pairs = get_pairs(word)

        if not pairs:
            return token

        while True:
            bigram = min(pairs, key = lambda pair: self.bpe_ranks.get(pair, float('inf')))
            if bigram not in self.bpe_ranks:
                break
            first, second = bigram
            new_word = []
            i = 0
            while i < len(word):
                try:
                    j = word.index(first, i)
                    new_word.extend(word[i:j])
                    i = j
                except:
                    new_word.extend(word[i:])
                    break

                if word[i] == first and i < len(word)-1 and word[i+1] == second:
                    new_word.append(first+second)
                    i += 2
                else:
                    new_word.append(word[i])
                    i += 1
            new_word = tuple(new_word)
            word = new_word
            if len(word) == 1:
                break
            else:
                pairs = get_pairs(word)
        word = ' '.join(word)
        self.cache[token] = word
        return word

    def encode(self, text):
        bpe_tokens = []
        for token in re.findall(self.pat, text):
            token = ''.join(self.byte_encoder[b] for b in token.encode('utf-8'))
            bpe_tokens.extend(self.encoder[bpe_token] for bpe_token in self.bpe(token).split(' '))
        return bpe_tokens

    def decode(self, tokens):
        text = ''.join([self.decoder[token] for token in tokens])
        text = bytearray([self.byte_decoder[c] for c in text]).decode('utf-8', errors=self.errors)
        return text

def get_encoder(model_name, models_dir):
    with open(os.path.join(models_dir, model_name, 'encoder.json'), 'r') as f:
        encoder = json.load(f)
    with open(os.path.join(models_dir, model_name, 'vocab.bpe'), 'r', encoding="utf-8") as f:
        bpe_data = f.read()
    bpe_merges = [tuple(merge_str.split()) for merge_str in bpe_data.split('\n')[1:-1]]
    return Encoder(
        encoder=encoder,
        bpe_merges=bpe_merges,
    )

def bytes_to_unicode() 主要功能

  • 字节到 Unicode 的双向映射: 该函数实现了 utf-8 字节到 Unicode 字符的映射,确保:
    1. 所有常见字符都能直接映射。
    2. 额外的字节值(控制字符或空白字符等)映射到专用的 Unicode 字符(避免与常规字符冲突)。
  • 应用场景
    • 分词(Tokenization): 在基于 BPE 的分词方法中,通常操作的是 Unicode 字符而不是字节。这个映射表允许在处理文本时,轻松地将字节表示与 Unicode 表示互相转换。
    • 减少未登录词(UNK): 通过将所有字节映射到特定的 Unicode 字符,能够有效避免未登录词(UNK)的问题。

def get_pairs(word):主要功能

  • 符号对提取:
    • 用于提取单词中相邻符号的所有可能组合(无重复)。
    • 符号可以是单个字符,也可以是更大的单位(如字节或子词)。
  • BPE 算法中的作用:
    • 在 Byte Pair Encoding (BPE) 分词算法中:
      • 通过提取符号对,可以统计每对符号出现的频率。
      • 根据频率合并最常见的符号对,逐步生成新的子词。

Byte Pair Encoding (BPE) 是一种简单而高效的数据压缩算法,同时也是自然语言处理(NLP)领域中广泛使用的分词技术。它通过将低级单元(如字符或字节)合并成更大的单元(如子词)来实现压缩或分词。


BPE 的来源

  1. 数据压缩
    • BPE 最初是用于压缩数据的一种算法(提出于 1994 年)。它通过不断将数据中出现频率最高的相邻字节对合并为新的单个字节来减少整体数据量。
  2. 自然语言处理中的应用
    • 在 NLP 中,BPE 被用作一种 子词(subword)分词方法
    • 传统的分词方法(如基于空格或词典分词)容易遇到未登录词(OOV, Out-Of-Vocabulary)问题。而 BPE 通过将单词分解成子词,可以有效减少未登录词的出现,同时提高模型的泛化能力。

BPE 的基本思想

BPE 的核心思想是:找到并合并文本数据中出现频率最高的相邻符号对,直到达到预设的词汇表大小或其他停止条件

关键步骤

  1. 初始化
    • 将文本中的每个单词拆分为基本符号(如字符或字节)。
      • 示例:"word"['w', 'o', 'r', 'd']
  2. 统计符号对
    • 找出所有相邻符号对,并统计它们的频率。
      • 示例:['w', 'o', 'r', 'd'] 中的符号对:('w', 'o'), ('o', 'r'), ('r', 'd')
  3. 合并最频繁的符号对
    • 找到频率最高的符号对,将其合并为新的符号。
      • 示例:合并 ('o', 'r')['w', 'or', 'd']
  4. 重复步骤 2 和 3
    • 不断统计新的符号对并进行合并,直到满足预设条件(如词汇表大小达到限制)。
      • 示例:继续合并 ('w', 'or')['wor', 'd']
      • 再合并 ('wor', 'd')['word']

BPE 的优点

  1. 减少未登录词(OOV)问题
    • 即使遇到从未见过的单词,也可以将其分解为子词或字符进行处理。
      • 如单词 unbelievable,如果整个单词不在词汇表中,BPE 可能将其分解为 ["un", "believ", "able"]
  2. 提高模型效率
    • 子词分词可以显著减少词汇表大小,同时保留对罕见单词的处理能力。
  3. 适应多语言处理
    • BPE 能够灵活地处理字符和子词,特别适用于多种语言的分词需求。

BPE 的示例

"low lower lowest" 为例,假设目标是生成 BPE 词汇表。

  1. 初始状态

    • 将单词拆解为字符:['l', 'o', 'w'], ['l', 'o', 'w', 'e', 'r'], ['l', 'o', 'w', 'e', 's', 't']
  2. 统计符号对频率

    1
    2
    3
    4
    5
    6
    
    ('l', 'o'): 3
    ('o', 'w'): 3
    ('w', 'e'): 2
    ('e', 'r'): 1
    ('e', 's'): 1
    ('s', 't'): 1
    
  3. 合并频率最高的符号对

    • 合并

      1
      
      ('l', 'o')
      

      → 新的单词序列:

      • ['lo', 'w'], ['lo', 'w', 'e', 'r'], ['lo', 'w', 'e', 's', 't']
  4. 重复统计与合并

    • 再次统计符号对,并合并:

      • 合并

        1
        
        ('lo', 'w')
        

        → 新的单词序列:

        • ['low'], ['low', 'e', 'r'], ['low', 'e', 's', 't']
    • 最终,可能得到:

      • ['low'], ['lower'], ['lowest']

实际用途

  1. 语言模型
    • 主流预训练语言模型(如 GPT、BERT)的分词方法往往基于 BPE 或类似方法(如 WordPiece)。
    • 这些模型会基于大规模语料构建一个子词词汇表。
  2. 文本压缩
    • BPE 可用于减少数据存储占用,如在数据传输和存储优化中。

class Encoder:

这段代码定义了一个 Encoder 类,用于实现基于 Byte Pair Encoding (BPE) 的分词(tokenization)和解码(detokenization)过程。它在自然语言处理中非常有用,特别是在构建和使用基于子词的语言模型时。以下是代码的逐步解析:


1. __init__ 方法:初始化

1
def __init__(self, encoder, bpe_merges, errors='replace'):
  • 参数

    • encoder: 一个映射,将 BPE 子词(tokens)映射到唯一的整数 ID。
    • bpe_merges: BPE 合并操作列表,表示哪些符号对应该被优先合并。
    • errors: 在解码过程中如何处理错误,默认为 'replace'
  • 初始化内容

    1
    2
    
    self.encoder = encoder
    self.decoder = {v: k for k, v in self.encoder.items()}
    
    • self.encoder:用于编码文本(子词 → ID)。
    • self.decoder:用于解码文本(ID → 子词),通过反转 encoder 构建。
    1
    2
    
    self.byte_encoder = bytes_to_unicode()
    self.byte_decoder = {v: k for k, v in self.byte_encoder.items()}
    
    • self.byte_encoder:字节到 Unicode 字符的映射表(使用之前定义的 bytes_to_unicode())。
    • self.byte_decoder:反向映射,用于将 Unicode 字符转换回字节。
    1
    
    self.bpe_ranks = dict(zip(bpe_merges, range(len(bpe_merges))))
    
    • bpe_merges 转换为字典 bpe_ranks,用来快速查找符号对的优先级(频率越高,rank 越小)。
    1
    
    self.cache = {}
    
    • 缓存字典,用于存储已经计算过的 BPE 分词结果,避免重复计算。
    1
    
    self.pat = re.compile(r"""'s|'t|'re|'ve|'m|'ll|'d| ?\p{L}+| ?\p{N}+| ?[^\s\p{L}\p{N}]+|\s+(?!\S)|\s+""")
    
    • 分词正则表达式:
      • 匹配子词、数字、标点符号和空格。
      • 例如,"I'm learning BPE." 会被分成:["I", "'m", "learning", "BPE", "."]

2. bpe 方法:核心的 BPE 分词逻辑

1
def bpe(self, token):
  • 输入:一个字符串 token

  • 功能:对输入的字符串执行 BPE 操作,将其分解为 BPE 子词。

  • 步骤

    1. 缓存检查

      1
      2
      
      if token in self.cache:
          return self.cache[token]
      

      如果 token 已经计算过,直接返回缓存结果。

    2. 初始化

      1
      2
      
      word = tuple(token)
      pairs = get_pairs(word)
      
      • token 转换为元组(字符序列),例如:"low"('l', 'o', 'w')
      • 调用 get_pairs 提取相邻字符对。
    3. 循环合并符号对

      1
      2
      3
      4
      
      while True:
          bigram = min(pairs, key=lambda pair: self.bpe_ranks.get(pair, float('inf')))
          if bigram not in self.bpe_ranks:
              break
      
      • 找出当前符号对中优先级(rank)最高的 bigram
      • 如果 bigram 不在 bpe_ranks 中,结束循环。

      符号对合并逻辑

      1
      2
      3
      
      if word[i] == first and i < len(word)-1 and word[i+1] == second:
          new_word.append(first+second)
          i += 2
      
      • 如果找到匹配的符号对,合并成一个新符号。
    4. 缓存结果

      1
      2
      
      self.cache[token] = word
      return word
      
      • 将分词结果存入缓存并返回。

3. encode 方法:文本分词

1
def encode(self, text):
  • 输入:字符串 text

  • 功能:对输入文本执行分词,返回对应的 token ID 列表。

  • 步骤

    1. 正则匹配

      1
      
      for token in re.findall(self.pat, text):
      

      使用 self.pat 对文本进行分词。

    2. 字节编码

      1
      
      token = ''.join(self.byte_encoder[b] for b in token.encode('utf-8'))
      

      将每个分词转换为 UTF-8 字节,然后通过 byte_encoder 映射为 Unicode 字符。

    3. BPE 分词

      1
      
      bpe_tokens.extend(self.encoder[bpe_token] for bpe_token in self.bpe(token).split(' '))
      
      • 调用 bpe 方法对子词执行 BPE 操作。
      • 将结果映射为 token ID。
    4. 返回结果

      1
      
      return bpe_tokens
      

      返回所有 token ID 的列表。


4. decode 方法:ID 解码

1
def decode(self, tokens):
  • 输入:整数列表 tokens(token IDs)。

  • 功能:将 token ID 解码为原始文本。

  • 步骤

    1. 子词还原

      1
      
      text = ''.join([self.decoder[token] for token in tokens])
      

      使用 self.decoder 将 token ID 映射回子词。

    2. 字节解码

      1
      
      text = bytearray([self.byte_decoder[c] for c in text]).decode('utf-8', errors=self.errors)
      
      • 使用 byte_decoder 将 Unicode 字符转换回字节。
      • 将字节解码为 UTF-8 文本。
    3. 返回结果

      1
      
      return text
      

def get_encoder(model_name, models_dir)

这段代码定义了一个函数 get_encoder,它用于加载和初始化一个 Encoder 实例,通常用于自然语言处理任务(例如基于 BPE 的分词器)。下面是具体的功能和步骤解析:


1. 函数签名

1
def get_encoder(model_name, models_dir):
  • 参数

    • model_name: 模型的名称(字符串),表示特定模型文件所在的子目录名称。
    • models_dir: 存储模型文件的主目录路径。
  • 返回值

    • 返回一个 Encoder 对象,用于分词和解码。

2. 加载 encoder.json 文件

1
2
with open(os.path.join(models_dir, model_name, 'encoder.json'), 'r') as f:
    encoder = json.load(f)
  • 功能

    • 打开模型目录中的 encoder.json 文件。
    • 使用 json.load 解析 JSON 文件内容,加载编码器(encoder)。
  • encoder.json 的作用

    • 是一个 映射文件,将每个子词(BPE token)映射到唯一的整数 ID。

    • 格式示例:

      1
      2
      3
      4
      5
      6
      7
      
      {
          "hello": 0,
          "world": 1,
          "h": 2,
          "ello": 3,
          ...
      }
      

3. 加载 vocab.bpe 文件

1
2
with open(os.path.join(models_dir, model_name, 'vocab.bpe'), 'r', encoding="utf-8") as f:
    bpe_data = f.read()
  • 功能

    • 打开模型目录中的 vocab.bpe 文件(UTF-8 编码)。
    • 使用 f.read() 读取整个文件内容到字符串 bpe_data 中。
  • vocab.bpe 的作用

    • 是一个 BPE 合并规则文件,定义了所有符号对的合并顺序。

    • 格式示例:

      1
      2
      3
      4
      5
      
      #version: 0.2
      h e
      he llo
      lo w
      ...
      
      • 每行表示一个符号对(例如 h e)。
      • 第一行是注释,通常表示文件版本。

4. 提取合并规则

1
bpe_merges = [tuple(merge_str.split()) for merge_str in bpe_data.split('\n')[1:-1]]
  • 功能
    • bpe_data 中提取 BPE 合并规则,跳过第一行注释([1:])。
    • 使用 split() 将每行内容分割成符号对(元组)。
    • 例如:
      • 输入:"h e\nhe llo\nlo w\n"
      • 结果:[('h', 'e'), ('he', 'llo'), ('lo', 'w')]
  • bpe_merges 的作用
    • 定义了符号对合并的优先顺序,用于 BPE 分词。

5. 创建 Encoder 实例

1
2
3
4
return Encoder(
    encoder=encoder,
    bpe_merges=bpe_merges,
)
  • 功能
    • 使用 encoderbpe_merges 创建一个 Encoder 对象。
    • Encoder 返回,以便调用者可以使用其 encodedecode 方法进行分词和解码。
  • 参数传递到 Encoder 的内容
    • encoder: 将子词映射到整数 ID 的字典。
    • bpe_merges: BPE 合并规则(优先级按列表顺序定义)。

代码整体作用总结

  • 输入:
    • 模型名称(model_name)和模型文件所在目录(models_dir)。
  • 功能:
    • 从指定目录中加载:
      1. 子词到整数 ID 的映射文件(encoder.json)。
      2. BPE 合并规则文件(vocab.bpe)。
    • 初始化一个 Encoder 对象,具备分词和解码功能。
  • 输出:
    • 一个可以直接用于分词和解码的 Encoder 实例。

应用场景

  • 模型分词初始化:

    • 在基于子词(如 GPT、BERT)训练的语言模型中,get_encoder 通常用于加载与模型对应的分词器。
  • 分词和解码:

    • 加载完成后,

      1
      
      Encoder
      

      对象可以用来:

      • 分词:将文本转为 token ID。
      • 解码:将 token ID 还原为文本。

示例调用

假设模型目录为 "models/",模型名称为 "gpt2",文件结构如下:

1
2
3
4
models/
├── gpt2/
│   ├── encoder.json
│   ├── vocab.bpe

调用代码:

1
2
3
4
5
6
encoder = get_encoder('gpt2', 'models/')
text = "Hello, world!"
tokens = encoder.encode(text)
print(tokens)  # 输出分词后的 token ID 列表
decoded_text = encoder.decode(tokens)
print(decoded_text)  # 输出解码后的原始文本

sample

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import tensorflow as tf

import model

def top_k_logits(logits, k):
    if k == 0:
        # no truncation
        return logits

    def _top_k():
        values, _ = tf.nn.top_k(logits, k=k)
        min_values = values[:, -1, tf.newaxis]
        return tf.where(
            logits < min_values,
            tf.ones_like(logits, dtype=logits.dtype) * -1e10,
            logits,
        )
    return tf.cond(
       tf.equal(k, 0),
       lambda: logits,
       lambda: _top_k(),
    )


def top_p_logits(logits, p):
    """Nucleus sampling"""
    batch, _ = logits.shape.as_list()
    sorted_logits = tf.sort(logits, direction='DESCENDING', axis=-1)
    cumulative_probs = tf.cumsum(tf.nn.softmax(sorted_logits, axis=-1), axis=-1)
    indices = tf.stack([
        tf.range(0, batch),
        # number of indices to include
        tf.maximum(tf.reduce_sum(tf.cast(cumulative_probs <= p, tf.int32), axis=-1) - 1, 0),
    ], axis=-1)
    min_values = tf.gather_nd(sorted_logits, indices)
    return tf.where(
        logits < min_values,
        tf.ones_like(logits) * -1e10,
        logits,
    )


def sample_sequence(*, hparams, length, start_token=None, batch_size=None, context=None, temperature=1, top_k=0, top_p=1):
    if start_token is None:
        assert context is not None, 'Specify exactly one of start_token and context!'
    else:
        assert context is None, 'Specify exactly one of start_token and context!'
        context = tf.fill([batch_size, 1], start_token)

    def step(hparams, tokens, past=None):
        lm_output = model.model(hparams=hparams, X=tokens, past=past, reuse=tf.AUTO_REUSE)

        logits = lm_output['logits'][:, :, :hparams.n_vocab]
        presents = lm_output['present']
        presents.set_shape(model.past_shape(hparams=hparams, batch_size=batch_size))
        return {
            'logits': logits,
            'presents': presents,
        }

    with tf.name_scope('sample_sequence'):
        def body(past, prev, output):
            next_outputs = step(hparams, prev, past=past)
            logits = next_outputs['logits'][:, -1, :]  / tf.to_float(temperature)
            logits = top_k_logits(logits, k=top_k)
            logits = top_p_logits(logits, p=top_p)
            samples = tf.multinomial(logits, num_samples=1, output_dtype=tf.int32)
            return [
                next_outputs['presents'] if past is None else tf.concat([past, next_outputs['presents']], axis=-2),
                samples,
                tf.concat([output, samples], axis=1)
            ]

        past, prev, output = body(None, context, context)

        def cond(*args):
            return True

        _, _, tokens = tf.while_loop(
            cond=cond, body=body,
            maximum_iterations=length - 1,
            loop_vars=[
                past,
                prev,
                output
            ],
            shape_invariants=[
                tf.TensorShape(model.past_shape(hparams=hparams, batch_size=batch_size)),
                tf.TensorShape([batch_size, None]),
                tf.TensorShape([batch_size, None]),
            ],
            back_prop=False,
        )

        return tokens

这段代码定义了一个函数 top_k_logits,用于对 logits(模型的输出分布)进行 Top-K 限制。Top-K 策略是一种在生成任务(如文本生成)中常用的解码方法,用于限制候选输出的范围。下面是代码的详细解释。


1. 函数签名

1
def top_k_logits(logits, k):
  • 输入参数
    • logits: 张量(Tensor),通常是语言模型输出的未归一化分布(对每个词汇的分数)。
    • k: 整数,表示要保留分数最高的 k 个候选项。
  • 返回值
    • 一个经过 Top-K 过滤的 logits 张量,分数低于前 k 名的项被屏蔽为非常小的值(-1e10)。

2. 特殊情况:k=0

1
2
3
if k == 0:
    # no truncation
    return logits
  • 含义

    • 如果 k=0,表示不进行任何截断操作,直接返回原始 logits
    • 通常在解码时使用 k=0 的情况很少。

3. 内部函数 _top_k

1
def _top_k():
  • 这个内部函数定义了具体的 Top-K 截断操作
  • 核心步骤

Step 1: 获取 Top-K 候选项的分数

1
2
values, _ = tf.nn.top_k(logits, k=k)
min_values = values[:, -1, tf.newaxis]
  • 使用 tf.nn.top_k(logits, k=k)
    • 返回 logits 中每一行的前 k 个最大值及其对应索引。
    • values 是每一行的 Top-K 候选分数。
    • values[:, -1] 是每一行中第 k 小的分数(即 Top-K 的最小值)。
  • min_values 的作用
    • min_values 扩展维度(添加 tf.newaxis),使其形状与 logits 兼容,方便后续比较。

Step 2: 屏蔽低于 Top-K 的分数

1
2
3
4
5
return tf.where(
    logits < min_values,
    tf.ones_like(logits, dtype=logits.dtype) * -1e10,
    logits,
)
  • tf.where 的功能
    • 如果 logits 小于 min_values(不在 Top-K 中),将其替换为 -1e10
    • 否则保留原始分数。
  • 屏蔽分数为 -1e10 的作用
    • 在随后的 Softmax 操作中,这些值将被归一化为接近 0 的概率,从而忽略这些候选项。

4. 条件执行

1
2
3
4
5
return tf.cond(
   tf.equal(k, 0),
   lambda: logits,
   lambda: _top_k(),
)
  • tf.cond 的功能
    • 根据条件执行不同的分支。
    • 如果 k=0,直接返回 logits
    • 如果 k>0,调用 _top_k() 进行 Top-K 截断。
  • 为何使用 tf.cond 而不是简单的 if-else
    • 在 TensorFlow 中,动态计算图需要明确定义执行逻辑,tf.cond 是条件分支的标准操作。

5. 工作原理总结

  1. 如果 k=0,直接返回原始 logits

  2. 如果 :

    1
    
    k>0
    
    • 找出每行 logits 中的前 k 个最大值。
    • 将不在 Top-K 中的分数替换为 -1e10

代码运行示例

假设输入如下:

1
2
logits = tf.constant([[2.0, 1.0, 0.1, 0.5], [0.1, 3.0, 2.5, 1.0]])
k = 2
  • 执行步骤

    1. 第一行 logits:
      • Top-2 候选项为 [2.0, 1.0]
      • 其余值屏蔽为 -1e10
      • 结果为 [2.0, 1.0, -1e10, -1e10]
    2. 第二行 logits:
      • Top-2 候选项为 [3.0, 2.5]
      • 结果为 [-1e10, 3.0, 2.5, -1e10]
  • 最终输出

    1
    2
    
    [[ 2.0,  1.0, -1e10, -1e10],
     [-1e10,  3.0,  2.5, -1e10]]
    

应用场景

  • 生成任务(如文本生成、机器翻译)
    • 用于控制解码时的多样性,限制模型在每一步生成的候选项数量。
    • Top-P (nucleus) 截断 一起常用于生成优化。
  • 降低计算复杂度
    • 对候选项进行截断,减少后续计算量。

这段代码实现了 Top-p 截断(Nucleus Sampling),它是一种生成任务中的采样方法,确保生成的分布仅包含累积概率不超过 p 的候选项,从而控制生成多样性。以下是对代码的详细解释:


1. 函数签名

1
2
def top_p_logits(logits, p):
    """Nucleus sampling"""
  • 输入参数
    • logits:模型输出的未归一化分布(对每个词汇的分数),形状为 [batch_size, vocab_size]
    • p:浮点数,累积概率阈值,范围为 [0, 1]。例如,p=0.9 意味着仅保留累积概率小于等于 0.9 的候选项。
  • 返回值
    • 一个经过 Top-p 截断的 logits 张量,分数低于累积概率阈值 p 的候选项被屏蔽(赋值为 -1e10)。

2. 获取输入维度

1
batch, _ = logits.shape.as_list()
  • 功能:
    • 获取 logits 的形状,batch 是批量大小,_ 是词汇表的大小(vocab_size)。
    • 假设输入 logits 形状为 [batch_size, vocab_size]

3. 对 logits 按分数降序排序

1
sorted_logits = tf.sort(logits, direction='DESCENDING', axis=-1)
  • 功能:
    • logits 的分数降序排序(在最后一维,即词汇维度上)。
    • sorted_logits 是按分数从大到小排列的张量,与 logits 的形状相同。

4. 计算累积概率

1
cumulative_probs = tf.cumsum(tf.nn.softmax(sorted_logits, axis=-1), axis=-1)
  • 步骤

    1. 使用 tf.nn.softmaxsorted_logits 进行归一化,得到概率分布。

    2. 使用

      1
      
      tf.cumsum
      

      计算累积概率(在最后一维上)。

      • 累积概率:对于每个词,表示该词及之前所有词的总概率。
  • 示例: 假设 sorted_logits 对应的 Softmax 概率为 [0.4, 0.3, 0.2, 0.1],累积概率为 [0.4, 0.7, 0.9, 1.0]


5. 找到超过阈值 p 的最小索引

1
2
3
4
indices = tf.stack([
    tf.range(0, batch),
    tf.maximum(tf.reduce_sum(tf.cast(cumulative_probs <= p, tf.int32), axis=-1) - 1, 0),
], axis=-1)

分步解释

  1. 判断累积概率是否小于等于 p

    1
    
    tf.cast(cumulative_probs <= p, tf.int32)
    
    • cumulative_probs 的每个值,判断是否小于等于 p
    • 结果是一个布尔值矩阵,转化为 0(False)或 1(True)。
  2. 统计满足条件的个数

    1
    
    tf.reduce_sum(..., axis=-1)
    
    • 对每一行(词汇维度)统计满足 cumulative_probs <= p 的词的个数。
    • 结果是一个形状为 [batch_size] 的张量,每个值表示该行中满足条件的词的数量。
  3. 找到累积概率超过 p 的最小索引

    1
    
    tf.maximum(... - 1, 0)
    
    • 累积概率刚超过 p 的词的索引应该是 条件个数 - 1
    • 如果没有词满足条件(防止索引为负数),用 tf.maximum 将索引下限设为 0
  4. 组合成二维索引

    1
    
    tf.stack([tf.range(0, batch), ...], axis=-1)
    
    • 创建每个批次的行索引(tf.range(0, batch))和列索引(条件个数 - 1)。
    • indices 是一个形状为 [batch_size, 2] 的张量,每个元素表示需要保留的最大分数位置。

6. 找到累积概率阈值对应的最小分数

1
min_values = tf.gather_nd(sorted_logits, indices)
  • 功能:
    • 使用 indicessorted_logits 中提取累积概率刚超过 p 的最小分数(阈值分数)。
    • min_values 是一个形状为 [batch_size] 的张量,每个值表示该批次对应的分数阈值。

7. 屏蔽分数低于阈值的 logits

1
2
3
4
5
return tf.where(
    logits < min_values,
    tf.ones_like(logits) * -1e10,
    logits,
)
  • tf.where 的功能
    • 如果 logits 小于 min_values,将其替换为 -1e10(非常小的值)。
    • 否则保留原始分数。
  • 效果
    • 低于分数阈值的词汇几乎不会被选中(后续的 Softmax 会将其概率归一化为接近 0)。

代码总结

这段代码实现了 Top-p 截断(Nucleus Sampling),步骤如下:

  1. 对 logits 按分数降序排序。
  2. 计算每个词的累积概率。
  3. 找到刚好满足累积概率 ≤ p 的分数阈值。
  4. 将分数低于阈值的候选项屏蔽为 -1e10

代码运行示例

假设输入如下:

1
2
logits = tf.constant([[2.0, 1.0, 0.5, 0.1], [1.5, 1.0, 0.3, 0.2]])
p = 0.8

执行过程

  1. 对 logits 排序,得到:

    1
    2
    
    [[2.0, 1.0, 0.5, 0.1],
     [1.5, 1.0, 0.3, 0.2]]
    
  2. 计算累积概率(Softmax + cumsum),例如:

    1
    2
    
    [[0.6, 0.9, 0.99, 1.0],
     [0.5, 0.85, 0.98, 1.0]]
    
  3. 阈值对应分数为:

    1
    
    [1.0, 1.0]
    
  4. 屏蔽分数低于阈值的项:

    1
    2
    
    [[2.0, 1.0, -1e10, -1e10],
     [1.5, 1.0, -1e10, -1e10]]
    

应用场景

  • 文本生成任务:
    • 限制生成范围,仅保留累积概率满足 p 的候选项,避免低概率词导致的无意义生成。
  • 生成控制:
    • Top-p 能根据概率动态调整候选项数量,比 Top-k 更灵活。

这段代码定义了一个函数 sample_sequence,用于在基于语言模型(如 GPT-2)的文本生成任务中,逐步生成固定长度的序列。以下是对代码的详细解析。


def sample_sequence

1. 函数签名和输入参数

1
def sample_sequence(*, hparams, length, start_token=None, batch_size=None, context=None, temperature=1, top_k=0, top_p=1):
  • 输入参数:

    1. hparams:模型的超参数,包含词汇大小(n_vocab)、隐藏层大小等配置。
    2. length:生成的序列长度(最大迭代次数)。
    3. start_token:文本生成的起始 token(一个整数表示词汇 ID)。
    4. batch_size:批量大小,决定了同时生成多少条序列。
    5. context:输入上下文序列,形状为 [batch_size, sequence_length],可选。和 start_token 互斥。
    6. temperature:控制采样的随机性。值越低,生成的文本越确定;值越高,生成的文本越随机。
    7. top_k:Top-k 截断,保留概率分布中分数最高的 k 个词。
    8. top_p:Top-p 截断(核采样),保留累积概率小于等于 p 的词。

2. 参数验证和初始上下文设置

1
2
3
4
5
if start_token is None:
    assert context is not None, 'Specify exactly one of start_token and context!'
else:
    assert context is None, 'Specify exactly one of start_token and context!'
    context = tf.fill([batch_size, 1], start_token)
  • 通过

    start_tokencontext

    指定生成的起始条件:

    1. 如果 start_token 被指定,则初始化 context[batch_size, 1] 的张量,值全为 start_token
    2. 如果没有提供 start_token,则需要提供 context
    3. 二者不能同时存在。

3. 定义生成单步的函数 step

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
def step(hparams, tokens, past=None):
    lm_output = model.model(hparams=hparams, X=tokens, past=past, reuse=tf.AUTO_REUSE)

    logits = lm_output['logits'][:, :, :hparams.n_vocab]
    presents = lm_output['present']
    presents.set_shape(model.past_shape(hparams=hparams, batch_size=batch_size))
    return {
        'logits': logits,
        'presents': presents,
    }
  • 功能

    • 调用语言模型(

      1
      
      model.model
      

      ),根据当前输入的 tokens 和历史上下文(

      1
      
      past
      

      )生成:

      1. logits:模型对当前 tokens 的预测分数(未归一化)。
      2. presents:当前生成步骤的注意力层状态,用于缓存历史上下文以提高生成效率。
  • 关键点

    1. past:表示历史生成步骤的缓存状态,用于加速自回归模型的生成。
    2. reuse=tf.AUTO_REUSE:允许在 TensorFlow 中重用模型权重。

4. 定义生成循环的 body 函数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def body(past, prev, output):
    next_outputs = step(hparams, prev, past=past)
    logits = next_outputs['logits'][:, -1, :]  / tf.to_float(temperature)
    logits = top_k_logits(logits, k=top_k)
    logits = top_p_logits(logits, p=top_p)
    samples = tf.multinomial(logits, num_samples=1, output_dtype=tf.int32)
    return [
        next_outputs['presents'] if past is None else tf.concat([past, next_outputs['presents']], axis=-2),
        samples,
        tf.concat([output, samples], axis=1)
    ]
  • 输入参数

    1. past:缓存的历史上下文,用于加速生成。
    2. prev:上一步生成的 tokens。
    3. output:当前生成的完整序列。
  • 功能分解

    1. 调用 step 获取当前步的 logitspresents

    2. 1
      
      logits
      

      进行采样策略:

      • 调整分布随机性(temperature)。
      • 使用 Top-k 和 Top-p 策略限制候选项。
      • 调用 tf.multinomial 从调整后的分布中采样下一个 token。
    3. 更新

      1
      
      past
      

      1
      
      output
      
      • 拼接当前步生成的 presentspast
    • 拼接新生成的 token 到 output
  • 返回值

    • 新的 past、当前步的生成 token、新的生成序列 output

5. 初始化第一步

1
past, prev, output = body(None, context, context)
  • 执行 body,使用 context 初始化生成序列,并获取第一个生成结果。

6. 定义循环条件和主体

1
2
def cond(*args):
    return True
  • 循环条件函数 cond 总是返回 True,表示按固定步数执行循环(由 maximum_iterations 限制)。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
_, _, tokens = tf.while_loop(
    cond=cond, body=body,
    maximum_iterations=length - 1,
    loop_vars=[
        past,
        prev,
        output
    ],
    shape_invariants=[
        tf.TensorShape(model.past_shape(hparams=hparams, batch_size=batch_size)),
        tf.TensorShape([batch_size, None]),
        tf.TensorShape([batch_size, None]),
    ],
    back_prop=False,
)
  • 循环控制:

    1. cond:始终为 True,循环到达 maximum_iterations 为止。
    2. body:调用上述 body 函数逐步生成序列。
    3. loop_vars:循环的变量,包括 pastprevoutput
    4. shape_invariants:指定循环变量的形状变化规则,支持动态长度。

7. 返回生成的序列

1
return tokens
  • 最终返回生成的完整序列 tokens

工作流程总结

  1. 初始化上下文(contextstart_token)。

  2. 1
    
    tf.while_loop
    

    中:

    • 逐步调用模型生成下一个 token。
    • 使用采样策略(Top-k 或 Top-p)筛选候选词。
    • 更新历史上下文和生成序列。
  3. 循环执行至生成长度达到指定值,返回生成的 token 序列。


应用场景

  1. 文本生成:
    • GPT 系列模型中的文本补全、对话生成等。
  2. 定制采样策略:
    • 根据任务调整 temperaturetop_ktop_p 参数,生成风格化或更合理的文本。

如果有具体问题或想运行代码测试,我可以协助进一步分析!

generate_unconditional_samples

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!/usr/bin/env python3

import fire
import json
import os
import numpy as np
import tensorflow as tf

import model, sample, encoder

def sample_model(
    model_name='124M',
    seed=None,
    nsamples=0,
    batch_size=1,
    length=None,
    temperature=1,
    top_k=0,
    top_p=1,
    models_dir='models',
):
    """
    Run the sample_model
    :model_name=124M : String, which model to use
    :seed=None : Integer seed for random number generators, fix seed to
     reproduce results
    :nsamples=0 : Number of samples to return, if 0, continues to
     generate samples indefinately.
    :batch_size=1 : Number of batches (only affects speed/memory).
    :length=None : Number of tokens in generated text, if None (default), is
     determined by model hyperparameters
    :temperature=1 : Float value controlling randomness in boltzmann
     distribution. Lower temperature results in less random completions. As the
     temperature approaches zero, the model will become deterministic and
     repetitive. Higher temperature results in more random completions.
    :top_k=0 : Integer value controlling diversity. 1 means only 1 word is
     considered for each step (token), resulting in deterministic completions,
     while 40 means 40 words are considered at each step. 0 (default) is a
     special setting meaning no restrictions. 40 generally is a good value.
     :models_dir : path to parent folder containing model subfolders
     (i.e. contains the <model_name> folder)
    """
    models_dir = os.path.expanduser(os.path.expandvars(models_dir))
    enc = encoder.get_encoder(model_name, models_dir)
    hparams = model.default_hparams()
    with open(os.path.join(models_dir, model_name, 'hparams.json')) as f:
        hparams.override_from_dict(json.load(f))

    if length is None:
        length = hparams.n_ctx
    elif length > hparams.n_ctx:
        raise ValueError("Can't get samples longer than window size: %s" % hparams.n_ctx)

    with tf.Session(graph=tf.Graph()) as sess:
        np.random.seed(seed)
        tf.set_random_seed(seed)

        output = sample.sample_sequence(
            hparams=hparams, length=length,
            start_token=enc.encoder['<|endoftext|>'],
            batch_size=batch_size,
            temperature=temperature, top_k=top_k, top_p=top_p
        )[:, 1:]

        saver = tf.train.Saver()
        ckpt = tf.train.latest_checkpoint(os.path.join(models_dir, model_name))
        saver.restore(sess, ckpt)

        generated = 0
        while nsamples == 0 or generated < nsamples:
            out = sess.run(output)
            for i in range(batch_size):
                generated += batch_size
                text = enc.decode(out[i])
                print("=" * 40 + " SAMPLE " + str(generated) + " " + "=" * 40)
                print(text)

if __name__ == '__main__':
    fire.Fire(sample_model)

这段代码是一个用于生成文本样本的 Python 脚本,基于 TensorFlow 实现,使用了 GPT 模型(例如 GPT-2)。以下是对代码的逐步解释:


1. 头部代码

1
#!/usr/bin/env python3

这是一个 shebang,用于指定运行脚本的 Python 解释器。


2. 引入依赖库

1
2
3
4
5
6
7
import fire
import json
import os
import numpy as np
import tensorflow as tf

import model, sample, encoder
  • fire: 用于解析命令行参数,可以轻松地将函数暴露为命令行工具。
  • jsonos: 用于读取和处理模型的配置文件和路径。
  • numpy: 用于随机数种子设定。
  • tensorflow: 用于加载和运行 TensorFlow 模型。
  • model, sample, encoder: 自定义模块,分别包含模型定义、采样逻辑和文本编码器。

3. sample_model 函数

这是生成文本的核心函数,支持多种参数控制生成行为。

参数说明

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def sample_model(
    model_name='124M',
    seed=None,
    nsamples=0,
    batch_size=1,
    length=None,
    temperature=1,
    top_k=0,
    top_p=1,
    models_dir='models',
):
  • model_name: 模型的名称,例如 '124M'
  • seed: 随机数种子,固定种子以获得可重复结果。
  • nsamples: 要生成的样本数量。如果为 0,则持续生成直到中断。
  • batch_size: 每批次生成的样本数量。
  • length: 生成的文本长度(以 token 为单位)。如果不指定,则默认使用模型的最大长度。
  • temperature: 控制生成文本的随机性。较低值会更确定性,较高值会更随机。
  • top_k: 限制生成时的候选 token 数量。0 表示不限制。
  • top_p: 控制 nucleus sampling(核采样)的参数,限制生成时累计概率的阈值。
  • models_dir: 模型文件所在的根目录。

主要逻辑

  1. 加载模型和超参数
1
2
3
4
5
models_dir = os.path.expanduser(os.path.expandvars(models_dir))
enc = encoder.get_encoder(model_name, models_dir)
hparams = model.default_hparams()
with open(os.path.join(models_dir, model_name, 'hparams.json')) as f:
    hparams.override_from_dict(json.load(f))
  • 解析 models_dir 路径,加载编码器和模型的超参数。
  • 超参数(如上下文长度 n_ctx)存储在 hparams.json 文件中。
  1. 验证文本长度
1
2
3
4
if length is None:
    length = hparams.n_ctx
elif length > hparams.n_ctx:
    raise ValueError("Can't get samples longer than window size: %s" % hparams.n_ctx)
  • 如果未指定长度,则使用模型的默认长度(即上下文窗口的大小)。
  • 如果指定的长度超过模型支持的窗口大小,则抛出错误。
  1. 创建 TensorFlow 会话
1
2
3
with tf.Session(graph=tf.Graph()) as sess:
    np.random.seed(seed)
    tf.set_random_seed(seed)
  • 在 TensorFlow 图中运行会话。
  • 设置随机种子以确保生成结果的可重复性。
  1. 定义采样过程
1
2
3
4
5
6
output = sample.sample_sequence(
    hparams=hparams, length=length,
    start_token=enc.encoder['<|endoftext|>'],
    batch_size=batch_size,
    temperature=temperature, top_k=top_k, top_p=top_p
)[:, 1:]
  • 调用 sample_sequence 方法,基于指定参数生成文本序列。
  • 起始 token 为 '<|endoftext|>',用于指示生成无条件文本。
  1. 加载模型权重
1
2
3
saver = tf.train.Saver()
ckpt = tf.train.latest_checkpoint(os.path.join(models_dir, model_name))
saver.restore(sess, ckpt)
  • 使用 TensorFlow 的 Saver 对象加载模型的权重文件。
  • 自动寻找最新的检查点文件(.ckpt)。
  1. 生成文本样本
1
2
3
4
5
6
7
8
generated = 0
while nsamples == 0 or generated < nsamples:
    out = sess.run(output)
    for i in range(batch_size):
        generated += batch_size
        text = enc.decode(out[i])
        print("=" * 40 + " SAMPLE " + str(generated) + " " + "=" * 40)
        print(text)
  • 通过 sess.run(output) 执行采样操作。
  • 使用编码器的 decode 方法将生成的 token 序列转换为人类可读的文本。
  • 打印生成的样本。

4. 主函数

1
2
if __name__ == '__main__':
    fire.Fire(sample_model)
  • 使用 fire.Firesample_model 函数暴露为命令行接口,允许通过命令行传递参数调用。

运行方式

假设文件名为 generate_samples.py,在命令行中可以运行:

1
python generate_samples.py --model_name 124M --nsamples 5 --length 50 --temperature 0.7 --top_k 40

生成 5 个长度为 50 的样本,使用 124M 模型,采样参数为 temperature=0.7top_k=40


核心模块

  1. model: 定义 GPT 模型的架构。
  2. sample: 实现采样逻辑。
  3. encoder: 实现文本编码(字符到 token 的映射)和解码(token 到文本的映射)。

总结

这段代码是一个典型的无条件文本生成脚本,允许用户通过调整参数生成高质量的自然语言文本。它展示了如何加载预训练模型、配置超参数并执行采样操作。

“shebang” 是一个英语词,它的中文意思是指脚本文件开头的那一行 #!/usr/bin/env python3,也叫作**“解释器指令”“魔法行”**。

具体含义是:

  • #!:表示这是一个特殊的指令行,告诉操作系统用哪个程序来运行这个脚本。
  • /usr/bin/env python3:表示使用环境中找到的 python3 解释器来执行这段代码。

简而言之,这行代码的作用是:让脚本在 Linux 或 macOS 系统中可以直接运行,而无需手动调用 Python。

比如,如果一个文件叫 script.py,包含这一行,你可以直接运行:

1
./script.py

而不需要输入:

1
python3 script.py

interactive_conditional_samples

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#!/usr/bin/env python3

import fire
import json
import os
import numpy as np
import tensorflow as tf

import model, sample, encoder

def interact_model(
    model_name='124M',
    seed=None,
    nsamples=1,
    batch_size=1,
    length=None,
    temperature=1,
    top_k=0,
    top_p=1,
    models_dir='models',
):
    """
    Interactively run the model
    :model_name=124M : String, which model to use
    :seed=None : Integer seed for random number generators, fix seed to reproduce
     results
    :nsamples=1 : Number of samples to return total
    :batch_size=1 : Number of batches (only affects speed/memory).  Must divide nsamples.
    :length=None : Number of tokens in generated text, if None (default), is
     determined by model hyperparameters
    :temperature=1 : Float value controlling randomness in boltzmann
     distribution. Lower temperature results in less random completions. As the
     temperature approaches zero, the model will become deterministic and
     repetitive. Higher temperature results in more random completions.
    :top_k=0 : Integer value controlling diversity. 1 means only 1 word is
     considered for each step (token), resulting in deterministic completions,
     while 40 means 40 words are considered at each step. 0 (default) is a
     special setting meaning no restrictions. 40 generally is a good value.
     :models_dir : path to parent folder containing model subfolders
     (i.e. contains the <model_name> folder)
    """
    models_dir = os.path.expanduser(os.path.expandvars(models_dir))
    if batch_size is None:
        batch_size = 1
    assert nsamples % batch_size == 0

    enc = encoder.get_encoder(model_name, models_dir)
    hparams = model.default_hparams()
    with open(os.path.join(models_dir, model_name, 'hparams.json')) as f:
        hparams.override_from_dict(json.load(f))

    if length is None:
        length = hparams.n_ctx // 2
    elif length > hparams.n_ctx:
        raise ValueError("Can't get samples longer than window size: %s" % hparams.n_ctx)

    with tf.Session(graph=tf.Graph()) as sess:
        context = tf.placeholder(tf.int32, [batch_size, None])
        np.random.seed(seed)
        tf.set_random_seed(seed)
        output = sample.sample_sequence(
            hparams=hparams, length=length,
            context=context,
            batch_size=batch_size,
            temperature=temperature, top_k=top_k, top_p=top_p
        )

        saver = tf.train.Saver()
        ckpt = tf.train.latest_checkpoint(os.path.join(models_dir, model_name))
        saver.restore(sess, ckpt)

        while True:
            raw_text = input("Model prompt >>> ")
            while not raw_text:
                print('Prompt should not be empty!')
                raw_text = input("Model prompt >>> ")
            context_tokens = enc.encode(raw_text)
            generated = 0
            for _ in range(nsamples // batch_size):
                out = sess.run(output, feed_dict={
                    context: [context_tokens for _ in range(batch_size)]
                })[:, len(context_tokens):]
                for i in range(batch_size):
                    generated += 1
                    text = enc.decode(out[i])
                    print("=" * 40 + " SAMPLE " + str(generated) + " " + "=" * 40)
                    print(text)
            print("=" * 80)

if __name__ == '__main__':
    fire.Fire(interact_model)

这段代码是一个交互式脚本,允许用户使用预训练的 GPT 模型生成基于用户输入的文本。它提供了一种对话式的方式来与语言模型交互,以下是详细的逐步解析:

1. 头部部分

1
#!/usr/bin/env python3
  • 作用: 指定使用 python3 作为脚本的解释器,方便在类 UNIX 系统中直接运行脚本。
1
2
3
4
5
6
7
import fire
import json
import os
import numpy as np
import tensorflow as tf

import model, sample, encoder
  • fire: 将 Python 函数转化为命令行工具,用于解析参数。
  • jsonos: 用于读取模型配置文件和处理路径。
  • numpy: 用于随机数种子设定。
  • tensorflow: 用于加载和运行模型。
  • model, sample, encoder: 自定义模块,分别包含 GPT 模型定义、采样逻辑和编码器。

2. interact_model 函数

该函数是脚本的核心,用于加载模型、处理用户输入并生成文本。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
def interact_model(
    model_name='124M',
    seed=None,
    nsamples=1,
    batch_size=1,
    length=None,
    temperature=1,
    top_k=0,
    top_p=1,
    models_dir='models',
):

参数说明

  • model_name: 模型的名称,例如 '124M'
  • seed: 随机数种子,固定后可重复生成相同的文本。
  • nsamples: 要生成的样本总数。
  • batch_size: 每次批量生成的样本数,nsamples 必须是 batch_size 的倍数。
  • length: 每个生成样本的 token 数。如果不指定,默认值为上下文窗口长度的一半。
  • temperature: 控制生成文本的随机性,值越高生成结果越多样化。
  • top_k: 限制每步生成时候选 token 的数量,0 表示不限制。
  • top_p: 控制 nucleus sampling(核采样)的累积概率阈值。
  • models_dir: 模型存储目录。

3. 核心逻辑

  1. 模型和超参数加载
1
2
3
4
5
6
7
8
9
models_dir = os.path.expanduser(os.path.expandvars(models_dir))
if batch_size is None:
    batch_size = 1
assert nsamples % batch_size == 0

enc = encoder.get_encoder(model_name, models_dir)
hparams = model.default_hparams()
with open(os.path.join(models_dir, model_name, 'hparams.json')) as f:
    hparams.override_from_dict(json.load(f))
  • 解析模型目录路径并加载编码器。
  • 加载模型的超参数(例如上下文窗口大小 n_ctx)。

  1. 验证和设置生成长度
1
2
3
4
if length is None:
    length = hparams.n_ctx // 2
elif length > hparams.n_ctx:
    raise ValueError("Can't get samples longer than window size: %s" % hparams.n_ctx)
  • 如果未指定生成文本的长度,默认使用模型上下文长度的一半。
  • 如果指定长度超出上下文窗口大小,则抛出错误。

  1. 创建 TensorFlow 会话
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
with tf.Session(graph=tf.Graph()) as sess:
    context = tf.placeholder(tf.int32, [batch_size, None])
    np.random.seed(seed)
    tf.set_random_seed(seed)
    output = sample.sample_sequence(
        hparams=hparams, length=length,
        context=context,
        batch_size=batch_size,
        temperature=temperature, top_k=top_k, top_p=top_p
    )
  • 创建一个新的 TensorFlow 图和会话。
  • 定义占位符 context,表示用户输入的 token。
  • 调用 sample_sequence 函数,根据上下文生成序列。

  1. 加载模型权重
1
2
3
saver = tf.train.Saver()
ckpt = tf.train.latest_checkpoint(os.path.join(models_dir, model_name))
saver.restore(sess, ckpt)
  • 使用 Saver 对象加载模型权重。
  • 自动找到最新的检查点文件(.ckpt)。

  1. 交互式输入与文本生成
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
while True:
    raw_text = input("Model prompt >>> ")
    while not raw_text:
        print('Prompt should not be empty!')
        raw_text = input("Model prompt >>> ")
    context_tokens = enc.encode(raw_text)
    generated = 0
    for _ in range(nsamples // batch_size):
        out = sess.run(output, feed_dict={
            context: [context_tokens for _ in range(batch_size)]
        })[:, len(context_tokens):]
        for i in range(batch_size):
            generated += 1
            text = enc.decode(out[i])
            print("=" * 40 + " SAMPLE " + str(generated) + " " + "=" * 40)
            print(text)
    print("=" * 80)
  • 输入部分:
    • 用户输入 raw_text(模型的上下文或提示词)。
    • 输入不能为空。
  • 文本生成部分:
    • 将输入的文本编码成 token,存储在 context_tokens 中。
    • 使用 sess.run(output) 调用模型生成 token 序列,跳过输入的 token,仅解码生成的部分。
    • 使用编码器解码 token 序列,转换成可读文本。
  • 输出部分:
    • 每生成一个样本,会用分隔线显示样本编号和内容。

6. 主函数部分

1
2
if __name__ == '__main__':
    fire.Fire(interact_model)
  • 使用 fireinteract_model 函数转化为命令行工具。

运行方式

假设脚本名为 interact.py,可以运行:

1
python interact.py --model_name 124M --temperature 0.8 --top_k 40

然后,输入提示词:

1
Model prompt >>> Once upon a time,

模型会根据提示生成若干样本。


总结

这段代码实现了一个交互式的文本生成工具,基于 GPT 模型。用户提供输入提示,模型生成后续文本。通过调整 temperaturetop_ktop_p 等参数,可以控制生成结果的随机性和多样性。

Licensed under CC BY-NC-SA 4.0
使用 Hugo 构建
主题 StackJimmy 设计