wechat在《Ekko: A Large-Scale Deep Learning Recommender System with Low-Latency Model Update》给出了它们的实时方案:
摘要
深度学习推荐系统(DLRSs)需要在低延迟下更新模型,以便及时为新用户和内容提供服务。然而,现有的DLRSs未能做到这一点。它们通常:在离线状态下训练/验证模型,并将整个模型广播到全局推理集群中。因此,它们会带来显著的模型更新延迟(例如几十分钟),这对服务级别目标(SLOs:Service-Level Objectives)产生了不利影响。
本文介绍了一种名为Ekko的新型DLRS,它能够实现低延迟的模型更新。其设计理念是允许模型更新立即传播到所有推理集群,从而绕过长时间延迟的模型checkpoint、验证和广播。为了实现这一理念:
- 首先,我们设计了一种高效的点对点模型更新传播算法。该算法利用DLRS模型更新中的稀疏性和时间局部性,以提高模型更新的吞吐量和延迟。
- 此外,Ekko还配备了一个模型更新调度器,可以在网络繁忙时优先发送对SLOs影响较大的模型更新。
- 最后,Ekko还包含一个推理模型状态管理器,用于监控推理模型的SLOs,并在检测到对SLOs有害的偏差更新(detrimental biased update)时回滚模型。
评估结果表明,Ekko比最先进的DLRS系统快了几个数量级。Ekko已在生产环境中部署超过一年,每天为超过十亿用户提供服务,并将模型更新延迟从最先进系统的几十分钟减少到2.4秒。
1 引言
深度学习推荐系统(DLRSs)是大型技术组织(如Meta [54]、字节跳动 [23]、谷歌 [15] 和英伟达 [56])中的关键基础设施。DLRS通常包含一组大型参数服务器,这些服务器托管着众多机器学习(ML)模型(例如嵌入表 [10, 26, 54] 和深度神经网络 [18])。参数服务器在地理分布的数据中心中进行复制,以实现容错和与客户端的低延迟通信。每个数据中心都有一组推理服务器,这些服务器从本地参数服务器中拉取模型,并为客户端提供推荐结果。
为了确保能够及时为新用户和内容提供服务,DLRS必须持续更新ML模型:它首先使用训练服务器收集新的训练数据并计算模型梯度,然后通过广域网(WAN)将模型更新传播到模型副本。
大规模的DLRS需要为数十亿用户提供服务 [15, 23, 54],并且必须实现与延迟相关的服务级别目标(SLOs)[49],例如将新创建的内容提供给用户的延迟。为了最好地实现SLOs,DLRS的操作者对实现低延迟模型更新提出了新的需求。这有几个原因:
- (i)最近的DLRS应用(如YouTube [24] 或 TikTok [8])使用户能够创建大量的短视频、文章和图像。所有这些内容都需要尽快提供给客户端,通常在几分钟甚至几秒钟内;
- (ii)数据保护法律(如GDPR [60])允许DLRS用户匿名。匿名用户的行为需要在线学习;
- (iii)许多在线ML模型(如强化学习 [74])已被用于生产中以提高推荐质量。这些模型必须在线持续更新以实现最佳性能。
然而,在现有的DLRS中实现低延迟模型更新极为困难。现有系统(如Merlin [56]、TFRA [66]、Check-N-Run [21] 和 BigGraph [39])采用离线方式更新模型:在收集新训练数据后,这些系统离线计算模型梯度,验证模型checkpoint,并将checkpoint广播到所有数据中心。这样的模型更新过程可能需要几分钟甚至几小时 [21]。
另一种方法是使用WAN优化的ML系统 [28] 或联邦学习系统 [37]。这些系统使用本地收集的数据更新副本模型,并延迟同步副本。然而,延迟同步引入了显著的异步性,这通常会对SLOs的实现产生不利影响 [28, 42]。
我们希望探索一种DLRS设计,能够在实现低延迟模型更新的同时不损害SLOs。我们的核心思想是:允许训练服务器在线更新模型(使用梯度),并立即将模型更新传播到所有推理集群。这种设计使我们能够绕过长时间延迟的更新步骤,包括离线训练、模型checkpoint、验证和广播,从而减少模型更新延迟。为了使这一设计可行,我们需要解决几个挑战:
- (i)如何在带宽有限且网络路径异构的WAN上高效传播大量模型更新 [28];
- (ii)如何保护SLOs免受网络拥塞的影响,这种拥塞可能会延迟关键更新;
- (iii)如何保护SLOs免受对模型准确性有害的偏差更新的影响。
本文介绍了Ekko,一种新型的大规模DLRS,能够以低延迟更新全局复制的模型。Ekko的设计做出了以下几个关键贡献:
(1) 高效的点对点模型更新传播
现有的参数服务器通常采用主备数据复制协议[11, 41, 67] 来实现模型更新。然而,在大模型更新的情况下,主备协议由于更新延迟长 [67] 和领导者瓶颈 [2] 而表现出不足的可扩展性。
为了解决这些问题,我们探索了如何实现点对点(P2P)[20] 模型更新传播。我们为地理分布的DLRS设计了一种高效的无日志状态同步算法(见§4)。该算法在DLRS中非常有效,因为模型更新通常集中在热门参数上 [21],并且它只传输模型参数的最新版本(即状态)。Ekko必须允许参数服务器以P2P方式高效发现模型状态的差异。为此,我们设计了:
- (i)模型更新缓存:使参数服务器能够高效跟踪和比较模型状态;
- (ii)分片版本:可以显著减少比较模型状态时的网络带宽消耗;
- (iii)WAN优化的传播拓扑:使参数服务器能够优先选择带宽充足的区域内网络路径,而不是带宽有限的跨区域网络路径。
(2) SLO保护机制
Ekko允许模型更新在没有离线模型验证的情况下到达推理集群。这种设计可能会使SLOs(特别是与推荐结果的新鲜度和质量相关的SLOs)容易受到网络拥塞和偏差更新的影响,这两种情况在生产环境中都可能发生。
为了处理网络拥塞,我们设计了一个SLO感知的模型更新调度器(见§5)。该调度器计算包括:更新新鲜度优先级、更新重要性优先级、模型优先级在内的指标。这些指标预测模型更新对推理SLOs的影响。调度器根据这些指标在线计算每个模型更新的优先级。我们将调度器集成到参数服务器中,而不改变Ekko中P2P模型更新传播的分布式架构。
Ekko通过一种新颖的推理模型状态管理器处理偏差更新。该管理器为每组推理模型创建一个基线模型。该基线模型接收少量用户流量,并作为推理模型的基准。管理器持续监控基线和推理模型的质量相关SLOs。当偏差更新损坏推理模型的状态时,管理器通知见证服务器将模型回滚到健康状态。
我们使用测试床和大规模生产集群对Ekko进行了评估(见§6)。测试床实验结果表明,与最先进的参数服务器(如Adam [11])相比,Ekko将模型更新延迟减少了最多7倍。
我们进一步在包含40 TB模型和分布在多个地理区域的4,600多台服务器的大规模生产环境中进行了实验。实验结果表明,Ekko在每秒执行10亿次更新(即212 GB/s)的情况下,传播更新的延迟仅为2.4秒。Ekko仅使用总网络带宽的3.0%进行同步,其余带宽用于训练和推理。这种秒级延迟性能比最先进的DLRS基础设施(如TFRA [66] 和 Check-N-Run [21])实现的分钟级延迟(即5分钟 [69])快了几个数量级。
2 DLRS中的低延迟模型更新
在本节中,我们将介绍DLRS及其更新模型的算法。然后,我们描述那些能够从减少模型更新延迟中受益的服务级别目标(SLOs)。最后,我们讨论实现低延迟模型更新所面临的系统挑战。
2.1 DLRS与模型更新
大多数技术组织采用如图1所示的系统架构来构建DLRS。DLRS通常服务于分布在全球的客户端(1)。为了最小化服务延迟,DLRS模型(例如嵌入表 [10, 26, 54] 和深度神经网络 [18])在多个数据中心中进行地理复制。当客户端的请求到达时,推理服务器从本地参数服务器中拉取模型参数,并基于该模型进行推理以响应请求。
图1 一个典型的DLRS架构
数据管道在运行时从客户端收集训练数据(例如新内容和用户活动)。收集到的数据到达数据中心的训练服务器(2)。训练服务器使用优化器 [33] 计算梯度以修正相应的模型。所有更新后的模型(通常有数百到数千个)被持久化为checkpoint(3)。这些checkpoint首先经过验证,只有那些能够改善SLOs的checkpoint才会通过广域网(WAN)传播到面向推理的数据中心的参数服务器(4),从而完成模型更新过程。
在实践中,更新DLRS模型的延迟包括:计算模型更新和将更新传播到全球数据中心的时间。这个延迟定义假设我们已经使用了低延迟的消息队列(例如Kafka [36])来加速训练数据的摄取。最近的DLRS(如NVIDIA Merlin [56] 和 Meta Check-N-Run [21])报告了模型更新的分钟级和小时级延迟。假设我们想要更新一个包含大型嵌入表(通常大小为几TB)的DLRS模型。在这种情况下,将该模型持久化为checkpoint并验证模型可能需要几十分钟。再通过WAN传播该模型还需要十几分钟(假设该WAN提供数Gbps的带宽 [72])。
2.2 低延迟模型更新的原因
DLRS需要实现许多服务级别目标(SLOs),这些目标通常与推荐结果的新鲜度和质量相关。以短视频推荐服务(如TikTok)为例,DLRS模型的准确性决定了该服务的质量SLOs,而将新制作的视频快速提供给用户的时间则决定了该服务的新鲜度SLOs。
在实际的DLRS中,我们观察到SLOs通常依赖于完成模型更新的延迟,这使得低延迟模型更新成为一项关键的系统需求。这有以下几个原因:
(1) 短时间内产生的大量新内容
全球DLRS(如YouTube [24]、TikTok [8] 和 Instagram [22])通常服务于数十亿用户,并且允许用户快速创建大量内容。DLRS需要通过低延迟更新模型,快速将这些新内容整合到推荐结果中,否则会影响用户参与度。
(2) 匿名用户的增加
数据保护法律(如GDPR [60])禁止许多DLRS跟踪用户活动。因此,即使这些用户之前使用过相同的服务,DLRS也可能拥有推荐模型未知的匿名用户。因此,DLRS必须快速响应匿名用户的在线活动,以满足他们的推荐需求。这种快速响应依赖于低延迟的模型更新。
(3) 在线推荐模型的增加
DLRS中越来越多的在线机器学习模型(例如使用强化学习 [74] 和持续学习 [69] 的模型)被用于提高推荐质量。这些模型需要从在线用户活动中收集训练数据,因此必须以低延迟持续更新模型参数。
2.3 我们的核心思想及相关挑战
我们希望探索如何在更新DLRS模型时实现低延迟。我们的观察是,更新延迟主要是由于几个离线步骤累积而成的:模型训练、验证和广播。假设我们绕过这些离线步骤,允许更新后的模型直接传播到推理集群,那么我们可以大幅减少更新模型的步骤,从而实现低延迟。然而,要实现这样的设计,我们必须解决以下几个挑战:
(1) 缺乏高效传播大规模模型更新的算法
现实中的DLRS通常拥有大量模型(例如通常有数百到数千个)。它需要在线更新其中许多模型。这些模型包括多阶段推荐管道 [10, 15] 中的模型以及用于A/B测试 [69] 的模型。这些模型通常占用数十TB的内存,并且需要在线完成大规模模型更新(例如每秒数百GB)。
假设我们使用传统的数据复制协议,例如链式复制 [41] 和两阶段提交 [11]。这些协议针对的是通用数据复制,缺乏在带宽有限的网络(即WAN)上协调ML模型更新的机制(这些更新可能对推理SLOs产生不同的影响)。此外,这些传统协议存在领导者瓶颈问题,并且由于异构的WAN路径和网络滞后节点而导致较长的更新延迟。因此,这些协议不适合满足我们的高吞吐量、低延迟需求。另一种选择是使用地理复制协议 [72]。然而,这些协议无法处理训练数据中心中的服务器故障,因此无法满足我们的系统可用性要求。
我们还考虑了网络高效的分布式ML系统,例如Gaia [28] 和 Google Federated [35]。这些系统 [7, 28, 35, 37, 46] 允许模型在每个数据中心中独立训练,从而提高模型更新的吞吐量和延迟。然而,它们延迟同步其状态,因此会导致模型状态过时 [47],这可能对推荐质量产生不利影响。因此,松散同步的分布式ML系统无法满足我们的模型准确性要求。
(2) 缺乏保护SLOs的机制
在DLRS中启用在线模型更新对SLOs提出了挑战。这样的DLRS可能会出现模型更新竞争网络带宽的情况,从而延迟关键更新(例如那些显著影响模型准确性或上线新内容的更新)。尽管有一些系统可以调度模型梯度的发送 [6],但这些系统针对的是训练集群。因此,它们基于梯度 [6, 28] 优先处理模型更新,而缺乏对这些更新如何影响推理模型SLOs的认识。
在线模型更新甚至可能是有害的。由于在线更新通常是基于一小批数据(在短时间内收集的数据:几秒或几分钟)计算的,它们通常包含噪声 [34]。当更新变得特别嘈杂时,它们会对推理SLOs产生不利影响(即降低推理模型的准确性)。为了解决这个问题,
- 现有的模型服务系统(如Clipper [16] 和 Clockwork [25])使用离线模型验证,这种方法会对长时间(例如几小时)累积的模型更新进行平均。
- 其他模型服务系统(如Google TFRA [66])跟踪推理模型的SLO指标,并在SLOs恶化时重新加载checkpoint。
然而,这样的设计在DLRS中实现起来具有挑战性。大型DLRS模型(例如面向推荐的Transformer模型 [18])越来越常见,重新加载这些模型会影响服务的可用性。
3 Ekko系统架构
本文介绍了Ekko,一种能够实现低延迟模型更新的新型DLRS系统。在本节中,我们将描述Ekko的系统模型,并概述其核心创新组件。
3.1 系统模型
Ekko是一个地理分布的DLRS系统。它在中心数据中心更新模型,然后将更新后的模型传播到靠近全球用户(即客户端)的地理分布数据中心。Ekko将模型表示为键值对,并将模型划分为分片(例如在我们的生产环境中有100,000个分片)。它将模型分片存储在键值存储中(在Ekko中称为参数存储)。参数存储通过哈希将键值对分配到分片中。由于模型经常在线整合新项目和特征过期 [32],模型大小可能会随时间变化。
Ekko使用基于软件的路由器将参数请求定向到模型分片。这些路由器将训练数据中心中的参数服务器指定为模型分片的主节点。它们还确保主节点的选择能够平衡参数请求的工作负载。路由器的实现遵循典型的键值存储和数据库 [38]。本文中我们省略了路由器实现的细节。
在路由器中,分片管理器可以处理资源过载、故障域 [55] 和副本集问题 [12]。与传统的分片管理器不同,Ekko的分片管理器实现了几个DLRS特定的优化:
- (i)为了分摊请求处理开销,Ekko将针对同一模型的并发推理请求进行批处理 [16]。然而,批处理请求可能会查询不同参数服务器上的大量参数(例如数千个),从而导致长尾查询延迟 [19]。为了防止长尾延迟,Ekko限制分配给模型分片的服务器数量;
- (ii)Ekko支持多个需要性能隔离的DLRS应用程序。它将不同应用程序的分片映射到不同的服务器上,因此一个应用程序的分片请求激增不会影响其他应用程序的分片。
3.2 架构概述
我们在图2中突出了Ekko的创新设计。如图所示,Ekko使参数服务器能够实现高效的点对点(P2P)模型更新( 1 )(见§4)。P2P模型更新算法避免了中心训练数据中心广播更新后的模型,而是利用数据中心内部和跨数据中心的所有网络路径(图中的实线),从而在传播模型更新时实现高吞吐量。在没有中央协调器的情况下,每个数据中心可以独立选择优化同步模型更新的间隔。
图2 Ekko架构总览
Ekko支持大规模模型更新的并发传播。这些更新可能会竞争网络资源,从而延迟对SLOs有显著益处的更新。为了解决这个问题,Ekko依赖于一个SLO感知的模型更新调度器( 2 )(见§5.2)。该调度器预测每个模型更新将如何影响推理结果。预测结果有助于计算每个模型更新的优先级。基于优先级,Ekko协调在训练数据中心优先传播哪些模型更新,从而提高推理服务器上SLOs的整体满意度。
Ekko可以保护推理服务器免受有害模型更新的影响。为了实现这一点,它在推理集群中运行一个模型状态管理器( 3 )(见§5.3)。该模型状态管理器监控推理模型的SLO相关指标。如果某个推理模型表现出性能下降(由在线更新引起),管理器会将模型状态回滚到性能更好的状态,从而恢复推理模型的性能。
4 高效的点对点模型更新
本节介绍Ekko中高效的点对点(P2P)模型更新机制。为了实现参数服务器中的P2P模型更新,Ekko的设计实现了以下目标:
- 协调大量参数服务器:Ekko需要协调大量(例如数千个)分布在全球的参数服务器完成模型更新。为了避免网络延迟导致的滞后问题,我们为Ekko中的参数服务器设计了无日志同步机制(§4.3)。
- 支持大规模模型更新:作为一个共享的DLRS,Ekko需要托管数千个模型。这些模型可以在线生成大量(例如每秒数十亿次)更新。为了支持这一点,Ekko使参数服务器能够通过对等节点高效发现模型更新,并在不过度消耗计算和网络资源的情况下拉取更新(§4.4)。
- 支持地理分布式部署:Ekko需要支持地理分布式部署,这通常涉及跨WAN的异构网络路径以及服务器/网络故障。为此,Ekko设计了系统机制,以提高在WAN上发送模型更新的吞吐量/延迟,并容忍服务器/网络故障(§4.5)。
接下来,我们将概述P2P模型更新机制,并详细描述其实现。
4.1 模型更新概述
图3展示了Ekko中模型更新涉及的组件和步骤。假设我们需要在两个副本(分别称为副本1和副本2)之间同步一个分片(称为分片1)。与所有其他分片类似,分片1具有:
- (i)分片知识(shard knowledge),用于总结参数更新;
- (ii)更新缓存(update cache),用于基于参数版本跟踪最近的模型更新。每个分片还关联一个分片版本,用于指示该分片是否可能有需要同步的参数。
分片知识、更新缓存和分片版本共同加速了参数服务器之间的参数同步。
图3 Ekko P2P模型更新总览
为了完成模型更新,副本2从副本1请求最近修改的分片版本( 1 )。副本1收到请求后,返回最近修改的分片版本列表( 2 )。副本2将副本1的所有分片版本与本地分片版本进行比较,然后向副本1发送相关的分片知识( 3 )。最后,副本1将所有更新的参数发送给副本2( 4 )。通过这些步骤,Ekko可以确保模型更新最终以低延迟传播到所有副本(即最终一致性)。
我们发现最终一致性在实际DLRS中是可接受的。尽管DNN副本可能在一个小时间窗口内存在差异,但它们通常表现出接近(甚至完全相同)的推理结果 [11]。这是因为DNN通常使用浮点数表示模型参数,因此即使本地参数值存在微小差异,DNN副本也会做出接近的预测。
4.2 DLRS中的参数版本
为了跟踪模型参数的状态,Ekko为每个键值对(即模型参数的存储格式)分配一个参数版本,定义如下:
定义1(参数版本):参数版本 $ v $ 是一个由时间戳 $ t $ 和唯一标识副本的 $ id $ 组成的对 $ (t, id) $。时间戳 $ t $ 基于现代物理时间源 [14, 43] 提供的时间范围生成。Ekko确保 $ t $ 在每个副本中单调递增,并使用计数器填充物理时间戳,以确保来自单个副本的任何两个更新不会共享相同的时间戳。我们定义参数版本的总序关系:
\[v_1 \geq v_2 \iff (t_1 > t_2) \lor ((t_1 = t_2) \land (id_1 \geq id_2))\]在冲突解决期间,具有较大参数版本的参数将覆盖另一个参数 [62]。
在Ekko中,值得注意的是,时间戳基于实时时钟而不是逻辑时钟(逻辑时钟通常用于键值存储和存储服务)。我们发现这种设计在分布式DLRS中非常有效,原因如下:DLRS具有嵌入表,其中参数是稀疏更新的。假设主副本中有一个嵌入参数,该参数有大量更新计数,但主副本在失败之前未传播该参数。当主副本恢复时,计数器可能会用较小的更新计数覆盖当前主副本。这种覆盖可能会对推荐质量产生不利影响,因为被覆盖的主副本可能具有更新的参数(由最近收集的训练数据更新),从而产生更好的推荐结果。因此,逻辑计数器不足以解决分布式DLRS中的冲突。
4.3 无日志参数同步
一旦为参数分配了版本号,Ekko需要决定如何同步不同的副本。我们观察到DLRS通常会覆盖参数,只有最后一次写入决定参数的状态。因此,我们决定发送参数的最后一个版本。
Ekko需要决定同步副本的间隔。我们可以使用基于日志的同步算法 [9, 11]:这些算法选择同步间隔,以便模型更新可以以不超过网络中最慢链路带宽的速率发送。然而,这些算法会导致许多网络链路的利用率不足。更重要的是,它会导致滞后问题,从而显著增加同步延迟,使参数服务器在从故障中恢复时更有可能具有过时的状态。因此,我们希望实现参数服务器中的无日志参数同步,以便这些服务器可以根据每个链路的带宽动态选择与对等节点的同步间隔。
参数服务器中的分片知识:我们建议使用分片知识 [50, 51] 来实现无日志参数同步。更正式地说,在每个副本中,其所有分片都维护相应的分片知识。分片知识使用版本向量 [58] 实现,总结了它们学到的参数更新。与分片知识 $ VV_{\text{shard}} $ 关联的分片数据反映了应用来自每个副本 $ r $ 的所有历史参数更新后的空分片状态,其中更新对应的参数版本 $ v \leq VV_{\text{shard}}[r] $。假设在副本 $ r $ 中有一个参数 $ p $ 的更新需要处理。为了维护分片知识,该副本生成一个新的参数版本 $ v_p = (t, id) $ 并设置 $ VV_{\text{shard}}[id] = v_p $。
分片同步过程:为了同步一个分片,副本 $ r $ 将其分片知识 $ VV_{r1} $ 发送到选定的副本 $ s $。副本 $ s $ 记录其当前分片知识 $ VV_s $ —— 即原子地读取 $ VV_s $ 并从其存储中选择所有参数 $ p $,其参数版本 $ v_p = (t_p, id_p) > VV_{r1}[id_p] $ —— 并用 $ VV_s $ 响应 $ r $。然后,$ r $ 根据 $ s $ 的响应原子地应用所有参数更新,并进一步将 $ VV_s $ 与其当前分片知识 $ VV_{r2} $ 合并。
在同步过程中有几个注意事项:(i)当副本 $ r $ 与副本 $ s $ 同步时,$ r $ 可能同时与另一个副本(称为副本 $ k $)进行同步操作。这些操作可能在 $ r $ 完成处理 $ s $ 的响应之前完成。因此,$ VV_{r2} $(即 $ VV_r \cup VV_k $ 的结果)不一定等于 $ VV_{r1} $。(ii)在无故障场景中,同步过程会省略所有被覆盖的参数版本,这些场景中更新参数的请求总是路由到同一个主副本。我们发现这些无故障场景在我们的生产环境中很常见。
4.4 提高同步效率
Ekko必须确保参数同步对参数服务器的性能开销可以忽略不计。否则,同步可能会消耗过多的计算和通信资源,从而影响参数服务器在服务模型推理和训练请求时的性能。接下来,我们将讨论如何通过参数更新缓存(减少计算成本)和分片版本(减少通信成本)来提高参数同步的效率。
4.4.1 参数更新缓存
由于一个分片可能包含大量参数,简单地遍历所有参数来响应同步请求会带来巨大的计算成本。尽管我们可以使用索引来加速参数遍历,但维护这样的索引会消耗大量内存资源,而这些资源在参数服务器上难以提供。
我们设计了参数更新缓存来减少参数同步的计算成本。这种缓存的设计利用了我们在DLRS中经常观察到的稀疏性和时间局部性 [21]。与密集的DNN训练系统(每次迭代更新整个模型)不同,DLRS只更新其参数的一个子集(即稀疏性)。例如,在我们的生产DLRS中,每小时只有3.08%的参数被更新。此外,模型更新通常会在一个时间窗口内覆盖某些参数(即时间局部性)。这是因为DLRS通常有热门项目和用户,它们的参数更新在短时间内占主导地位。
具体来说,参数更新缓存包含指向最近更新参数的指针。它利用主导版本向量(Dominator Version Vector,简称DVV)来判断同步请求到达时是否命中缓存。
缓存维护算法:缓存的维护保证两个不变性:(i)对于所有存在于分片中但不在缓存中的参数 $ p_{\text{uncached}} $,满足 $ DVV[id_{p_{\text{uncached}}}] \geq v_{p_{\text{uncached}}} $;(ii)对于所有缓存的参数 $ p_{\text{cached}} $,满足 $ DVV[id_{p_{\text{cached}}}] < v_{p_{\text{cached}}} $。
算法1描述了Ekko中参数更新缓存的维护。维护依赖于估计的更新传播时间 $ D_{\text{prop}} $。考虑更新缓存的函数:
(第1行)。$ t_{\text{pruneto}} $ 是一个时间戳,用于描述 $ DVV_{\text{proposed}} $ —— 一个用于判断是否应修剪参数的版本向量。对于每个修改请求,如果修改参数 $ p $ 的参数版本 $ v_p = (t_p, id_p) $ 大于 $ DVV_{\text{proposed}}[id_p] $,则缓存记录指向该参数的指针(第5行)。否则,缓存将参数版本与 $ DVV $ 合并(第3行)。1
UpdateCache
算法1
考虑修剪参数指针的函数:
(第7行)。该函数接收 $ D_{\text{prop}} $,这本质上允许Ekko利用对缓存命中率的在线观察来指导缓存修剪操作。假设我们希望在缓存大小超过限制时修剪参数指针,缓存首先确定 $ DVV’{\text{proposed}} $,它严格主导 $ DVV{\text{proposed}} $(第8行)。然后,缓存移除被 $ DVV’_{\text{proposed}} $ 主导的参数指针(第11行)。最后,缓存通过将 $ DVV $ 与修剪参数版本合并来更新 $ DVV $(第12行)。通过这种方式,Ekko实现了缓存大小的自适应管理,从而减少了其内存占用。1
PruneCache
缓存命中分析:我们分析了参数更新何时命中缓存。假设副本 $ s $ 接收到来自副本 $ r $ 的同步请求,副本 $ r $ 持有分片知识 $ VV_r $。如果 $ VV_r $ 主导 $ DVV_s $,则请求命中缓存,其后续操作(例如选择参数)仅涉及缓存中的参数。
Ekko确保更新缓存的使用不会影响无日志参数同步的最终一致性:同步过程需要选择副本 $ s $ 中满足 $ v_p > VV_r[id_p] $ 的参数 $ p $。由于更新缓存保持不变性 $ DVV_s[id_{p_{\text{uncached}}}] \geq v_{p_{\text{uncached}}} $ 且 $ VV_r $ 主导 $ DVV_s $,因此该过程选择的参数集与之前的算法相同。
参数更新缓存在减少选择参数的成本方面特别有效。根据我们生产环境中部署的缓存跟踪数据,99.4%的同步请求可以命中缓存,从而将选择参数的成本降低了99%。
4.4.2 分片版本
我们引入分片版本来减少同步副本时的网络成本。分片版本捕获了副本上分片数据的部分因果关系,并且它们比版本向量小得多。我们可以允许副本维护分片版本列表,每个列表与一个邻居副本相关联。通过这种方式,副本可以通过交换和比较分片版本来识别可能更新的分片。正式地,我们将分片版本定义如下:
定义2(分片版本):分片版本 $ sv = (c, id) $ 是一个由计数器 $ c $ 和标识生成该版本的副本的 $ id $ 组成的对。计数器 $ c $ 在每个副本的每个分片中单调递增。对于同一个分片 $ s $,当且仅当 $ id_1 = id_2 $ 且 $ c_1 \geq c_2 $ 时,$ sv_1 \succeq sv_2 $。
分片版本维护:在初始化时,每个副本为其分片生成分片版本。当训练工作器发出参数更新时,副本会生成一个新的分片版本。由于每个分片都有一个主副本,因此在正常情况下,只有一个副本生成分片版本。
一旦接收到同步请求,响应副本(记为 $ s $)会回复其分片版本 $ sv_s $,同时附带 $ VV_s $ 和更新的参数。一旦请求副本(记为 $ r $)收到此回复,它会以原子方式完成以下操作:(1)将其分片知识 $ VV_r $ 与接收到的 $ VV_s $ 合并(合并结果记为 $ VV’_r $);(2)如果 $ VV’_r = VV_s $,则将其分片版本 $ sv’_r $ 更新为 $ sv_s $;否则,如果 $ VV’_r \neq VV_r $,则生成一个新的分片版本。需要注意的是,当 $ VV_r = VV_s $ 时,为了避免活锁,Ekko会根据确定性规则(例如选择数值较大的分片版本)从 $ s $ 和 $ r $ 中选择一个分片版本。
我们实现了簿记技术 [51],用于维护与不同副本相关联的分片版本列表。通过结合分片版本和簿记技术,Ekko可以有效地减少同步相关的网络流量。例如,在我们的一个生产DLRS中,Ekko在同步过程中过滤掉了98%的分片。
使用分片版本进行同步:我们讨论分片版本如何促进同步。Ekko维护一个不变性:只有当分片知识 $ VV_1 $ 主导 $ VV_2 $ 时,$ sv_1 \succeq sv_2 $ 才成立。因此,只有当 $ sv_r \nsucceq sv_s $ 时,副本 $ r $ 才需要与副本 $ s $ 同步分片。此外,考虑具有相同分片可比分片版本的不同副本,Ekko更倾向于与具有最大分片版本的副本同步,因为较大的分片版本表示参数的最新版本。
4.5 实现细节
WAN优化:Ekko针对地理分布式部署进行了优化,这种部署包括多个数据中心内部网络和一个跨数据中心的广域网(WAN)。为了提高在这种部署下的性能,Ekko采用了WAN优化的模型更新传播策略。该策略为P2P同步构建了一个灵活的通信拓扑。它允许每个数据中心使用Zookeeper [31] 为每个分片选举一个本地领导者。领导者从其他数据中心拉取模型更新,而其他副本则从该领导者拉取更新。通过这种方式,Ekko使得大部分同步流量通过带宽充足的数据中心内部网络,只有少量同步流量通过WAN。需要注意的是,参数同步的实现并不依赖于特定的通信拓扑。Ekko可以使用其他覆盖拓扑来进一步提高同步性能。
故障容忍:Ekko使用请求路由器来容忍故障。路由器决定客户端请求的路由,并通过心跳检测副本的健康状态。如果路由器推测某个副本发生故障(无论是完全故障还是性能下降 [30]),它会阻止客户端(推理服务器和训练服务器)向该副本发送请求。同时,路由器会跟踪集群中副本的分片知识。如果之前被怀疑故障的副本恢复并向路由器发送心跳,路由器将指示该副本与集群中更新充分的副本同步。当同步完成后,路由器会将客户端请求重新定向到该副本。如果某个副本丢失了状态,它将使用新的ID重新加入集群。如果训练服务器在给定时间内无法联系到路由器,它们将停止发送参数更新,从而在网络分区的情况下尽力保护模型参数免受分歧 [5]。
5 SLO保护机制
Ekko允许模型更新直接传播到推理集群中的参数服务器。然而,这为推荐服务的SLOs带来了两个挑战:(i)网络拥塞可能导致关键模型更新被延迟;(ii)基于小批量偏差数据的模型更新可能对推理结果产生不利影响。
本节介绍了保护推理SLOs免受网络拥塞和偏差更新影响的机制。我们首先定义SLOs(见§5.1),然后描述一个SLO感知的模型更新调度器(见§5.2),最后讨论一个处理偏差更新的推理模型状态管理器(见§5.3)。
5.1 DLRS中的SLOs
DLRS有两类主要的SLOs:
- 新鲜度SLOs:衡量将新内容和用户纳入模型推理的延迟。这对于实时与用户交互的推荐服务(如TikTok和YouTube)至关重要。例如,这些服务通常需要及时捕捉新用户的兴趣,以确保他们有足够的参与度;否则,他们可能会因为失去兴趣而离开推荐应用。提高新鲜度SLOs通常会带来更好的用户体验。此外,新内容将获得更好的曝光,从而确保DLRS的繁荣。
- 质量SLOs:衡量用户体验和参与度。它们对DLRS的盈利能力有直接影响。例如,这类目标包括观看的视频数量和用户观看时间。
图4描述了推理服务器如何影响新鲜度和质量SLOs。一旦接收到请求,推理服务器会选择相关的用户和项目嵌入,然后聚合这些嵌入并将聚合后的嵌入发送给一个DNN,该DNN返回推荐项目的分数。DLRS最终返回一个按分数排序的项目列表。在这种情况下,新鲜度SLO基于推荐项目的最新时间戳来衡量(理想情况下,该时间戳应尽可能接近当前时间)。质量SLO可以基于项目的观看时间和点击次数来衡量。在实践中,Ekko在线维护大量新鲜度和质量SLOs。这些SLOs的实现由DLRS应用开发者贡献。
图4
5.2 SLO感知的模型更新调度器
Ekko通过SLO感知的模型更新调度器及其与P2P模型更新传播的集成,防止新鲜度和质量SLOs受到网络拥塞的影响。
5.2.1 模型更新的SLO感知优先级
Ekko在调度模型更新时计算一组优先级:
更新新鲜度优先级:Ekko计算更新新鲜度优先级 $ p_u $。该优先级基于以下观察设计:如果参数是最近创建的,则具有高优先级;否则,优先级相对较低。这是因为新创建的参数对推理结果的影响比长期服务的参数更大。例如,如果用户的嵌入在推理服务器中不可用,但她的请求已经到达,DLRS将无法回答该请求,从而影响质量SLOs。另一个例子是,如果推理服务器上的嵌入表未包含某个项目,DLRS将不会推荐该项目,从而影响新鲜度SLOs。
更新重要性优先级:Ekko根据梯度 $ g $ 为每个模型更新计算更新重要性优先级 $ p_g $。该优先级最初受到研究的启发,研究表明梯度大小 $ \mid g \mid$ 如何影响DNN的推理结果 [6, 28]。然而,在Ekko中,简单地采用梯度大小是不够的。作为一个共享的DLRS,Ekko在共享网络上复用了来自不同模型的更新。因此,Ekko必须有办法比较具有不同分布的梯度大小。为此,我们定义 $ p_g = \mid g\mid / \bar{\mid g \mid} $,其中 $ | g | $ 表示梯度的1-范数,$ \bar{\mid g \mid} $ 表示最近模型更新的平均梯度大小。直观地说,该定义对梯度大小进行了归一化,从而使它们具有可比性。 |
模型优先级:在DLRS中,模型通常以不同的速率接收推理请求,这表明它们在衡量SLOs整体满意度中的重要性不同。为了考虑这一点,Ekko允许处理大多数请求的模型被分配更高的优先级,而很少接收请求的模型优先级较低。为此,我们定义模型优先级为 $ p_m = c_m / \sum_{i=1}^M c_i $,其中 $ c_m $ 是模型 $ m $ 的请求计数,$ \sum_{i=1}^M c_i $ 表示所有 $ M $ 个模型的总请求计数。
优先级组合:我们将上述优先级组合起来,计算模型更新的总体优先级 $ p $:
\[p = (p_g + p_u) \cdot p_m\]其中,重要性优先级 $ p_g $ 和新鲜度优先级 $ p_u $ 都已归一化,以便可以相加。然后将它们的和乘以模型优先级 $ p_m $。
需要注意的是,Ekko并不要求用户仅使用上述优先级。一些Ekko用户有自定义的优先级定义,包括更新计数、更新间隔和嵌入表中参数的位置。这些自定义优先级针对某些DLRS工作负载 [69],但它们不够通用,无法包含在默认设置中。Ekko通过支持用户定义函数(UDFs)来定义优先级,从而适应这些自定义优先级。
5.2.2 调度器实现
模型更新调度器在每次更新生成时计算其优先级。它需要确保优先级计算的开销可以忽略不计,否则它可能成为模型更新的瓶颈。为了实现这一点,调度器将优先级相关统计信息(例如每个模型 $ m $ 的 $ \bar{\mid g \mid} $ 和 $ p_m $)的维护卸载到一个后台线程中。此外,为了限制内存开销,它使用分位数草图(例如DDSketch [52])计算时间窗口内的第 $ k $ 百分位优先级 $ p_k $,其中 $ k $ 是由算法管理者设置的比率。Ekko使用WebAssembly [27] 执行用户定义的优先级计算,以实现UDFs之间的高效隔离。
将调度器集成到参数服务器中:为了实现优先级调度的承诺,我们必须将调度器集成到已启用无日志P2P同步的参数服务器中。为此,我们为每个参数引入重要版本(记为 $ sigv $),并为每个分片引入重要知识(记为 $ SVV $)。此外,Ekko为每个分片分配一个临时重要参数存储 $ store_{\text{significant}} $ 和相应的临时重要知识 $ T SVV $,以支持带有优先级调度的P2P同步。
算法2描述了带有优先级调度器的无日志P2P同步。假设我们有一个来自副本的模型更新,Ekko计算 $ p $。如果 $ p \geq p_k $,Ekko设置 $ sigv = v $,其中 $ v $ 是该更新的参数版本;否则,$ sigv $ 保持不变。然后,Ekko使用 $ sigv $ 构建 $ SVV_{\text{other}} $ 并调用
函数(第1行)。如果Ekko在同步中未应用优先级,副本将交换 $ SVV $ 并执行 1
UPDATESVV
函数。在将参数写入持久参数存储时,Ekko通过执行 1
UPDATESVV
函数修剪被覆盖的参数(第4行)。需要注意的是,副本会估计模型更新到达自身所需的时间。因此,当网络拥塞发生时,服务器将出现更新超时。在这种情况下,Ekko使用 1
WRITESTOREPARAMETER
函数(第15行)触发同步中的优先级调度器。一旦接收到请求,副本优先返回重要参数存储中的参数。1
PRIORITISEDSYNC
算法2
5.3 推理模型状态管理器
Ekko使用推理模型状态管理器来保护SLOs免受有害模型更新的影响。该管理器监控推理模型的健康状况(即质量SLOs),并根据需求进行低延迟的模型状态回滚。
5.3.1 监控模型健康状况
Ekko基于以下思想监控模型健康状况:对于DLRS应用程序,它为推理模型创建基线模型。基线模型处理少量用户流量(通常小于1%)。它们与在线推理模型不同,因为它们携带延迟的状态。换句话说,它们使用先前的训练样本进行训练,通常比当前推理模型的训练样本早几分钟。
Ekko基于从推理服务器和客户端(例如用户设备)收集的指标来衡量模型健康状况。为了计算这些指标,Ekko定义了自定义的水印和触发器 [3]。其状态管理器仅在确信时(即观察到一段时间的监控数据)发出异常检测事件。需要注意的是,Ekko并不局限于使用特定的异常检测算法。它支持自定义的异常检测算法,例如常用于时间序列数据的算法 [61]。
我们将模型状态(即健康或不健康)的转换建模为复制状态机 [63],并在模型状态管理器中实现。该管理器通过检查与健康状况相关的指标和模型更新延迟,在时间戳 $ t $ 处评估并记录模型健康状况。时间戳 $ t $ 单调递增。管理器判断模型状态是健康、损坏还是不确定。当管理器确信模型状态发生变化(即健康或损坏)时,它会将此信息记录在其复制状态中。如果模型状态已损坏,管理器将客户端请求重定向到其他健康的推理模型,然后启动模型状态回滚。
5.3.2 低延迟模型状态回滚
Ekko使用见证服务器以低延迟回滚损坏的模型状态。见证服务器参与副本同步,但不参与模型训练。与参数服务器不同,见证服务器(i)不会立即将更新的参数刷新到参数存储中,(ii)在同步中不运行优先级调度。具体来说,Ekko将尚未刷新的参数更新插入日志中。日志附带有同步的物理时间戳(记为 $ t $)。如果在短时间内有多个同步操作,Ekko会合并它们的日志以节省空间。
模型状态管理器控制见证服务器启动状态回滚。假设模型状态在时间 $ t $ 处被认为是健康的,见证服务器会找到一个满足以下两个条件的时间戳 $ t_{\text{max}} $:(i)它小于等于 $ t $;(ii)它不在任何发生损坏状态的时间间隔内。然后,见证服务器刷新时间戳小于等于 $ t_{\text{max}} $ 的日志。模型状态管理器记录此 $ t_{\text{max}} $,$ t_{\text{max}} $ 稍后将用于见证服务器以恢复健康的模型状态。通过这种方式,我们可以确保见证服务器上的参数存储 $ store_{\text{healthy}} $ 始终保存健康的模型状态。
回滚过程:图5展示了回滚模型状态的过程。假设发现某个模型已损坏,模型状态管理器首先通知参数服务器停止接受该模型的训练请求( 1 )。然后,它指示参数服务器停止基于优先级的同步,清除其 $ store_{\text{significant}} $,并重置 $ T SVV = SVV $。接着,管理器等待参数服务器和见证服务器上的模型分片收敛。随后,管理器选择见证服务器启动状态回滚( 2 )。我们需要确保恢复的模型分片可以一起使用。因此,管理器仅选择见证服务器上 $ store_{\text{healthy}} $ 中 $ t_{\text{max}} $ 在一个小时间窗口内的分片。
图5
一个关键设计是,见证服务器会比较 $ store_{\text{healthy}} $ 和其当前状态以找到状态差异( 3 )。由于更新参数的局部性,这种差异通常很小。因此,我们只需将差异写入参数服务器以恢复状态。我们需要确保写入操作能够成功。因此,写入的参数被分配比参数服务器上当前参数版本更大的参数版本( 4 )。之后,管理器等待参数服务器和见证服务器上的模型分片收敛。最后,Ekko会在恢复的模型上恢复少量流量。当该模型的健康状况指标恢复正常时,管理器通知参数服务器恢复接受请求( 5 )。
需要注意的是,如果见证服务器发生故障,其未刷新的更新日志将被丢弃。这有助于Ekko防止潜在的损坏更新被刷新。如果参数服务器或见证服务器发生故障(或重新加入集群),回滚过程将重新执行。
6 评估
在本节中,我们通过测试床和生产环境实验评估Ekko的以下方面:(i)Ekko的更新延迟及其与数据中心数量的可扩展性(§6.1.1);(ii)Ekko在异构WAN中的更新延迟(§6.1.1);(iii)Ekko中实现的优化的性能分解(§6.1.2);(iv)Ekko在大规模生产DLRS中的实际延迟和可用性(§6.2.1);(v)低延迟模型更新在在线服务中的好处(§6.2.1);(vi)在繁忙网络中使用模型更新调度器的有效性(§6.2.2);(vii)模型损坏时回滚模型的延迟(§6.2.2)。
除非另有说明,更新延迟是指更新提交时间与更新在所有副本中可见时间 [68] 之间的最大时间差(无故障场景)。在所有实验中,我们测量更新延迟并报告所有更新的平均值。
6.1 测试床实验
我们在一个30台服务器的集群中进行测试床实验。每台服务器配备24核CPU、64 GB内存和5 Gbps网络链路。我们将每三台服务器分组为一个数据中心(DC),以模拟多DC场景,最多形成10个DC。我们选择其中一个DC作为训练导向的DC,该DC从一台服务器(充当DLRS客户端)接收模型更新。我们让其他DC作为推理导向的DC,并将它们与训练导向的DC连接。DC间的带宽为4,800 Mbps(除非另有说明),模拟WAN。
我们的测试床实验包括两个工作负载。第一个工作负载训练一个通常用于生产环境的大型排序模型。在此工作负载中,我们选择分片大小为0.4 MB。第二个工作负载使用按时间顺序排序的Criteo Terabyte Click Logs [17] 训练Wide & Deep模型 [10]。我们使用21天的数据日志初始化嵌入表。为了确保实验可重复,我们记录模型更新轨迹并在实验期间重放它们。
6.1.1 更新延迟
我们在同构WAN和异构WAN中评估Ekko的更新延迟。这两种WAN在现实世界中都很常见。第一个基线是Adam [11],它通常用于参数服务器中,通过两阶段提交协议同步模型更新。我们的Adam实现移除了更新广播之间的等待时间,从而提高了网络利用率。第二个基线是Checkpoint-Broadcast,这是DLRS中应用模型更新的事实标准方法 [1, 21]。我们省略了与通用键值存储(如PaxosStore [73] 和 TiKV [29])的实验,这些存储提供写入操作的线性一致性。我们的早期采用结果表明,这些键值存储的写入吞吐量较低,比生产DLRS所需的吞吐量低几个数量级。
为了公平比较,Ekko和基线都使用DRAM进行存储 [57],并采用相同的主节点分配和负载均衡方案。我们进一步确保它们的传播都是网络受限的,并使用相同数量的分片。
同构WAN结果:我们首先在同构WAN中将Ekko与Adam进行比较。我们分别测量了1个DC(3个副本)、5个DC(15个副本)和10个DC(30个副本)的延迟。图6a和图6b显示了结果。可以看出,Ekko在生产环境和Criteo工作负载中的延迟显著低于Adam。具体来说,在运行生产工作负载的10个DC中,Ekko实现了2.6秒的延迟,比Adam的18.8秒延迟低7倍。我们还观察到,随着DC数量的增加,Ekko和Adam之间的性能差距也在扩大。原因是Ekko具有可扩展的P2P同步架构,并针对WAN优化了其传播拓扑。相比之下,Adam依赖主副本发送更新,受限于训练DC中有限的带宽。
图6
我们还将Ekko与Checkpoint-Broadcast进行比较。根据我们的实验结果,Checkpoint-Broadcast在WAN中同步4 GB参数需要超过7秒。总参数为113 GB。在10个DC的情况下,训练DC需要向所有其他推理DC发送113×9=1,017 GB的参数。因此,训练DC需要花费超过29分钟完成参数广播(因为WAN的带宽为4,800 Mbps)。这种广播延迟比Ekko实现的秒级延迟(例如2.6秒)高几个数量级。
异构WAN结果:然后我们在异构WAN中评估Ekko和基线。在此WAN中,我们将DC间带宽默认设置为256 Mbps。为了引入异构性,我们选择训练DC与另一个推理DC之间的一个链路,并将其带宽设置为128 Mbps。实验在每个DC中运行3个副本,总共10个DC。如图7a和图7b所示,Ekko在生产环境和Criteo工作负载中都能有效缓解慢速异构链路的影响。它允许副本以独立的速率同步,保持秒级同步延迟。这种低延迟性能显示了Ekko的无日志P2P同步在缓解异构网络路径不利影响方面的有效性。相比之下,Adam在WAN中受到慢速路径的影响,导致在生产工作负载中花费超过150秒同步副本,在Criteo工作负载中花费超过100秒。
图7
除了Adam,我们还考虑了其他基于日志的同步方法,例如Multi-Paxos [9]。我们可以让这些方法将一段时间内到达的更新聚合到一个日志条目中,以节省WAN带宽。然而,这些方法仍然受到异构链路的影响。这是因为它们基于网络中最慢的链路选择聚合间隔,导致许多其他链路利用率不足。
6.1.2 性能分解
我们希望了解Ekko同步中各个组件的有效性。因此,我们对生产工作负载(10个DC)进行了性能分解分析。我们首先配置Ekko仅使用分片知识(见§4.3)进行同步。此配置是本实验的基线,相当于最先进的P2P同步技术——版本向量(VV)[50, 51]。
图8显示了结果。仅使用VV时,Ekko需要76.3秒同步所有参数。启用更新缓存(§4.4.1)后,Ekko将延迟减少到27.4秒(即2.8倍加速)。通过分析更新缓存的跟踪数据,我们发现缓存在生产工作负载中实现了100%的命中率。需要注意的是,测试床服务器上每个副本的总内存比生产服务器少10倍,这意味着分片中的参数比实际场景中少。随着分片中参数数量的增加,VV将花费更多时间进行同步,而更新缓存可以保持低延迟。
图8还显示了分片版本(§4.4.2)的效果。通过进一步启用分片版本,Ekko将延迟从27.4秒减少到6.0秒(即4.6倍加速)。这表明跳过未更新的分片可以有效减少同步带来的网络消耗。
图8
最后,启用WAN优化(§4.5)后,Ekko将延迟从6.0秒进一步减少到2.6秒(即2.3倍加速)。这表明P2P同步必须考虑WAN中每个链路的可用带宽,否则无法充分发挥其潜力。总之,启用Ekko中的所有组件使P2P同步总共加速了29.3倍(即2.6秒 vs. 76.3秒)。
6.2 生产集群实验
我们已经将Ekko部署到生产中超过一年。生产环境包括分布在6个地理分布式DC中的4,600台服务器。截至2022年,我们已使用Ekko支持多种推荐服务,包括短视频推荐、搜索和广告。每天有超过10亿用户使用这些服务。在本节中,我们报告Ekko在该生产环境中的性能。
6.2.1 模型更新
我们从生产环境中收集跟踪数据,以分析Ekko在更新模型中的性能。生产环境中有数百个DLRS模型(总共40 TB参数或2500亿个键值对)。每个参数分片的大小从0.1 MB到20 MB不等,具体取决于模型大小。Ekko每秒可以执行10亿次更新(即212 GB/s)。
关于延迟性能,Ekko在所有DC中同步参数花费2.4秒,仅在训练DC中花费0.7秒。同步流量仅占总网络流量的3.0%,反映了Ekko作为参数服务器后台同步服务的有效性。Ekko的低延迟、高吞吐量性能并未影响系统可用性。自部署以来,Ekko在参数读写操作中实现了>99.999%的可用性。
更新缓存分析:我们对更新缓存在各种现实推荐服务中的性能特别感兴趣。我们的跟踪数据显示:更新缓存只需缓存0.13%-0.2%的参数,即可实现>99.4%的命中率。这些性能结果验证了更新局部性的广泛存在。事实上,我们的生产推荐服务每小时平均更新3.08%的参数。
我们选择一个更新密集的DLRS模型来揭示最坏情况下的更新局部性。图9显示了480分钟窗口内更新参数的比例。此时间窗口涵盖了我们生产DLRS一天中最繁忙的时间。我们报告了不同时间间隔的比例。在10分钟间隔内,只有4.3%的参数被更新,并且这一比例在480分钟的时间窗口内保持稳定。在60分钟间隔内,我们观察到类似的模式,比例仅略微增加到约10%。实际上,许多其他模型的更新工作负载较少,其更新参数的比例低于此模型。
图9
低延迟模型更新的好处:我们希望了解低延迟模型更新是否真的能提高推荐服务的质量。为此,我们在短视频推荐服务 [65] 中进行了为期15天的在线A/B测试 [64]。该服务包括一个多阶段管道 [10, 15]。我们仅在排序阶段进行实验。我们将排序模型分为两组:实验组和对照组。每组接收1%的总流量用于训练和推理。我们通过将实时日志缓存到分布式文件系统中,将用于训练对照组模型的数据(即事件日志)延迟20分钟。
我们的A/B测试结果显示:与对照组相比,实验组在所有推荐视频中新鲜视频(发布在一小时内)的比例增加了3.82%。这意味着系统向实验组用户推荐了更多新鲜视频。此外,实验组用户滑动视频列表的比例减少了1.30%,而浏览视频的总时间增加了1.68%。这意味着实验组用户花费更多时间观看视频,并对推荐视频更感兴趣。
最后,实验组用户点击评论的比例增加了2.17%。这意味着实验组中的用户互动增加。值得注意的是,在现实世界的多阶段DLRS中,1%-3%的改进被认为是显著的 [10, 21, 71]。事实上,自从在DLRS的更多阶段启用低延迟模型更新以来,我们观察到推荐质量的更显著改进。
6.2.2 SLO保护机制
我们还运行A/B测试来评估Ekko的SLO保护机制的有效性。
图10
SLO感知的模型更新调度器:我们将排序模型分为实验组(启用优先级调度器)和对照组。每组有1%的训练和推理流量,并部署到专用服务器以避免流量干扰。我们监控反映新鲜度SLOs的指标:推荐结果中新鲜视频(即过去一小时内发布)的数量。为了模拟网络拥塞,我们将模型更新的可用带宽减少了92%。模型更新调度器(i)使用默认的优先级计算规则(定义见§5.2.1),(ii)将百分位优先级 $ k $ 设置为99($ k $ 定义见§5.2.2)。
A/B测试结果显示,在实验组中,Ekko将同步流量减少了92%,并保持重要更新的低延迟。相比之下,对照组在繁忙网络中发送模型更新时无法区分更新。因此,对照组延迟了SLO关键更新,其SLO指标下降了2.32%。这种下降在实践中是显著的,因为该SLO指标是决定DLRS利润的关键因素。
在线模型状态回滚:我们评估在线回滚模型状态的延迟。我们将Ekko与checkpoint恢复方法进行比较。为了公平比较,我们让回滚延迟排除(i)Ekko中收集SLO指标的时间,以及(ii)等待分歧参数收敛的时间。我们部署了5个见证服务器。对于每个见证服务器,我们分配113 GB参数和800 Mbps网络带宽。
在实验过程中,我们通知Ekko的模型状态管理器将DLRS模型的状态回滚到1分钟前的版本。然后,管理器通知所有见证服务器识别过去1分钟内更新的参数。因此,见证服务器只需重新加载当前状态与早期状态之间的差异。因此,整个回滚操作仅需6.4秒即可完成。相比之下,checkpoint恢复方法无法感知模型状态的最近更新。因此,它必须重新加载整个状态,花费1,157秒完成(比Ekko慢180倍)。
7 相关工作
数据复制系统:Ekko中探索的参数同步问题与之前的数据复制工作相关。现有的数据复制系统通常探索如何利用应用程序的特性来提高数据复制的延迟性能 [13, 40, 45, 53]。例如,Egalitarian Paxos [53] 利用了状态机命令的低干扰率,Gemini [40] 利用了混合一致性操作,而COPS [45] 和PNUTS [13] 则利用了互联网服务对宽松一致性的容忍性。与这些系统不同,Ekko利用了DLRS特有的模型更新局部性和最终一致性模型来加速模型参数(而非通用数据)的同步,使Ekko在设计空间中独树一帜。
ML系统中的带宽节省技术:优先处理模型更新的问题与分布式ML系统中的带宽节省技术相关。这些技术通常涉及梯度压缩 [4, 6, 28, 44],在繁忙网络中优先处理大梯度,预期这些大梯度对训练模型的最终准确性有显著影响。与这些技术不同,Ekko针对模型推理场景,人们关心的是众多推理SLO指标,而不仅仅是模型的准确性。因此,Ekko不仅依赖梯度大小,还进一步考虑模型的新鲜度和优先级来调度模型更新。
ML系统中的SLO感知调度:在调度中考虑SLOs的问题在之前的ML系统中已有探索。模型服务系统通常将推理延迟作为主要SLO,以指导与推理相关的计算任务的调度 [16, 25, 70]。模型训练系统,例如Pollux [59] 和KungFu [48],使用ML特定的SLOs(例如训练吞吐量和梯度统计)来决定如何调度训练工作器。与这些系统相比,Ekko关注新鲜度和质量SLOs,并支持在调度模型更新时使用这些SLOs。
8 结论
本文提出了Ekko,一种能够在秒级延迟下更新大规模模型参数的新型DLRS。Ekko具有高效的P2P模型更新算法,能够协调数十亿次模型更新,并将其高效传播到地理分布的数据中心中的副本。此外,它还具备SLO保护机制,能够保护模型状态免受网络拥塞和在线有害模型更新的影响。实验结果表明,Ekko比最先进的DLRS快几个数量级,证明了其新颖设计的有效性。