介绍

word2vec中的logistic function计算使用的是快速算法,初始先算好值,然后在神经网络训练迭代中直接查表加快计算和训练速度。设计也还算精巧。

代码

在初始化时,有代码片段一,会对expTable做专门的初始化:

#define EXP_TABLE_SIZE 1000
#define MAX_EXP 6



// step 4: 分配logistic查表.
expTable = (real *)malloc((EXP_TABLE_SIZE + 1) * sizeof(real));
  
// 初始化: 预先计算好指数运算表. 
for (i = 0; i < EXP_TABLE_SIZE; i++) {
    expTable[i] = exp((i / (real)EXP_TABLE_SIZE * 2 - 1) * MAX_EXP); // Precompute the exp() table
    expTable[i] = expTable[i] / (expTable[i] + 1);                   // Precompute f(x) = x / (x + 1)
  }

expTable数据的大小为1000,存了1000个值用于查表。

而在模型训练阶段,代码片段二:

// 前向传播: f=∑ neu1*syn1
// Propagate hidden -> output
for (c = 0; c < layer1_size; c++) {
  f += neu1[c] * syn1[c + l2];
}

// 范围判断,查表
if (f <= -MAX_EXP) 
  continue;
else if (f >= MAX_EXP) 
  continue;
else 
  f = expTable[(int)((f + MAX_EXP) * (EXP_TABLE_SIZE / MAX_EXP / 2))];

将代码片段二代入到代码片段一,我们可以发现很神奇的现象:

将上式简单做个替换,把f替成z,整体输入看成i:

而再做一次运算,即知该表的值刚好与 logit函数 y = 1/(1+e^-z) 相同。

为什么要这么设计呢?

通过查表法,当然是为了加快指数运算。如果在每次迭代时,都去做指数运算,cpu开销蛮大。这样的设计,可以优化训练时间。

我来来看前前一段代码中,expTable[0 … EXP_TABLE_SIZE]中具体对应的值是哪些?这里,我提供了它的python实现:代码下载 ,绘制出实际的取值曲线(logistic regression的某一段取值),详见上面链接提供的代码:

另外,还有一点,就是这样的分割:EXP_TABLE_SIZE=1000, MAX_EXP=6? 把数字代入,即知:

\[e_{raw}=e^{(\frac{2*i}{1000}-1)*6}\] \[logit=\frac{e_{raw}}{e_{raw}+1}\] \[i=\frac{(f+6)*(1000)}{6*2}\]

为什么要做这样的trick进行分解呢?上图展示的是从输入z的角度去理解。要真正理解为什么这样取,还要结合实际代码中Hierarchical softmax代码里的情况去理解。

里面的f输入范围在(-MAX_EXP, MAX_EXP), 即(-6, 6)。

第一阶段:(f + MAX_EXP)/MAX_EXP/2 * EXP_TABLE_SIZE 所做的操作:

  • f + MAX_EXP: 平移,(0, 12)
  • (f + MAX_EXP)/MAX_EXP: 压缩,(0,2)
  • (f + MAX_EXP)/MAX_EXP/2: 归一化,(0,1)
  • (f + MAX_EXP)/MAX_EXP/2 * EXP_TABLE_SIZE: 即将归一化后的f映射到[0,1000)个expTable的格子里,即是第一式的输入i

第二阶段:(i / (real)EXP_TABLE_SIZE * 2 - 1) * MAX_EXP 所做的操作:

  • i / EXP_TABLE_SIZE: 将上面第一阶段的i,重新压缩到(0,1)
  • i / (real)EXP_TABLE_SIZE * 2: 拉伸成(0,2)
  • i / (real)EXP_TABLE_SIZE * 2 - 1: 平移为(-1,1)
  • (i / (real)EXP_TABLE_SIZE * 2 - 1) * MAX_EXP: 放大为(-6,6)

ok,这时候,我们就会明白,为啥取(-6,6)了。因为:

  • logit(6)=0.997527376843
  • logit(-6)=0.00247262315663

这个范围内的值足够了。

