本文主要是结合Jinhui Yuan等人在LightLDA的paper的理解。对于Gibbs sampling可以参见PRML第11章。
1.介绍
主题模型(TM: topic models)使用广泛,许多公司开发了大规模的LDA工具包实现,以适应海量的语料。互联网级别的语料更复杂,为捕获长尾的语义信息(否则将丢失这些主题信息),需要大容量(high-capacity)的主题参数空间,有成千上万的主题数和很大的词汇量。
为应对大规模数据及模型可扩展性,LightLDA实现了一种分布式数据并行策略的LDA(将文档通过workers进行分割,共享所有主题参数)。当然,你也可以使用SparseLDA和AliasLDA的sampler进行算法加速,来进一步降低运行时长。使用1000台机器,就可以使用LDA模型从10亿级别的文档中infer出具有100亿的参数。这个结果是惊人的,但开销很大:例如,一个1000台机器的集群将花费上百万美金(这还不算电费和维护费用)。另外,你可以租用云平台,这样每台机器每小时也要>=1美元,每个月的开销也要>=70w美元。这对于大多说研究者来说是不可行的。
LightLDA提出了一种花费更小的方法来解决这种大规模ML问题,在10台机器级别就能解决该问题。在三个级别上处理该问题:
- 1.以data-paralled 和 model-paralled方式实现分布式LDA inference:数据和模型会被分区(partitioned),接着跨机器进行流传输(streamed),以便在集群内更有效地利用内存和网络资源。
- 2.开发了一个Metropolis-Hastings sampler,对每个word/token,允许O(1)的采样时间,这可以在时间上产生一个高收敛率,可以击败当前state-of-art的samplers。
- 3.使用了一种不同的数据结构,利用海量语料,可以展示高频头部词,也可以展示低频长尾词,以不同的方式存储,有效利用资源,没有性能损失。
使用开源的Petuun framework,我们生成了一种即快又省内存(compute-and memory eficient)的分布式LDA实现:LightLDA。对于上亿的文档(2000亿tokens),它有1万亿的模型参数(1m的主题 x 1m的词汇量),只需要8台标准机器(与云平台常用计算实例配置类似),在180小时内,或者24台机器60小时。对于参数的size,我们的结果是:比文本数据集上大两阶。对于数据的size(data size)至少相当或者比其它大1阶。对于吞吐量(throughput),我们的系统可以在每20-core机器上,每小时采样5000w文档(平均长度为200 tokens)。而PLDA+每机器每小时使用collasped Gibbs sampler只能达到1200个文档;YahooLDA在每8-core机器每小时200w文档。
#
LDA实现分布variational- 和 sampling-based inference算法。LightLDA只关注sampling-based的方法。因为它可以产生非常稀疏的更新,使它们很适合设置很大的主题数K。
最新的large-scale LDA实现需要使用很大的工业级的集群,使用成百上千的CPU core。这些实现需要大集群的原因是:它们使用了SparseLDA inference算法或者 更慢的原始collapsed Gibbs sampler inference算法。这些情况下,inference算法本身是一个限制因素。我们通过开发了一种新的O(1)-per-token Metropolis-Hastings sampler来处理这个瓶颈,它比SparseLDA sampler几乎快一阶——它允许我们在小集群上处理大语料。我们注意到:最新的AliasLDA也提供了一个解法来解决SparseLDA的瓶颈。然而,另一方面,AliasLDA的计算复杂度为O(Kd),因此,并不擅长处理更长的文档(比如网页),因为doc-topic表在初始迭代时是dense的,因此Kd会很大;更一方面,AliasLDA的paper只描述了一种单机实现,它的分布式和可扩展性是不清楚的(特别是考虑到AliasLDA的高空间复杂度,对于每个词的alias table需要O(K))。在本paper中,我们展示了Metropolis-Hastings sampler在单机上在各指标上均快于AliasLDA,这使我们不再考虑使用AliasLDA。
上述提到的large-scala LDA本质上分为:data-parallelism(在机器之间分割文档) vs. model-parallelism(在机器上分割word-topic分布)。YahooLDA[1]和基于parameter-server的实现[11],将word-topic看成是全局共享的,在这种inference算法上,关于如何跨机器进行物理排序word-topic分布是不可知的。更重要的, 它们在token topic索引$ z_{di} $上以文档为中心的方式调度inference计算,因此将它们看成是data-parallel-only的实现。结论是,我们即不希望yahooLDA的实现,也不希望基于parameter-server的实现,来处理非常大的主题模型(1万亿参数)。一旦整个语料在足够多的机器上传输,每台机器的本地文档只有一小部分参与LDA模型,因此,每台机器需要的内存不会太大。这样的设计,如果没有大的计算集群,根本不能处理大的主题模型。
另一方面,PLDA+和Peacock会额外根据词$ w_{di}$ 将token topic indicators $ z_{di} $ 进行分组(group);这是有好处的。因为它会减小word-topic分布(在每个worker机器上持有)的比例——可以有效地在top个data-parallelism上进行model-parallelism。特别的,采用一种grid-like model-paralled分区策略,需要让训练数据、LDA模型和worker机器进行通信(对比我们的设计,这需要额外的开销)。另一点要注意的是在PLDA+的设计中的pipeline,只需要workers在内存中持有模型的一小部分;然而,这样的系统使用了一个过时的,很慢的Gibbs sampler,它们的数据分布和调度对于极大的数据和模型来说不合适。特别的,它的word-bundling策略依靠一个关于训练数据的倒排索引表示,会是文档内存的两倍(这几乎不可承受,因为内存在large-scale LDA中很昂贵)。LightLDA采用了一种不同的data-and-model-paralled策略来最大化地减小内存和CPU开销:我们将word-topic分布以一种结构感知的model-parallel方式进行切分(slice),我们在workers上将文档块固定,所需要的模型参数通过一种异步有界的data-paralled scheme(a bounded-asynchronous data-parallel scheme)传输给它们。这允许我们可以在一个10亿级的文档上,只需要8台机器就可以训练一个1万亿参数的LDA模型。当增加额外的机器时可以获取线性加速。
3.结构感知的模型并行(Structure-Aware Model Parallelism for LDA)
当训练LDA,使用10w个主题可以极大提升所学到的模型——一个原因是,非常大的语料常包含许多小的,但很合适的主题(长尾主题:long tail),当模型只有上千个主题时常检测不到。然而,对于一个达到上百万的主题模型,这会导致word-topic分布包含万亿的参数,因为互联网大语料可以很轻易的包含上百万的唯一词汇。LDA模型在实际中是很稀疏的(许多参数为0),一个上万亿参数的模型比最新的结果要大两阶【12,1,21,11】;事实上,一些已经存在的分布式实现不会扩展到这么大的模型,因为模型需要通过workers进行划分——例如,系统设计需要假设一个worker’s的文档将从不会达到模型的一小部分,但这在一些语料上是不实际的。除了通过worker机器上进行分区外(所有最新的分布式LDA实现都以这种方式),我们必须同时以一种保守的方式对模型进行分区,以确保workers不会耗尽内存。这就是结构感知的model-parallelism。
在我们的model-parallel策略中,我们简单回顾下LDA模型来确定下相关术语。假设在语料中每个文档按以下方式生成:
- $ \phi_{k} ~ Dirichlet(\beta) $: 为每个主题k抽取词分布$ \phi_{k} $
- $ \theta_d ~ Dirichlet(\alpha) $:为每个文档d抽取主题分布$theta_d $
- $ n_d ~ Poisson(\gamma) $:对于每个文档d,抽取长度$n_d$(例如:它包含的tokens数目)
- 对于每个token $ i \in { 1,2, \cdot, n_d } $:
- $z_{di} ~ Multinomial(\theta_{di}) $: 抽取token的主题
- $w_{di} ~ Multinomial(\phi_{z_{di}})$:抽取token的词
对于标准的collapsed Gibbs sampler for LDA,以如下方式表述:除了token的topic indicator $z_{di}$ 外,所有变量都被analytically integrated out,我们只需要根据Gibbs sample $ z_{di} $:
\[p(z_{di}=k\| rest) \propto \frac{(n_{kd}^{-di}+\alpha_k)(n_{kw}^{-di} + \beta_{w})}{n_k^{-di} + \hat{\beta}}\]…(1)
其中w是$w_{di}$的简写,$ \hat{\beta} := \sum_{w}\beta_{w}$, $n_{kd}^{-di}$是文档d中分配给主题k的token数(除了token $z_{di}$),$n_{kw}^{-di}$是词w(跨所有文档)被分配给主题k的token数(除了token $z_{di}$)。为了避免昂贵的重新计算开销,这些counts(也被称为是“充分统计(sufficient statistics)”)可以缓存成tables,当一个token topic indicator $z_{di}$更改时会发生更新。特别的,所有count $n_{kd}$的集合口头上指的是document-topic table(作为$\theta_d$的sufficient statistics),而所有counts $n_{kw}$的集合则指的是word-topic table(作为$phi_{k}$的sufficient statistics)。
最小情况下,任何分布式LDA实现必须将token topic indicators $z_{di} $、doc-topic table$n_{kd}$(即data),word-topic table $n_{kw}$(即model)进行分区(partition)。当一个LDA sampler正在采样一个token topic indicator $z_{di}$时,它需要看下在word-topic table(即document)中的第$n_{kw_{di}}$行(row)。然而,原始的分区策略会导致一些机器获取word-topic table的一大块:假设我们按顺序对每个文档的tokens进行抽样,那么该worker必须将根据文档中的词汇看到在word-topic table中的所有行。使用快速的Metropolis-Hastings sampler,每个worker可以每秒抽样成千上万的文档(假设每个文档有成百上千个token);更进一步,我们经验上观察到上百万的文档(web-scale语料会更大)足够激活整个word-topic table。这样,原始的序列只描述了word-topic table在每个worker上进出时的快速交换(swap),会生成一个过高的网络通信开销。
我们的结构感知的model-paralled方法打算解决在fast LDA sampling和每个worker上受限内存空间之间的矛盾;这受到块坐标下降算法(block coordinate descent algorithms)的启发。数据块在运行LDA sampler前生成,我们注意到,在每个块上实例化词汇表中的词汇开销很小。该信息绘作为meta-data绑定到块(block)上。如图1所示,当我们加载一个数据块(和它的meta-data)到本地内存(红色的正方形)时,我们从块中的本地词汇(local words)选择一小部分词集合(图中的V1)。这词集足够小,根据在word-topic table中的第$\n.,w_{di}$行,可以存储在worker上的本地内存中——我们将这些行集合称为一个模型切片(“model slice”)。我们的系统会通过网络获取(fetch)model slice,sampler则只对块中的这些tokens进行抽样,它们可以被获取到的切片覆盖到;所有的其它tokens将不会接触到。这种方式下,系统只需要在本地内存中维护一个瘦模型切片(thin model slice),在当前数据块中对所有文档可复用。一旦由slice所覆盖的所有tokens被抽样到,系统会通过网络获取(fetch)下一个model slice(称为V2),对由它覆盖到的tokens进行抽样处理。这种方式(类似于TCP/IP协议或图像处理中的滑动窗口),系统会处理一个数据块中的所有tokens,在从磁盘中加载下一个块之前,一次一个slice。这种和磁盘交互的块交换(swapping of blocks)可以进行核外执行(out-of-core execution).
图1: LDA中的: Structure-aware model parallelism
除了可以让worker保持低内存需求外,结构感知的模型并行(structure-aware model parallelism)可以以如下方式缓和网络开销瓶颈:
- 1.workers不会移到下一个model slice,直到当前slice上相应的所有tokens都被抽样到,我们不需要对模型应用caching和eviction策略。
- 2.因为data-model切片是静态的、不可变更的(static and unchanging),我们将它们的加载(loading:从磁盘中读取数据块,从中心parameter server获取模型)进行pipeling,来隐藏网络通信延迟。
最后要注意一点,structure-aware model parallel策略会“发送模型给数据:(sends the model to the data)”,而非相反。这受两点启发:
- 1.数据(the data)(包含tokens $w_{di}$和相应的topic indicators $ z_{di} $)比模型(the model)更大(模型有万亿参数)
- 2.sampler收敛,模型会更加稀疏(这样可以减少网络通信),而数据的大小仍然是常数。
我们观察到其它分布式LDA的设计采用的是“发送数据给模型(send data to model)”的策略,这会开销更大。
4.LDA的Fast Sampling算法
structure-aware model-parallelism的目的是:在小集群上,从十亿级别的文档中学到非常大的,上万亿参数的LDA模型;更进一步,bounded-asynchronous data-parallel schemes可使用parameter servers来减小网络同步和通信的开销。然而,这些还不能让大的LDA模型快速地进行训练;这激发了我们更大的贡献:一种新的LDA sampling算法,它比最新的算法(SparseLDA和AliasLDA)收敛地更快。为了解释我们的算法,先简单回顾下SparseLDA和AliasLDA的机制。
SparseLDA
SparseLDA使用这样的观测:
- 1.大多数文档只有少量的主题
- 2.大多数词汇只参与少量的主题
这表现为doc-topic和word-topic table同时具有稀疏性(sparsity),其中SparseLDA通过将 collapsed Gibbs sampler的条件概率(等式1)分解三个terms:
\[p(z_{di}=k\| rest) \propto \frac{\alpha_{k}\beta_w}{n_k^{-di}+\hat{\beta}} + \frac{n_{kd}^{-di}\beta_w}{n_k^{-di}+\hat{\beta}} + \frac{n_{kw}^{-di}(n_{kd}^{-di}+\alpha_{k)}{n_k^{-di}+\hat{\beta}}\]…(2)
第一部分为r,第二部分为s,第三部分为t。当Gibbs sampler接近收敛时,第二项s和第三项t会变得很稀疏(因为文档和词被安排到少量的主题上)。SparseLDA首先抽样三个项r,s,t中的其中之一,根据它们在k个可能结果上的概率求和,接着,SparseLDA会其于选中的r,s,t中的某项来抽样主题k。如果s或t被选中,那么抽样的主题k会各自花费$O(K_d)$或$O(K_w)$的时间,其中$K_d$是文档d所包含的主题数,而$K_w$是词w所属的主题数。折算下来SparseLDA的抽样复杂度为$O(K_d+K_w)$,而对于标准的collapsed Gibbs sampler只有O(K)。
AliasLDA
AliasLDA提出了另一种的Gibbs sampling probability分解:
\[p(z_{di}=k\| rest) \propto \frac{n_{kd}^{-di}(n_{kw}^{-di}+\beta_{w})}{n_{k}^{-di}+\hat{\beta}} + frac{\alpha_{k}(n_{kw}+\beta{w})}{n_k+\hat{\beta}}\]…(3)
第一项为u,第二项为v,AliasLDA会预先为第二项v计算一个alias table,它允许在O(1)时间内通过Metropolis-Hastings被抽样到。通过在许多tokens上复用该table,构建该表需要O(K)的开销,折算下来每个token需要O(1)的开销。第一项u是稀疏的(在$K_d$上线性,即在文档d上的当前主题数),可以在$O(K_d)$时间内被计算。
4.1 Metropolis-Hastings sampling
我们看到,折算成每个token所需要的抽样时间,SparseLDA和AliasLDA达到了$O(K_d+K_w)$和$O(K_d)$。这样的加速抽样是很重要的,因为我们可以简化;原始的collapsed Gibbs sampler (Eq. 1)对于每个token需要O(K)的计算开销,这在K=100w个主题上明显是棘手的。SparseLDA减小了sampling的复杂度,通过利用稀疏性,而AliasLDA则利用alias方法以及Metropolis-Hastings算法。LightLDA sampler也使用Metropolis-Hastings,但对于合适的分布式设计有新的见解,这对于高性能表现来说相当重要。我们展示了sampling过程可以被加速,更进一步,使用设计良好的proposal distribution q(·)给true LDA posterior p(·)。
一个A well-designed proposal q(·)可以以两种方式加速sampling过程:
- 1.从q(·)中抽取样本,比从p(·)中抽取样本更便宜
- 2.Markov chain可以快速混合(只需要一少部分step)
这涉及到如何为p(·)构建一个良好的q(·)?如果q(·)与p(·)非常相似,那么构建的Markov chain会快速混合——然而,从q(·)中抽样的开销会和从p(·)的抽样一样昂贵。相反地,如果q(·)与p(·)非常不同,我们可以从中抽样的开销会更小——但这种构建的Markov chain的mix会过慢,需要许多步才会收敛。为了理解这种trade-off,需要考虑下面的临界情况:
-
均匀分布Proposal(Uniform Distribution Proposal): 假设我们选择了q(·)作为均匀分布。MH算法会提出:下一状态 t ~ Uni f(1,…,K) ,并接受$min(1, \frac{p(t)}{p(s)}$的概率的状态。很明显,从一个均匀分布中进行sampling是开销很小的,可以在O(1)时间内完成;然而,均匀分布是非稀疏的,因此与p(·)会相当远,它需要多步的MH来进行mix。
-
完全条件分布Proposal(Full Conditional Distribution Proposa):我们可以选择p(·)作为proposal分布q(·)。MH算法提出了下一步t的概率为p(t),接受$ min{1, \frac{(p(t))p(s)}{p(s)p(t)}=1 $;例如:算法会接受所有的proposals。从q(·)中抽样很明显开销与p(·)一样多,但mixing很很快,因为所有的proposals都被接受了。
4.2 因子分解(Factorization)的Cheap Proposals
为了设计一个开销很小的MH算法,它具有高的mixing rate,我们采用了一个因子分解的策略(a factorized strategy):我们只构建了一个O(1) proposals的集合,选在它们之间做交替选择。为了构建这样的proposals,我们从关于token topic indicator $z_{di}$的真实条件概率(true conditional probability)开始:
\[p(z_{di}=k\| rest) \propto \frac{(n_{kd}^{-di}+\alpha_k)(n_{kw}^{-di}+\beta_w)}{n_k^{-di}+\hat{beta}}\]…(4)
观察到,它可以分解成两项:
\[p(z_{di}=k\| rest) \propto (n_{kd} + \alpha_{k}) x \frac{n_{kw}+\beta_w}{n_k + \hat{\beta}}\]…(5)
第一项为doc-proposal,第二项为word-proposal。即使我们利用这两项的稀疏性,从该条件概率中抽样的开销至少要$ O(min(K_d,K_w)) $——我们是否可以做的更好呢?我们观察到,第一项是依赖文档的(document-dependent),也词汇独立的(word-independent),而第二项是文档独立的(document-independent)、依赖词汇的(word-dependent)。更进一步,它直觉上可看到,最可能的主题是从doc-dependent项和word-dependent项上那些高概率的部分;然而,单独的项可以作为一个好的proposal q——但如果p在主题k上具有高概率,那么这一项也可在k上具有高概率(倒过来不正确)。重要的是,alias方法(在AliasLDA中使用的)可以应用于两项,减少从这样的proposal中抽样的开销至:分摊下来,每token的时间复杂度O(1)。下面分别讨论下两种proposal。
Word-Proposal for Metropolis-Hastings
将$p_w$定义成word-proposal分布:
\[p_w(k) \propto \frac{n_{kw}+\beta_{w}}{n_k+\hat{\beta}}\]…(6)
状态转移s->t的接受概率(aceptance probability)为:
\[min\{1, \frac{p(t)p_w(s)}{p(s)p_w(t)} \}\]…(7)
假设$\pi_w := \frac{p(t)p_w(s)}{p(s)p_w(t)}$,我们可以展示成:
\[\pi_w = \frac{(n_{td}^{-di}+\alpha_t)(n_tw^{-di}+\beta_w)(n_s^{-di}+\hat{\beta})(n_{sw}+\beta_w)(n_t+\hat{\beta})}{(n_{sd}^{-di}+\alpha_s)(n_{sw}^{-di}+\beta_w)(n_t^{-di}+\hat{\beta})(n_{tw}+\beta_w)(n_s+\hat{\beta})}\]…(8)
一旦$t ~ p_w(t)$被抽样到,接受概率可以在O(1)时间内被计算,只要我们根据所在的sufficient statistics n。在抽样期间。直觉上,$\pi_w$是很高的(相对于topic s),不论何时proposed topic t在文档中内很流行,或者对于词w来流行。因为word-proposal趋向于提出主题t,对于词w很流行,使用word-proposal将会探索p(k)的状态空间。为了在O(1)内抽样$p_w$,我们使用类似于[10]的alias table。如图2所示,alias方法的基本思想是,将一个非均匀的分布转化成一个均匀的分布(例如:alias table)。因为alias table会在MH sampling中复用,转称的开销可以分摊到O(1).
尽管alias方法具有O(1)的分摊时间复杂度,它的空间复杂度仍然很高,因为每个词的proposal分布的alias table会存储2K个值:每个二元(bin)的分割点和分割点上的alias value。如果我们需要存储许多词的alias table,这是禁止的。我们的见解是:alias table可以稀疏化,我们可以通过将$p_w=\frac{n_{kw}}{n_k+\beta}+\frac{\beta_w}{\n_k+\beta}$开始。接着抽取两项中的其中之一,我们使用一个预先构建的alias table(从$n_{kw}$中创建,指向于词w)中选中一个主题,它是稀疏的。如果我们抽取第二项,我们也用一个预先构建的alias table(从$n_k$中创建,对于所有词w是共用的,可为所有V个词分摊)来选中一个主题,它是dense的。这种方式下,我们将构建词w的alias table所需的时间复杂度和空间复杂度减小到:$O(K_w)$(词w所参与的主题数)
Doc-Proposal for Metropolis Hastings
将$p_d$定义成doc-proposal分布:
\[p_d(k) \propto n_{kd} + \alpha_{k}\]…(9)
s->t状态转移的接受概率为:
\[min(1, \frac{p(t)p_d(s)}{p(s)p_d(t)})\]…(10)
假设$\pi_d := \frac{p(t)p_d(s)}{p(s)p_d(t)}$,我们可以展示为:
\[\pi_d=\frac{(n_{td}^{-di}+\alpha_t)(n_{tw}^{-di}+\beta_w)(n_s^{-di}+\hat{\beta})(n_{sd}+\alpha_s)}{(n_{sd}^{-di}+\alpha_s)(n_{sw}^{-di}+\beta_w)(n_t^{-di}+\hat{\beta})(n_{td}+\alpha_t)}\]…(11)
至于word-proposal,我们看到:doc-proposal满足,在任何时候主题t(相对于主题s)在文档d内是流行的,或者对于词w。我们将$p_d(k) \propto \frac{n_{kd}}{n_d+\alpha}+\frac{\alpha_k}{n_k+\alpha}$分解成类似于word-proposal的结构,当我们选择第一项时,我们不需要显式构建alias table——这是因为document token topic indicators $z_{di}$可当成是一个alias table。特别的,第一项$n_{kd}$会统计主题k在文档d内的次数,换句话说:
\[n_{kd} = \sum_{i=1}^{n_d}[z_{di}=k]\]…(12)
其中[·]是一个指示函数。这暗示着,对于未归一化的概率分布$n_{kd}$来说,数组$z_{di}$是一个alias table,因此我们可以简化为:通过一个整数j非均匀地从{1,2,…,nd}中抽取一个整数$n_{kd}$,并设置为:$z_{di}=z_{dj}$。图3使用了一个toy example来展示该过程。因而,我们可以下结论:doc-proposal可以在O(1)的非分摊时间中被抽样(因为我们不需要构建一个alias table)。
4.3 结合proposals提升Mixing
不管doc-proposal,还是word-proposal,可以被独立用于LDA中一种有效的MH算法,实例上,许多MH-steps(为每个token重复抽样)需要生成合适的mixing。只需一少部分的MH-steps,单独使用word-proposal可以支持word-topic分布中的稀疏性(例如:每个词都属于很少的主题),但会在document-topic分布中引起很低的稀疏性(例如:每个文档包含了多个主题)。相反地,单独使用doc-proposal,只需要少量的MH-step就会导致在document-topic分布上的稀疏性,同时产生非稀疏的word-topic分布。因此,这种proposal可以很快地对tokens进行抽样,它们需要许多MH-steps来进行很好的混合(mix)。
快速Metropolis-Hastings mixing的关键是,一个proposal分布可以快速探索状态空间,并达到具有高概率(the models)的所有状态。word-proposal的$p_w(k)$擅长于proposing自己的模式(会在少量主题上产生词的聚集),并同样为doc-proposal $p_d(k)$进行proposing。如图4所示,单独使用word-proposal或doc-proposal,一些模式(modes)将从不会被快速探索到。
当仍然维持较高的sampling效率时,我们如何来达到一个更好的mixing rate?如果我们看一下$p(k) \propto p_w(k) x p_d(k) $,我们会看到:对于p(k)会很高(例如:一个mode),我们需要:$p_w(k)$或$p_d(k)$要足够大——但不需要同时满足。然而,我们的解决方案是:将doc-proposal和word-proposal结合成一个“cycle proposal”:
\[p_c(k) \propto p_d(k) p_w(k)\]…(13)
对于每个token,通过我们构建一个MH序列,
5.对头部词(Power-Law Words)采用混合数据结构
即使是data-model分区,当对于非常大的主题数的LDA时,内存大小仍然是一个障碍。LDA模型,或者是word-topic table$ n_{kw} $,是V X K的矩阵,交且一个naive dense representation版本,会需要过高的内存——例如,对于本试验中所使用的V=K=100w,考虑32-bit的整数条件,模型需要4T字节的size。即使用合理的配置高的机器,128G的RAM,也只能需要32台机器来在内存中存储矩阵——实际上,实际的使用可能会更高,因为存在其它系统开销(比如:cache, alias tables, buffers, parameter server)。
一个常用的解决方法是,将sparse数据结构转换成:hash maps。在稀疏存储背后的原理是,文档词汇会遵循长尾分布(power-law)图5所示。有两个暗示:
- 1)在移除stop-words如果看,所有有意义的词汇的词汇几乎会超出32-bit integer的范围(2,147,483,647);这在150亿文档和3w亿tokens,只保留300词频上的web-scale语料上测试时,会超过32-bit的限制。出于这个原因,我们选择使用32-bit的整数,而非64位。
- 2) 即使有数十亿的文档,大多数词的出现次数要少于K次(其中K是主题数,在我们的试验中达到了100w)。这意味着大多数行$n_k$,在word-topic table中是相当稀疏的,因此一个稀疏行表示(hash maps)将极大减小内存占用(memory footprint)。
然而,对比于dense arrays,sparse的数据结构表现出较差的随机访问表现,它会伤到MCMC算法(比如:SparseLDA,AliasLDA以及我们的r Metropolis-Hastings算法),因为它们所有都很严重地依赖于随机访问索引。在我们的试验中,对比于dense arrays,使用纯hash maps会导致一个serveral-fold表示的丢失。当维持高的sampling throughput时,我们怎么才可以享受低内存占用?我们的解决方案是混合数据结构(hybrid data structure),其中,word-topic table的行对应于频繁出现的热词,用dense arrays进行存储;而对于非常见的长尾词,则使用开放寻址/二次探测(open-addressing/quadratic-probing)的hash tables。在数十亿级别的web-scale的语料上,我们发现词汇表中10%的词是热词(“hot”),会覆盖95%的语料中的tokens,而剩余90%的词汇表词则是长尾词,只会覆盖5%的tokens。这暗示着:
- (1)大多数会访问我们的混合word-topic table中的dense arrays,这会保持高的throughput
- (2)word-topic中的大多数行仍是稀疏的hash tables,它可以让内存占用量合理保持较低水平
在我们的V=K=100w的试验中,我们的混合word-topic table只使用0.7TB,如果我们使用纯dense arrays会达到4TB。当该表跨24台机器分布时,每台机器只需要30GB,可以空出昂贵的内存来给其它系统组件用。
6.系统实现
分布式实现对于web-scale的数据来说是令人满意的:它们会将训练时间减少到可承受的水平,大多数实践者会访问至少一台分布式集群。然而,目前存在的分布式LDA实现,只展示出在小问题规模上(特别是模型size)工作良好,或者使用极大的计算集群(有时上千台机器)来完成可接受时间内的训练。如何使用数十台机器来应对和解决大的LDA问题?如果我们希望使用数十亿的训练语料(每个文档至少上百tokens)会占用数T空间,那么在data-paralled的这一点上,简单地从磁盘拷贝数据到内存中都会花费数十小时,当将数据通过网络进行传输时也会花费类似的时间。在model-paralled这一点上,存储1T的参数(100w词 x 100w主题)可以达到上T的内存——只能分布式存储,需要跨机器的参数同步,会有很高的网络通信开销。根据这些注意点,为LightLDA设计了一个架构,它可以将数据传输和参数通信开销尽可能地减少,并让小集群实现成为可能。
系统总览 在开源分布式机器学习Petuum上构建了LDA,对于大规模机器学习,它为结构感知的模型并行(structure-aware model parallelism)以及有界异步数据并行(bounded asynchronous data-parallelism)提供了一个总的框架。根据代码,我们会利用parameter server来实现有bounded asynchronous data-parallelism。一个parameter server对用户会隐藏分布式网络细节(通信和并发控制 ),提供好用的API来开发分布式机器学习程序——该思想让机器学习专家专注于描述算法逻辑,而非系统的细节。我们首先引入总的parameter server的思想,接着描述我们如何让大的LDA模型在小集群上进行增强。
Parameter Server和Data Placement 在基本水平上,一个parameter server(PS)会保存一个分布式共享内存接口[16],其中编程者可以从任何机器上访问内存,对于参数的物理位置不可知。本质上,PS扩展了在单个机器上的内存结构(如图6);存储介质越接近CPU core,越具有较低的时延和较高的传输带宽,但有更少的容量(capacity)。在PS架构上,每个机器的RAM被分成两部分:对于客户端(client)使用的局部RAM,以及对于中心化参数存储的远程RAM(也称为:“server” part)。这样的硬件限制,以及由大的主题模型数据模型引入的需要条件,强烈地影响着我们运行Metropolis-Hastings算法的方式。
我们使用PS来存储两种类型的LDA模型参数:word-topic table $ { n_{kv} }{k=1,v=1}^{K,V}$,它会统计词v分布给主题k的tokens数,一个长为K的“summary row”:$ { n_k }{k=1}^{K} $,它会统计分配给主题k的总tokens数。32-bit的整数可以被用于word-topic table(使用一个dense arrays和sparse hash maps的组合),而对于summary row则使用一个64-bit的整数数组。我们观察到,随着LDA sampler的处理,word-topic table会变得进一步稀疏,随着时间推移会产生更低的网络传输开销。更进一步,Petuum PS支持一个bounded-asynchronous consistency model,它可以减少内部迭代(inter-iteration)参数同步时间,通过一个过时的参数s——对于LightLDA,它已经是一个pipeline的设计,我们发现最优值为:s=1.
如果输入数据比模型大(仍保留不变的throughout LDA接口),通过网络进行传输数据是不明智的。相反地,我们会进行shuffle,跨所有worker机器的磁盘共享语料,每个worker只在它的本机磁盘上访问数据。在图6中,$ { w_{di}, z_{di} }_{d=1,i=1}^{D_n,n_d} $表示在第n台worker上的一份训练数据的shard,其中$D_n$表示第n台worker上的文档数,$n_d$表示在文档d上的tokens数。每个worker的本地内存会持有:
- (1)当前活动的工作数据集$ { w_{di}, z_{di} }{d=1,i=1}^{D{S_n,n_d}} $
- (2)模型$ { n_{kv} }{k=1,v=1}^{K,V_s} $需要抽样tokens的当前集合(使用Metropolis-Hastings sampler)。在抽样期间,我们更新token topic indicators $z{di}$,以及word-topic table。token-topic pairs($w_{di}, z_{di}$)位于本地worker上,不会有网络通信,而word-topic table则被存储在PS中,因而需要一个后台线程来进行有效通信。
Token 和 Topic Indicator Storage 作为data-parallel执行的一部分,每个worker机器会在本地磁盘上存储语料的某个shard。对于web-scale级别的语料,每个shard仍会很大——如果没有许多T,也会有数百G——它不会将整个shard加载进内存中。这样,我们进一步将每个数据的shard分割成数据块(data blocks),并将这些块同时流式化进内存中(见图1左)。根据数据结构,我们故意将tokens $w_{di}$和它们的topic indicators $z_{di}$进行side-by-side放置,作为一个$(w_{di}, z_{di})$ pair的向量(vector),而非两个独立的tokens和topic indicators数组。我们这么做是为了提升数据的locality和CPU cache更有效:无论何时当我们访问一个token $ w_{di} $时,我们总是需要访问它的topic indicator $z_{di}$,vector-of-pairs的设计方式可以直接提升locality。这种设计的一个缺点是:额外的磁盘I/O,每次读/写 tokens $w_{di}$一个数据shard到磁盘中时会swap out。然而,磁盘I/O总是通过对读/写进行pipeline的方式进行masked,当sampler正处理当前shard时会在后台完成。
我们指出,我们的streaming和disk-swapping(out-of-core)设计,天然的会以如下的方式支持容错:如果我们通过原子文件重写来执行一个data swapping到磁盘,接着当系统发生失败时,它会简单地通过warm-start来进行resume训练过程:读取swapped-to-disk模型,re-initialize word-topic和doc-topic table,继续。作为比较,像PLDA+和YahooLDA也具有容错机制,它们需要周期性地将数据和(或)模型进行dump出来——但这在大数据/模型的情况下会引发不平凡的开销。
对结构感知的模型并行进行tuning 我们在第3部分引入了Structure-Aware Model Parallelization的高级思想并应用到LDA中,仍有许多改进来提升效果。我们描述了最重要的几点:
- 1.在计算一个数据块或一个模型切片时,一个worker的CPU core需要等待下一个数据块/模型切片从磁盘/网络被加载。我们通过pipelinging(如图7所示)消除了该I/O延迟,尽管我们注意到:执行pipelining需要很仔细的参数配置(samplers的throughout,数据块的size,模型切片的size等)
- 2.为了阻止数据加载在跨模型切片时的imbalance,我们通过对词汇表通过词频进行排序来生成模型切片,接着对词汇进行shuffing。这种方式下,每个切片会同时包含热词(hot words)和长尾词(long tail words),来改善加载的平衡。
- 3.为了消除数据传输的不必要性,当生成数据块时,我们对token-topic pair $w_{di}, z_{di}$根据$w_{di}$在重排后(shuffled)的词汇表中的位置进行排序,确保属于相同的模型切片的所有tokens在数据块上实际是连续的(见图1)。这种排序只需要执行一次,在数据处理平台如Hadoop上会很快(对比于LDA sampling time)。我们认为:比起PLDA+中的”word bundle”,这种方式更有效,PLDA+会用一个倒排索引来避免数据传输,但会带来两倍的内存开销。
多线程的效果 我们的sampler在单个worker上会并行。通过将内存中的数据块分割成不相效的部分(通过独立线程进行抽样),并在线程间共防震内存中的模型切片。再进一步,我们会让shared模型切片变得immutable, 在将这些数据汇总发送到parameter server之前,会在本地延迟所有的模型更新。通过将模型切片保持immutable状态,我们避免了并发问题(比如:条件竞争和锁),这样就可以达到在接近线性的intra-node扩展性。当模型更新延迟时,理论上会减慢模型的收敛率(convergence rate),实际上,它会消除并发问题,增加sampler throughput,轻易地胜过更慢的收敛率。
现代的server级机器包含了许多CPU sockets(每个CPU有许多物理core),可以连接到不同的内存条(memory banks)上。当这些内存条可以被所有CPU寻址时,当访问绑定到另一socket上的远端内存条时,内存延迟会更长——也是就是:Non-Uniform Memory Access (NUMA). 在我们的实验中,通过对sampling参数进行调整(比如:f Metropolis-Hastings steps数),我们对它们进行部分寻址,发现NUMA的作用相当大。也就是说,我们相信,合适的NUMA-aware编程是一个更好的长期解决方案来解决该问题。最终,我们注意到:为每个线程设置core的关系,在intel处理器上开启硬件超线程(hardware hyper-threading)效果很好,我们观察到一个30%的性能增益。
7.试验数据
对比之前的LDA实现,我们展示了:只需要更少的机器,LightLDA可以调练更大的LDA模型——这归因于data-model的分片,特别是新的Metropolis-Hastings sampler,它比SparseLDA和AliasLDA要快一阶。我们使用许多数据集(表7),注意,Bing的”web chunk”数据集有12亿的网页(共2000亿的tokens)。我们的试验表明:
- (1)在core数目和机器数目上,我们的分布式实现有接近线性的可扩展性
- (2)比起state-of-art的SparseLDA和AliasLDA samplers,我们的分布式实现有接近线性的可扩展性(在单线程设置中测量得到)
- (3)最重要的是,LightLDA可以允许很大的数据size和模型size,只需要8台左右机器即可。