作者:宋顾杨 2023-08-24 08:49:27
云计算
云原生 分布式计算引擎Ray,不仅能成为大数据领域的基础设施,还可以成为AI等其它任何需要分布式系统的基础设施。本文将以实例的形式展示在云原生环境中,在没有Ray的情况下开发一个复杂的分布式系统需要考虑哪些问题、复杂性在哪里,并一步步揭秘如何利用Ray构建复杂的分布式系统。帮助大家了解Ray在构建分布式系统方面的便利性。
目前创新互联建站已为上千的企业提供了网站建设、域名、雅安服务器托管、网站运营、企业网站设计、祁东网站维护等服务,公司将坚持客户导向、应用为本的策略,正道将秉承"和谐、参与、激情"的文化,与客户和合作伙伴齐心协力一起成长,共同发展。
首先从一个实例开始,上图是我们最近构建的AutoML的例子,搭建自动分布式机器学习的服务。
图中虚线框起来的就是这个自动机器学习的服务,服务中有如下几个角色:proxy是一个常驻服务,在整个AutoML集群中负责提供服务的入口,会根据不同的AutoML的任务创建小方框的训练集群;小方框中又有两个角色,trainer和worker,trainer 是类似协调器的角色,协调一个完整的AutoML任务,在协调过程中不断地创建worker来完成整个AutoML的计算。
集群外部通过client接入AutoML服务,将AutoML需要的models和data组合发到proxy,proxy根据用户请求创建trainer,当然在这个过程当中需要通过K8S管理资源来创建trainer的Pod或者process,trainer会通过解析client 端的这个models和data计算需要多少个worker,同样通过K8S创建出对应的worker,开启整个训练任务。
每个worker训练完成后,trainer收集结果,检测整个AutoML任务是否完成,最后把结果返回给proxy,返回结果后,trainer和worker会被销毁。
这是一个比较完整的AutoML Service服务,可以看到其特点为:云原生、多角色、高弹性、动态化、有状态、频通信。高弹性主要体现在proxy和trainer两个角色中,他们都会在runtime的过程中不断地去申请资源。
接下来探讨一下如果在云原生环境实现这个case,需要考虑哪些事情?下面从两个方面进行分析。
从技术栈角度,假设当前是AI应用场景,主要编程语言是Python。首先需要开发单体应用,包括proxy、trainer、worker几个角色。在单体应用的编写中:
从编程语言角度,为了实现上面的分布式系统,需要应用下面的编程语言:
以上就是常规思路实现分布式系统需要考虑的技术栈以及编程语言。
我们可以看到整个云原生环境下分布式系统的构建是非常复杂的。但是经过分析可以看到,很多需要研发和维护的逻辑是通用的分布式系统的逻辑,跟实际的业务逻辑没有直接的关系。比如上面提到的AutoML的case,业务或者系统的开发同学更关注AutoML本身的逻辑。那么,是否有一个系统,能够将分布式系统的通用能力全部解决,使得系统或者业务的研发团队能够专注于业务逻辑本身呢?Ray的出现就是为了解决这个问题。
在正式分享如何利用Ray来构建分布式系统之前,先对Ray做一个简要的介绍。
首先从Ray在github上star数的发展来看,从2016年中开源到现在经历了两个里程碑,左上图中红线是Ray的star数历史数据,在2021年Ray的star 数已经超过了Flink,2023年5月迎来了另一个里程碑:超过了Kafka,目前距离Spark 的star数还有一定的距离,从star数的角度看,Ray的发展非常迅速。
右上是一张 Google 的PATHWAYS论文的截图,PATHWAYS在 Google 内部实际被看作是下一代 AI 的基础架构,在这篇论文中多次提到了Ray,认为Ray实际上非常有可能成为该基础架构中的分布式计算框架。
右中是最近被大家熟知的大语言模型公司OpenAI的信息,OpenAI在今年公开了GPT3.5和GPT4的分布式训练的部分细节,底层也是通过Ray来构建整个分布式系统的。
左下是2022年Ray社区做的偏向大数据领域的一项工作,利用云上的资源做数据排序,应用shuffle的能力,这套排序系统打破了世界记录,首次达到每TB 小于$1的成本(0.97$)。
右下是Ray社区近期在离线推理Batch Inference方面的一项工作,对比了Ray 和其他现有的方案,通过Ray进行数据处理,实现一个pipeline流程给训练系统提供数据,相比Spark,在Throughput方面有两到三倍的提升。
通过上面的信息,大家可以对Ray有一个high level的认知。
Ray 是由加州大学伯克利分校RISELab实验室发起的一个开源项目,RISELab实验室在业界非常知名,Spark 也发源于这个实验室。
Ray是一个通用的分布式计算引擎,由于Ray的通用性,很可能会成为新一代的计算技术设施。在分布式领域,由于Ray的整个生态,在 AI 领域更是可能成为统一的编程框架,这部分会在后面的开源生态部分做进一步介绍。
Ray 的通用性体现在哪里呢?实际上通过查阅Ray Core的API,可以看出Ray的设计思想是不绑定任何计算模式,把单机编程中的基本概念分布式化。
从API 的设计可以看出,Ray并不是一个大数据系统,尤其是Ray Core这一层没有任何大数据相关的算子,而是从单机编程的基本概念进行分布式化的。
具体如何分布式化?我们在单机编程中经常用到两个非常核心的概念,一个叫Function,一个叫Class,在面性对象的编程语言里面,基本上大家会围绕这两个概念进行代码开发,在Ray中会将这两个基本概念进行分布式化,对应到分布式系统就叫Task和Actor。
首先解释一下Task。Task是对单机编程中的Function进行分布式化,是一个无状态的计算单元。
上图是一个例子,有一个叫 heavy_compute的Function,它是一个CPU 密集型的运算,在单机内,如果要对它进行1万次运算,比如在单核里面会有左边的代码,一个简单的for loop;如果需要用到多核的能力,就需要用到多线程或者多进程。而在Ray里面,如果想利用多机的能力,要将function 进行分布式化,整个流程非常简单,只需要三步, 如右图所示。
可以看出,整个分布式的过程非常简单,而且编程的整个框架和流程也没有打破单机编程的习惯,这也是Ray整个核心 API 的一个核心能力,给编程者提供最大的便利性。
下面看一下Ray中object的概念。讲object的原因是想让大家理解一下为什么Ray可以把heavy_compute Function运行到另外一个节点并且可以把结果拿回来,这依赖于Ray底层的分布式object store。整体流程如上图左侧所示。
我们在Node 1运行heavy_compute function,这个 function 会使用remote通过Ray底层的调度系统调度到Node 2, Node 2会执行这个function,执行完成后,把结果put到本地的object store中,object store 是Ray中的一个核心组件,最终结果返回到Caller端是通过Ray底层的 object store之间的object传输,把结果返回来给Caller端。
从整个的流程看, heavy_compute.remote 返回的是一个ObjectRef,并不是最终的结果。ObjectRef类似于单机编程中的future,只不过它是分布式的future,可以通过ray.get获取最终结果。
Ray的分布式 object store是非常核心的组件,完美支撑了Ray整套分布式API 的设计,其特点如下:
上面讲解了Task 和object,接下来介绍一下Actor。Actor 也是非常简单的,是将单机编程的Class概念进行分布式化。
左图有一个Counter 类,是一个有状态计算单元,将它进行分布式化,如右图所示。
以上介绍了Ray的几个核心概念,接下来看一下刚刚讲的case,怎么利用Ray来构建这个分布式系统。
Ray 是一个集群化服务,有两种部署方式。
有了Ray集群以后,我们回到之前AutoML的架构图,这里已经加入了Ray系统,利用Kubernetes+Ray,如何实现这个分布式系统呢?
在用户看来,实际上已经看不到K8s这层资源了,通过刚才的介绍大家很容易想到实现的思路,就是利用Ray的Actor 和Task去分析一下整个系统哪些是有状态计算单元,哪些是无状态计算单元,然后对它进行分布式化。
下面简要讲解一下,上面架构图从右到左整个Service系统是怎么实现的。
worker是一个Task,所以需要封装一个function。function train_and_evaluate的主要逻辑是拿到model、训练数据集和测试数据集,完成单机的训练和评估,上面是已经抽象成单机的计算。然后通过ray.remote 把它变成一个Task。
Trainer需要对多个 worker进行调度,通过把不同任务调到不同 worker 上面并收集结果完成单个AutoML请求的计算过程。
首先有一个Trainer 类封装整个Trainer的业务逻辑。然后通过ray.remote 把它变成一个Actor,Trainer的train的方法通过两层loop实现对多个worker的调度,实际上是对上面实现的worker的train_and_evaluate function的remote 执行,这样就能在分布式系统中实现并发计算,并发计算完成后,Trainer收集结果并返回给proxy。
proxy是对外服务的入口,定义两个function:
这里值得注意的是,在部署 proxy 的时候需要设置一个name,用于服务发现。
用Ray Client 接入的时候,任何一个AutoML用户拿到Client,先通过ray.get_actor传入上面的actor name就可以获得proxy的句柄,然后可以通过proxy的句柄调用proxy方法从而实现AutoML的接入,最后通过调用proxy 的get_result拿到最终的运算结果。
接下来介绍一些细节。
第一个细节是资源定制。
在纯云原生的实现思路中,如果没有Ray,资源定制是写到 yaml 里边的。比如说训练需要多少GPU 或者计算节点需要多少CPU,都是在 yaml 中定制 container 的规格。
Ray提供了另外一个选择,完全无感知的代码化的配置,用户可以在 runtime 的时候,或者在Ray的Task 或 Actor 的decorator 中加一个参数,就可以通过Ray系统的调度能力分配相应的资源,达到整个分布式系统资源定制的目的。
Ray的资源定制除了支持GPU、CPU、Memory 之外,还可以插入自定义资源。然后Ray的调度还有一些高级功能,比如资源组,或者亲和性和反亲和性的调度,目前都是支持的。
第二个细节是运行时环境。
在分布式系统中,往往不同分布式系统的组件对环境的要求是不一样的。如果使用常规思路,就需要把环境固化到image里面,通过 Dockerfile 去定制环境。
Ray实现了更灵活的选择,也是代码化的,可以在runtime创建Task或Actor之前的任意时刻定制指定计算单元的运行时环境。上图中给worker 的 Task 设定一个runtime_env,定制一个专属的Python版本,并在该版本里面装入一些pip包,完成面向Python的隔离环境的定制。这时Ray集群内部会在创建这个Task之前去准备该环境,然后将该Task调度到该环境执行。
Ray的运行时环境是插件化的设计,用户可以根据自己的需求实现不同的插件,在Ray中原生支持了一些插件如Pip、Conda、Container等,只要是跟环境相关,不只是代码依赖,也可以是数据依赖,都可以通过插件去实现。
右下图从运行时环境这个角度看,以Python为例,隔离性的支持力度有如下几个维度,一个是 Process 级别的隔离,第二是 Virtual env 级别的隔离,第三是 Conda 级别的隔离,最后是 Container级别隔离。
从隔离性来说,从右到左是由弱到强的,Process 的隔离性是非常弱的,Container 隔离性是更强的。
从用户体验来说,环境定制上 Container 是更重的而Process 是更轻的。
所以在Ray中用户可以根据自己的环境定制的需求选择需要定制的环境的粒度。有些人需要完全的Container 级别的隔离,有些人Process 级别的隔离就足够了,可以根据自己的需求进行选择。
第三个细节是运维与监控。
Ray提供了 Ray Dashboard。Ray dashboard实现了整个Ray集群包括Ray Nodes、Ray Actors等各种维度信息的透出;另外,还有集群内的Logs和events,比如某个Actor的某个方法执行异常,Ray会把堆栈通过 event收集到dashboard中,方便迅速定位问题;除此之外还有profiling 工具,Ray dashboard 可以支火焰图,还可以一键看到任意一个Actor或Task的进程状态或者堆栈。
除了Ray dashboard,Ray还提供了黑屏化的Ray State Client,同样可以通过 Ray State Client 去 query 整个集群的状态。
在监控方面Ray集成了Metrics的框架,用户可以直接调用Ray的metrics 的接口写入metric,然后在Ray dashboard中通过iframe的形式嵌入了Grafana来做一些简单的监控。
下面介绍一下Ray的架构。Ray在架构上与很多大数据系统类似,有一个主节点head节点,其他是 worker 节点。
在主节点里有GCS角色(Global Control Service),GCS主要负责整个集群的资源调度和节点管理,类似于Hadoop架构中Yarn里边的 Resource Manager。
Ray的worker节点主要有Raylet角色。除了做单机的进程管理和调度之外,比较关键的还有刚刚讲过的分布式的object store,是集成到Raylet进程里面的。
上图是我们做的一个实验,除了Ray+云原生的实现方式,我们也写了一套代码以云原生的方式来实现相同逻辑。代码已经放在上图下方GitHub 的repo上面,大家有兴趣可以查阅。
这里介绍一下实验评估结果:
最后来介绍一下Ray的开源现状。
从Ray的活跃度来看,Ray从 2016 年开源至今,活跃度持续稳定增长。目前社区有超过 800个Contributor,Star数超过26K。
Ray在中国有由蚂蚁长期维护的中文社区。
Ray forward已经在国内举办了五届,2023年7月2日蚂蚁刚刚举办了最新一届的Ray forward。从五届Ray forward可以感受到一个趋势,在最开始的两年,大部分的talk都是蚂蚁和加州大学伯克利分校RISELab实验室的人员分享,而近两年已经有越来越多的国内公司来Ray forward分享他们自己的议题,今年是议题最多的一次。
刚刚讲的从Ray的概念还有整个case来看,实际上是Ray底层Core的核心能力。
Ray的生态花费了非常大的精力在 AI 领域,上图是Ray 2.0的核心概念,叫Ray AIR(Ray AI Runtime)。Ray AIR的设计思想是在AI pipeline 的各个处理流程中去集成各种各样主流的工具,比如数据处理、训练、Tune、Serve等。
如果利用 Ray去构建一个AI的pipeline,在数据处理方面可以选择Spark,也可以选用Mars或Dask等Python的科学计算工具,也可以选择Ray原生的Ray Dataset;在训练方面,可以根据业务需求选择PyTorch, TensorFlow 等训练框架。
Ray AIR定位是一个可扩展的统一的机器学习工具集,最终可以帮助用户实现一个脚本就能够将整个AI的pipeline构建起来,这是一种融合计算的思路。
在没有Ray之前,整个pipeline会有多个系统串联起来,有了Ray之后,底层会有统一的 runtime 来完成编排和调度。
上图是一张比较老的图,主要是为了展示Ray的生态。主要分为两部分library,一个是Ray的Native Libraries,一个是Third Party Libraries。
Ray的企业应用是比较广泛的。
除了刚刚讲到的大语言模型场景下的OpenAI之外,上图列出了已经集成Ray很长时间的一些企业。整体来看,目前国外的发展更多一点,国内相对少一点,国外的很多大厂,包括一些传统企业,都在利用Ray来构建他们底层的分布式系统。
上图汇总了目前做大模型训练的一些已经集成了Ray的开源框架。
上图是Alpa的详细架构,可以看到Ray主要在中间层。Alpha 项目可以自动做到层间和层内两个角度并行化,从整个创新角度看是比较领先的分布训练框架。
上图是除了 AI 以外的集成了Ray的能力的开源框架。
Q:Ray的使用场景其实还是蛮多的,尤其在蚂蚁内部基于Ray构建了很多框架,比如刚刚提到的GeaFlow,一个流图计算框架,是一个比较大的方向;另外刚刚提到的隐私计算是另外一个方向。那除此之外,蚂蚁内部还有类似于函数计算系统,函数计算也可以是基于Ray来构建,从Ray的API可以看出Ray做函数计算还是非常方便的;除此之外还有科学计算、在线机器学习,最近我们也在探索搜推引擎能不能基于Ray来构建。
AI 方面在蚂蚁内部用得比较多的是AI在线服务,怎么应用一个或多个模型或大模型提供推理服务,整个从外围的资源编排调度,包括failover都可以利用Ray Serve做在线服务的支持。Ray在AI 方面的应用比较广泛,包括刚刚提到的Ray AIR涉及到的从 AI 的数据处理、到训练、到Ray Tune,以及最近的大模型场景,都是利用Ray来完成底层分布式底盘的。
Q:是的,如果是基于Ray原生的object store,需要把结果 put 到Node 2 里面,在 put 完之后,Node 2 异常退出了,那数据就可能丢了。当然用户可以通过Ray原生的血缘的能力,或者用户自己实现的failover能力去进行恢复,如果需要保证不丢数据,则需要实现高可用,对object store做一个扩展。目前来说,Ray object store的面向场景是做计算引擎中间结果的存储,它并不需要做持久化存储。
Q:我觉得是互不冲突,从Ray的设计思想来看,不对标任何一个大数据系统。刚刚也提到在Ray的整个生态中,用户也可以把 Spark跑在Ray上面,甚至还有一些项目在做Ray on Spark。在数据处理领域Spark有它的非常核心的能力。如果想在训练之前做简单的数据预处理,不需要牵扯到复杂的算子,这种情况下可以直接用Ray,但是如果计算场景比较复杂,比较偏向于大数据处理,并且用到比较复杂的shuffle逻辑或者比较复杂的算子,还是可以利用 Spark进行处理,然后再对接 Ray生态,用户根据自己的计算场景来进行技术选型。
Q:我对Akka的API是什么样子印象不是很深了,我理解它应该跟Ray的API 还是有很大区别的。Ray主要是从编程的角度出发,有Task和Actor,Ray中的Actor model 跟传统的Actor model 的概念还是有一点区别的。Ray的Actor更偏分布式,目前没有面向单机线程间交互的场景。
Q:我们认为容错有两个维度。
第一个维度是粗粒度进程级别的容错,因为Ray交付的是一个进程,无论是Task还是Actor。Task/Actor的failover,首先是Task/Actor所在进程的一个探活,识别其是否异常退出,这是分布式系统基础能力,在Ray里面是完全由Ray来实现的;其次是进程维度的failover,当Task尤其是Actor异常退出后,把异常退出实体重新调度起来进行新的实例化,也是由Ray负责的。也就是说,粗粒度的恢复是可以完全由Ray来负责。
第二个维度是细粒度,主要是进程的状态,因为Ray实际上是不去规定内部计算逻辑的,内部可以跑流批计算、可以跑AI 训练、可以跑任何的一个function ,Ray并不知道内部代码在做什么事情,所以状态恢复方面,目前Ray主流的用法还是把状态恢复交给用户的业务代码自己负责。当然用户可以通过Ray接口知道当前是否被重启了,已经重启了多少次?用户之前状态需要自己做一些checkpoint,可以存到存储里面或者存到 Ray底层的object store 里面。
用户可以根据不同的failover的可靠性要求做具体方案。但总体来说,一般的用法是粗粒度的failover由Ray托管,细粒度的状态恢复是由Ray上的应用自己来做恢复。
分享题目:云原生场景下如何利用Ray快速构建分布式系统
链接分享:http://www.mswzjz.com/qtweb/news44/202144.html
网站建设、网络推广公司-创新互联,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联