但如果你细发现,两步合在一起,我们发现仿佛又回到了原点,于是可能开始会怀疑,为什么不干脆直接用f,省去中间这么多的变换,直接原封不动地输入expTable呢?

比如类似这样的实现:

def logit(x):
    e = math.exp(x)
    return e/(e+1)

我们可以推导下,如果让你设计一个需要1000个值的数组,存储logit函数值。假设数组名为expTable,即:

  • 输入x为(0,1000)
  • expTable[x]的输出值对应概率值(0,1)

由于logit(0)=0.5,常用的一个实现是:

\[y=\frac{1}{1+e^{-(x*2-1)}}\]

从0开始刚好为0,输入(0,+inf),输出(0,1)。如果想让x刚好对应索引,刚在原基础上除以1000即可。即:

\[y=\frac{1}{1+e^{-(x*2/1000-1)}}\]

再在原基础上做一下范围控制,就是我们上面的:

\[y=\frac{1}{1+e^{-(x*2/1000-1)*6}}\]

如果希望得到更精准的值,将EXP_TABLE_SIZE和MAX_EXP调大些即可以。

介绍

Mikolov在Distributed Representations of Words and Phrases and their Compositionality中,提到高频词的subsampling问题,以下我对相关节选进行的简单中文翻译:

在非常大的语料中,最高频的词汇,可能出现上百万次(比如:in, the, a这些词)。这样的词汇通常比其它罕见词提供了更少的信息量。例如,当skip-gram模型,通过观察”France”和”Paris”的共现率(co-occurrence),Skip-gram模型可以从中获益;而”France”和”the”的共现率则对模型贡献很少;而几乎每个词都常在句子中与”the”共同出现过。该思路也常用在相反的方向上;高频词的向量表示,在上百万样本训练完后不会出现大变化。

为了度量这种罕见词与高频词间存在不平衡现象,我们使用一个简单的subsampling方法:训练集中的每个词\(w_i\),以下面公式计算得到的概率进行抛弃:

\[P(w_i)=1-\sqrt{\frac{t}{f(w_i)}}\]

\(f(w_i)\)是\(w_i\)的词频,t为选中的一个阀值,通常为\(10^{-5}\)周围(1e-5=0.00001)。我们之所以选择该subsampling公式,是因为:它可以很大胆的对那些词频大于t的词进行subsampling,并保留词频的排序(ranking of the frequencies)。尽管subsampling公式的选择是拍脑袋出来的(启发式的heuristically),我们发现它在实践中很有效。它加速了学习,并极大改善了罕见词的学习向量的准确率(accuracy)。

具体实现

在当中的描述是:

\[P(w_i)=1-(\sqrt{\frac{sample}{freq(w_i)}}+\frac{sample}{freq(w_i)})\]

实际代码如下:

// 进行subsampling,随机丢弃常见词,保持相同的频率排序ranking.
// The subsampling randomly discards frequent words while keeping the ranking same
if (sample > 0) {

  // 计算相应的抛弃概率ran.
  real ran = (sqrt(vocab[word].cn / (sample * train_words)) + 1) * (sample * train_words) / vocab[word].cn;

  // 生成一个随机数next_random.
  next_random = next_random * (unsigned long long)25214903917 + 11;

  // 如果random/65535 - ran > 0, 则抛弃该词,继续
  if (ran < (next_random & 0xFFFF) / (real)65536) 
      continue;
}

为此,我简单地写了一个python程序,做了个测试。程序托管在github上,点击下载

词频越大,ran值就越小。subsampling进行抽样时被抽到的概率就越低。

1.png

参考:

Distributed Representations of Words and Phrases and their Compositionality

介绍

如果你在大学期间学过信息论或数据压缩相关课程,一定会了解Huffman Tree。建议首先在wikipedia上重新温习下Huffman编码与Huffman Tree的基本概念: Huffman编码wiki

简单的说(对于文本中的字母或词),Huffman编码会将出现频率较高(也可认为是:权重较大)的字(或词)编码成短码,而将罕见的字(或词)编码成长码。对比长度一致的编码,能大幅提升压缩比例。

而Huffman树指的就是这种带权路径长度最短的二叉树。权指的就是权重W(比如上面的词频);路径指的是:从根节点到叶子节点的路径长度L;带权路径指的是:树中所有的叶结点的权值乘上其到根结点的路径长度。WPL=∑W*L,它是最小的。

word2vec的Huffman-Tree实现

为便于word2vec的Huffman-Tree实现,我已经将它单独剥离出来,具体代码托管到github上: huffman_tree代码下载。示例采用的是wikipedia上的字母:

即:F:2, O:3, R:4, G:4, E:5, T:7

这里有两个注意事项:

  • 1.对于单个节点分枝的编码,wikipedia上的1/0分配是定死的:即左为0,右为1(但其实分左右是相对的,左可以调到右,右也可以调到左)。而word2vec采用的方法是,两侧中哪一侧的值较大则为1,值较小的为0。当然你可以认为(1为右,0为左)。
  • 2.word2vec会对词汇表中的词汇预先从大到小排好序,然后再去创建Huffman树。在CreateBinaryTree()调用后,会生成最优的带权路径最优的Huffman-Tree。

最终生成的图如下:

此图中,我故意将右边的T和RG父结节调了下左右,这样可以跳出上面的误区(即左为0,右为1。其实是按大小来判断0/1)

相应的数据结构为:

/**
 * word与Huffman树编码
 */
struct vocab_word {
  long long cn;     // 词在训练集中的词频率
  int *point;       // 编码的节点路径
  char *word,       // 词
       *code,       // Huffman编码,0、1串
       codelen;     // Huffman编码长度
};

最后,上面链接代码得到的结果:

1
2
3
4
5
6
word=T	cn=7	codelen=2	code=10	point=4-3-
word=E	cn=5	codelen=2	code=01	point=4-2-
word=G	cn=4	codelen=3	code=111	point=4-3-1-
word=R	cn=4	codelen=3	code=110	point=4-3-1-
word=O	cn=3	codelen=3	code=001	point=4-2-0-
word=F	cn=2	codelen=3	code=000	point=4-2-0-

整个计算过程设计的比较精巧。使用了三个数组:count[]/binary[]/parent_node[],这三个数组构成相应的Huffman二叉树。有vocab_size个叶子节点。最坏情况下,每个节点下都有一个叶子节点,二叉树的总节点数为vocab_size * 2 - 1就够用了。代码使用的是 vocab_size * 2 + 1。

当然,如果你有兴趣关注下整棵树的构建过程的话,也可以留意下这部分输出:

count[]:	7 5 4 4 3 2 1000000000000000 1000000000000000 1000000000000000 1000000000000000 1000000000000000 1000000000000000
binary[]:	0 0 0 0 0 0 0 0 0 0 0 0
parent[]:	0 0 0 0 0 0 0 0 0 0 0 0
	
--step 1:
count[]:	7 5 4 4 3 2 5 1000000000000000 1000000000000000 1000000000000000 1000000000000000 1000000000000000
binary[]:	0 0 0 0 1 0 0 0 0 0 0 0
parent[]:	0 0 0 0 6 6 0 0 0 0 0 0
	
--step 2:
count[]:	7 5 4 4 3 2 5 8 1000000000000000 1000000000000000 1000000000000000 1000000000000000
binary[]:	0 0 1 0 1 0 0 0 0 0 0 0
parent[]:	0 0 7 7 6 6 0 0 0 0 0 0
	
--step 3:
count[]:	7 5 4 4 3 2 5 8 10 1000000000000000 1000000000000000 1000000000000000
binary[]:	0 1 1 0 1 0 0 0 0 0 0 0
parent[]:	0 8 7 7 6 6 8 0 0 0 0 0
	
--step 4:
count[]:	7 5 4 4 3 2 5 8 10 15 1000000000000000 1000000000000000
binary[]:	0 1 1 0 1 0 0 1 0 0 0 0
parent[]:	9 8 7 7 6 6 8 9 0 0 0 0
	
--step 5:
count[]:	7 5 4 4 3 2 5 8 10 15 25 1000000000000000
binary[]:	0 1 1 0 1 0 0 1 0 1 0 0
parent[]:	9 8 7 7 6 6 8 9 10 10 0 0

参考

1.Huffman编码

Y Bengio在2003年发布了paper《A Neural Probabilistic Language Model》,即NNLM。我们来看一下这篇10多年前的paper中的主要内容:

1.介绍

语言建模和其它学习问题很难,因为存在一个基础问题:维度灾难(curse of dimensionality)。当你想建模许多离散随机变量间(比如:一个句子中的词,或者在一个数据挖掘任务中的离散变量)的联合分布时,这个现象特别明显。例如,如果你想建模自然语言中在一个size=10w的词汇表V的10个连续词的联合分布,潜在会有\(100000^{10}-1=10^{50}-1\)个自由参数(free parameters)。当建模连续变量时,我们可以更轻易地获得泛化(例如:使用多层神经网络或高斯混合模型的平滑分类函数),因为学到的函数可以具有一些局部平滑属性。对于离散空间,泛化结果并不明显:这些离散变量的任何变化,都可能对要估计的函数值具有一个剧烈的影响;当离散变量的值可取的数目很大时,大多数观察到的目标相互之间在hamming距离上差距很大

受非参数化密度估计的启发,对不同学习算法如何泛化进行可视化的一种有效方法是,认为将初始集中在training points(例如:训练句子)上的概率质量(probability mass)在一个更大的空间(volume)中分散是的(distributed)。在高维中,将概率质量(probability mass)进行分散(distribute)很重要,而非围绕在每个training point周围的所有方向上均匀。我们在本paper会展示提出的这种方法与state-of-art统计语言建模方法的不同。

统计语言模型可以通过下面的公式进行表示,给定所有之前出现的词,对下一个词的条件概率:

\[\hat{P}(W_1^T) = \prod\limits_{1}^{T} \hat{P}(w_t | w_t^{t-1})\]

其中,\(w_t\)是第t个词,子序列可以写成:\(w_i^j = (w_i, w_{i+1}, \cdots, w_{j-1}, w_j)\)。这样的统计语言模型在许多应用中很有用,比如:语音识别,语言翻译,信息检索。统计语言模型对这样的应用有极大影响。

当构建自然语言的统计模型时,可以通过使用词序(word order)来减小建模问题的难度,实际上在词序列中时序更接近的词在统计学上相互依赖更强。因而,对于一个大数目的上下文(比如:最近n-1个词的组合)中的每个词,可以近似使用n-gram模型来构建对于下一个词的条件概率表

\[\hat{P}(w_t | w_1^{t-1}) \approx \hat{P}(w_t | w_{t-n+1}^{t-1})\]

我们只考虑这些在训练语料中实际出现、或者发生足够频繁的连续词的组合。当n个词的一个新组合出现在训练语料中,会发生什么?我们不想分配零概率到这样cases中,因为这些新组合很可能会出现,而且他们可能对于更大的上下文size会出现的更频繁。一个简单的答案是,使用一个更小的上下文size来看下预测概率,比如:back-off trigram模型、或者smoothed trigram模型。在这样的模型中,从在训练语料中的词序列获取的的模型如何泛化到新的词序列上呢?一种方式是,给这样的插值或back-off ngram模型对应一个生成模型。本质上,一个新的词序列可以通过“粘合(gluning)”在训练语料中频繁出现的非常短或长度为1,2…,n的重叠块来生成。在特别的back-off或插值n-gram算法中,获得下一块(piece)的概率的规则是隐式的。通常,研究者们使用n=3(比如:trigrams),并获取state-of-art结果。。。。

1.1 使用分布式表示来解决维度灾难

简单的,提出的方法可以如下进行总结:

  • 1.将词表中的每个词与一个distributed word feature vector(在\(R^m中\)一个real-valued vector)进行关联。
  • 2.采用在序列中的词的feature vectors来表示词序列的联合概率函数.
  • 3.同时学习word vectors和概率函数的参数

feature vector可以表示词的不同方面:每个词与一个向量空间中的一个点(point)相关联。features的数目(比如:实验中采用m=30, 60或100)比词汇表的size要小很多。概率函数被表示成:给定之前的词,一个关于下一个词的条件概率乘积(例如:使用一个multi-layer NN来预测)。该函数具有这样的参数,它以最大化训练数据的log似然来进行迭代调参。feature vectors是通过学习得到的,但它们可以使用语义特征的先验知识进行初始化。

为什么会有效?在之前的示例中,如果我们知道,dog和cat扮演着相似的角色(语义上和结构上),对于(the,a), (bedroom, room), (is, was), (running, walking)是相似的,我们可以很自然地对下面的句子进行泛化:

The cat is walking in the bedroom

泛化成:

A dog was running in a room

或者:

The cat is running in a room

A dog is walking in a bedroom

The dog was walking in the room

以及许多其它组合。在我们提出的模型中,它是可以泛化的,因为“相似”的词被认为是具有一个相似的feature vector,因为概率函数是一个关于这些feature values的平滑函数,在特征上的微小变化在概率上只会引入很小的变化。因些,在训练数据中上述句子之一的出现将会增加概率,不仅仅是那些句子,而且包括在句子空间中它们的“邻居”的组合数目。

1.2 之前工作

略。

2.一个神经模型

训练集是关于\(w_t \in V\)的一个序列\(w_1, \cdots, w_T\),其中词汇表V是一个有限的大集合。学习目标是学习一个好的模型,使得:

\[f(w_t, \cdots, w_{t-n+1}) = \hat{P}(w_t \mid w_1^{t-1})\]

也就是说给出很高的out-of-sample likelihood。下面,我们会上报关于\(1/\hat{P}(w_t \mid w_1^{t-1})\)的几何平均,它被称为“困惑度(perplexity)”,它也是平均负log似然的指数。模型的唯一限制是,对于\(w_1^{t-1}\)的任意选择,\(\sum\limits_{i=1}^{\mid V\mid} f(i, w_{t-1}, \cdots, w_{t-n+1}) = 1\),其中\(f>0\)。通过将这些条件概率相乘,可以获得一个关于这些词序列的联合概率的模型。

我们将函数\(f(w_t, \cdots, w_{t-n+1})= \hat{P}(w_1^{t-1})\)解耦成两部分:

  • 1.一个映射函数C,它将V的任意元素i映射到一个真实向量\(C(i) \in R^m\)。它表示与词典中每个词相关的分布式特征向量(distributed feature vectors)。实际上,C被表示成一个关于自由参数的\(\mid V \mid \times m\)的矩阵。
  • 2.词上的概率函数,由C进行表示:对于在上下文中的词,\((C(w_{t-n+1}), \cdots, C(w_{t-1}))\),会使用一个函数g将一个关于feature vectors的输入序列映射到一个关于下一个词\(w_t \in V\)的条件概率分布上。g的输出是一个vector,它的第i个元素可以估计概率\(\hat{P}(w_t = i \mid w_1^{t-1})\),如图1所示。
\[f(i, w_{t-1}, \cdots, w_{t-n+1}) = g(i, C(w_{t-1}), \cdots, C(w_{t-n+1}))\]

1.png

图1: 神经网络结构:\(f(i, w_{t-1}, \cdots, w_{t-n+1}) = g(i, C(w_{t-1}), \cdots, C(W_{t-n+1}))\),其中g是神经网络,C(i)是第i个word feature vector。

函数f是这两个mappings(C和g)的一个组合,其中C对于在上下文中所有词是共享的(shared)。这两部分每一个都与一些参数有关。mapping C的参数就简单的是feature vectors自身,通过一个\(\mid V\mid \times m\)的矩阵C进行表示,其中第i行表示第i个词的feature vector C(i)。函数g通过一个feed-forward或RNN或其它参数化函数(parametrized function)来实现,它使用参数\(\mathcal{W}\)。

训练通过搜索\(\theta\)来完成,它会在训练语料上最大化penalized log-likelihood:

\[L = \frac{1}{T} \sum\limits_{t} log f(w_t, w_{t-1}, \cdots, w_{t-n+1}; \theta) + R(\theta)\]

其中\(R(\theta)\)是一个正则项。例如,在我们的实验中,R是一个权重衰减项(weight cecay penalty),它只应用于神经网络的weights和矩阵C上,而非应用在biases上。

在上述模型中,自由参数(free parameters)的数目与词汇表中词数目V成线性比例。它也只与阶数n成线性比例:如果介入更多共享结构,例如,使用一个time-decay神经网络或一个RNN(或者一个关于两种网络的组合),比例因子可以被减小到sub-linear。

在以下大多数实验上,除word features mapping外,神经网络具有一个hidden layer,可选的,还有从word features到output上的直接连接(direct connections)。因此,实际有两个hidden layers:

  • 1.shared word features layer C:它是线性的
  • 2.普通的双曲正切( hyperbolic tangent) hidden layer

更精准的,神经网络会计算如下的函数,它使用一个softmax output layer,它会保证正例概率(positive probabilities)求和为1:

\[\hat{P}(w_t | w_{t-1}, \cdots, w_{t-n+1}) = \frac{e^{y_{w_t}}}{\sum\limits_{i} e^{y_i}}\]

其中,\(y_i\)是对于每个output word i的未归一化log概率,它会使用参数b, W, U, d和H,具体计算如下:

\[y = b + W x + U tanh(d+Hx)\]

…(1)

其中,双曲正切tanh以element-by-element的方式进行应用,W可选为0(即没有直接连接),x是word features layer activation vector,它是从矩阵C的input word features的拼接(concatenation):

\[x = (C(w_{t-1}), C(W_{t-2}), \cdots, C(W_{t-n+1}))\]

假设h是hidden units的数目,m是与每个词相关的features的数目。当从word features到outputs上没有直接连接(direct connections)时,矩阵W被设置为0。该模型的自由参数是output biases b(具有\(\mid V \mid\)个元素),hidden layer biases d(具有h个元素),hidden-to-output weights U(一个\(\mid V \mid \times h\)的矩阵),word features到output weights W(一个\(\mid V\mid \times (n-1) m\)的矩阵),hidden layer weights H(一个 \(h \times (n-1) m\)的矩阵),word features C(一个\(\mid V\mid \times m\)的矩阵):

\[\theta = (b,d,W,U,H,C)\]

自由参数的数目是 \(\mid V\mid (1 + nm + h) + h(1 + (n-1) m)\)。主导因子是\(\mid V\mid (nm+h)\)。注意,在理论上,如果在weights W和H上存在一个weight decay(C上没有),那么W和H可以收敛到0,而C将会增长。实际上,当使用SGA(随机梯度上升)训练时,我们不会观察到这样的行为。

在神经网络上进行SGA包含,包含在执行以下的在训练语料的第t个词之后的迭代更新中:

\[\theta \leftarrow \theta + \epsilon \frac{\partial log \hat{P}(w_t | w_{t-1}, \cdots, w_{t-n+1})}{\partial \theta}\]

其中,\(\epsilon\)是learning rate。注意,大部分参数不需要更新、或者在每个样本之后训练:\(C(j)\)的word features,不会出现在input window中。

混合模型。在我们的实验中,我们已经发现,通过将神经网络的概率预测与一个interpolated trigram model相组合,可以提升效果,使用一个固定的权重0.5,一个可学习权重(在验证集上达到最大似然),或者一个weights集合,是基于在context的频率为条件的(使用相似的过程:在interpolated trigram上组合trigram, bigram, and unigram)。

其它

略.

参考:

http://www.jmlr.org/papers/volume3/bengio03a/bengio03a.pdf

mllib中lda源码分析(一)中,我们描述了LDA的原理、以及变分推断batch算法和online算法的推导。这一节会再描述下spark MLlib中的ml实现。

spark MLlib的实现,是基于变分推断算法实现的,后续的版本会增加Gibbs sampling。本文主要关注online版本的lda方法。(代码主要以1.6.1为主)

一、Batch算法

二、Online算法

ml中的lda相当于一层wrapper,更好地适配ml中的架构(tranformer/pipeline等等),调用的实际上是mllib中的lda实现。

1.LDA.run(主函数)

  def run(documents: RDD[(Long, Vector)]): LDAModel = {
    // 初始化(em=>em, online=>online).
    val state = ldaOptimizer.initialize(documents, this)

    // 迭代开始. 
    var iter = 0
    val iterationTimes = Array.fill[Double](maxIterations)(0)
    while (iter < maxIterations) {
      val start = System.nanoTime()
      
      // optimzier对应的迭代函数.
      state.next()
      
      val elapsedSeconds = (System.nanoTime() - start) / 1e9
      iterationTimes(iter) = elapsedSeconds
      iter += 1
    }

    // 获取最终完整的的LDAModel. 
    state.getLDAModel(iterationTimes)
  }

OnlineOptimizer.next

  override private[clustering] def next(): OnlineLDAOptimizer = {
    // 先进行sample
    val batch = docs.sample(withReplacement = sampleWithReplacement, miniBatchFraction,
      randomGenerator.nextLong())
    
    // 如果为空,返回
    if (batch.isEmpty()) return this

    // 提交batch,进行lda迭代.
    submitMiniBatch(batch)
  }

submitMinibatch

/**
   * 提供语料的minibatch给online LDA模型.为出现在minibatch中的terms它会自适应更新topic distribution。
   * 
   * Submit a subset (like 1%, decide by the miniBatchFraction) of the corpus to the Online LDA
   * model, and it will update the topic distribution adaptively for the terms appearing in the
   * subset.
   */
  private[clustering] def submitMiniBatch(batch: RDD[(Long, Vector)]): OnlineLDAOptimizer = {
    iteration += 1
    val k = this.k
    val vocabSize = this.vocabSize
    val expElogbeta = exp(LDAUtils.dirichletExpectation(lambda)).t      // β ~ Dirichlet(λ)
    val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)       // 
    val alpha = this.alpha.toBreeze
    val gammaShape = this.gammaShape

    // step 1: 对每个partition做map,单独计算E-Step. stats(stat, gammaPart)
    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>

      // 1.过滤掉空的.
      val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

      // 2.stat 是一个DenseMatrix:  k x vocabSize
      val stat = BDM.zeros[Double](k, vocabSize)

      // 
      var gammaPart = List[BDV[Double]]()

      // 3.遍历partition上的所有document:执行EM算法.
      // E-step可以并行计算.
      // M-step需要reduce,作合并,然后再计算.
      nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
        // 3.1  取出document所对应的词的id。(支持dense/sparse).
        val ids: List[Int] = termCounts match {
          case v: DenseVector => (0 until v.size).toList
          case v: SparseVector => v.indices.toList
        }

        // 3.2 更新状态 E-step => gammad ().
        val (gammad, sstats) = OnlineLDAOptimizer.variationalTopicInference(
          termCounts, expElogbetaBc.value, alpha, gammaShape, k)

        // 3.3 根据所对应id取出对应列. 更新sstats(对应主题状态) 
        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats

        // 3.4 更新该partition上每个文档的gammad的gammaPart列表中.
        gammaPart = gammad :: gammaPart
      }

      // 4.mapPartition返回iterator,每个partition返回一个迭代器(stat, gammaPart)
      // stat: k x V matrix. 针对该partition上的文档,所更新出的每个词在各主题上的分布.
      // gammaPart: list[vector[K]] 该分区上每个文档的gammad列表.
      Iterator((stat, gammaPart))
    }

    // step 2: 对mini-batch的所有partition上的stats(stat,gammaPart)中的stat进行求总和.
    val statsSum: BDM[Double] = stats.map(_._1).reduce(_ += _)
    expElogbetaBc.unpersist()

    // step 3: 对mini-batch的所有partition上的stats(stat,gammaPart)中的gammaPart进行更新.
    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
      stats.map(_._2).reduce(_ ++ _).map(_.toDenseMatrix): _*)

    // step 4: 更新batchResult ( K x V), 每个元素上乘以 E[log(β)]
    val batchResult = statsSum :* expElogbeta.t

    // M-step:
    // 更新λ.
    // Note that this is an optimization to avoid batch.count
    updateLambda(batchResult, (miniBatchFraction * corpusSize).ceil.toInt)

    // 如果需要优化DocConentration,是否更新alpha
    if (optimizeDocConcentration) updateAlpha(gammat)
    this
  }

variationalTopicInference

里面比较核心的一个方法是variationalTopicInference:

  private[clustering] def variationalTopicInference(
      termCounts: Vector,
      expElogbeta: BDM[Double],
      alpha: breeze.linalg.Vector[Double],
      gammaShape: Double,
      k: Int): (BDV[Double], BDM[Double]) = {

    // step 1: 词的id和cnt: (ids, cnts) 
    val (ids: List[Int], cts: Array[Double]) = termCounts match {
      case v: DenseVector => ((0 until v.size).toList, v.values)
      case v: SparseVector => (v.indices.toList, v.values)
    }

    // step 2: 初始化: gammad ~ Γ(100, 0.01), 100维
    // gammd: mini-batch的变分分布: q(θ | γ) 
    // expElogthetad: paper中的exp(E[logθ]), 其中θ ~ Dirichlet(γ)
    // expElogbetad:  paper中的exp(E[logβ]), 其中β : V * K
    // phiNorm:  ϕ ∝ exp{E[logθ] + E[logβ]},ϕ ~ θ*β. 非零.
    // Initialize the variational distribution q(theta|gamma) for the mini-batch
    val gammad: BDV[Double] =
      new Gamma(gammaShape, 1.0 / gammaShape).samplesVector(k)                   // K
    val expElogthetad: BDV[Double] = exp(LDAUtils.dirichletExpectation(gammad))  // K
    val expElogbetad = expElogbeta(ids, ::).toDenseMatrix                        // ids * K

    // 加上一个很小的数,让它非零.
    val phiNorm: BDV[Double] = expElogbetad * expElogthetad :+ 1e-100            // ids
    var meanGammaChange = 1D
    val ctsVector = new BDV[Double](cts)                                         // ids

    // 单个mini-batch里的loop.
    // 迭代,直到 β 和 ϕ 收敛. paper中是0.000001,此处用的是1e-3.
    // Iterate between gamma and phi until convergence
    while (meanGammaChange > 1e-3) {
      val lastgamma = gammad.copy

      // breeze操作:https://github.com/scalanlp/breeze/wiki/Universal-Functions
      // ":"为elementwise操作符;其中的:=,对象相同,内容赋值
      // 计算:γ_dk, 先对ctsVector进行归一化,再乘以转置E(log(β)]^T,再pointwise乘E(log(θ)).
      // 各矩阵或向量的维度: K为topic, ids:为词汇表V
      // gammad: Vector(K),单个文档上每个主题各有一γ值.
      // expElogthetad: Vector(K),单个文档上每个主题各有一θ值.
      // expElogbetad: Matrix(ids*K),每个词,每个主题都有一β值.
      // ctsVector: Vector(ids),每个词上有一个词频.
      // phiNorm: Vector(ids),每个词上有一个ϕ分布值。
      // 
      // 
      //        K                  K * ids               ids
      gammad := (expElogthetad :* (expElogbetad.t * (ctsVector :/ phiNorm))) :+ alpha
      
      // 更新 exp(E[logθ]), expElogbetad不需要更新
      expElogthetad := exp(LDAUtils.dirichletExpectation(gammad))
      
      // 更新 phiNorm, 
      // TODO: Keep more values in log space, and only exponentiate when needed.
      phiNorm := expElogbetad * expElogthetad :+ 1e-100

      // 平均变化量.
      meanGammaChange = sum(abs(gammad - lastgamma)) / k
    }

    // sstats: mini-batch下每个主题下的词分布.
    // n 
    // sstatsd: k x 1 * 1 x ids => k x ids
    val sstatsd = expElogthetad.asDenseMatrix.t * (ctsVector :/ phiNorm).asDenseMatrix
    (gammad, sstatsd)
  }

ok。关于alpha的更新等,此处不再解释。详见源码。