Dataflow模型的使用, 使得大数据计算的批处理和流处理融合为一体. 但是, 现有的针对大数据计算的集群资源调度框架, 要么面向流处理, 要么面向批处理, 不适合批处理与流处理作业共享集群资源的需求. 另外, GPU用于大数据分析计算时, 由于缺乏有效的CPU-GPU资源解耦方式, 降低了资源使用效率. 在分析现有的集群资源调度框架的基础上, 设计并实现了一种可以感知批处理/流处理应用的混合式资源调度框架HRM. 它以共享状态架构为基础, 采用乐观封锁协议和悲观封锁协议相结合的方式, 确保流处理作业和批处理作业的不同资源要求. 在计算节点上, 提供CPU-GPU资源的灵活绑定, 采用队列堆叠技术, 不但满足流处理作业的实时性需求, 也减少了反馈延迟并实现了GPU资源的共享. 通过模拟大规模作业的调度, 结果显示, HRM的调度延迟只有集中式调度框架的75%左右; 使用实际负载测试, 批处理与流处理共享集群时, 使用HRM调度框架, CPU资源利用率提高25%以上; 而使用细粒度作业调度方法, 不但GPU利用率提高2倍以上, 作业的完成时间也能够减少50%左右.
The use of the Dataflow model integrates the batch processing and stream processing of big data computing. Nevertheless, the existing cluster resource scheduling frameworks for big data computing are oriented either to stream processing or to batch processing, which are not suitable for batch processing and stream processing jobs to share cluster resources. In addition, when GPUs are used for big data analysis and calculations, resource usage efficiency is reduced due to the lack of effective CPU-GPU resource decoupling methods. Based on the analysis of existing cluster scheduling frameworks, a hybrid resource scheduling framework called HRM is designed and implemented that can perceive batch/stream processing applications. Based on a shared state architecture, HRM uses a combination of optimistic blocking protocols and pessimistic blocking protocols to ensure different resource requirements for stream processing jobs and batch processing jobs. On computing nodes, it provides flexible binding of CPU-GPU resources, and adopts queue stacking technology, which not only meets the real-time requirements of stream processing jobs, but also reduces feedback delays and realizes the sharing of GPU resources. By simulating the scheduling of large-scale jobs, the scheduling delay of HRM is only about 75% of the centralized scheduling framework; by using actual load testing, the CPU resource utilization is increased by more than 25% when batch processing and stream processing share clusters; by using the fine-grained job scheduling method, not only the GPU utilization rate is increased by more than 2 times, the job completion time can also be reduced by about 50%.
分布式批处理[
批处理作业的资源调度框架一般采用尽力而为的策略, 通过先来先服务、资源的公平共享以及容量调度等方法, 尽量降低作业的完成时间, 提高系统的吞吐量, 确保不同批处理作业之间资源分配的公平性. 流处理作业的资源调度框架要求一次性分配足够的资源, 再通过资源的弹性扩大或者缩小, 尽量降低作业的处理延迟. 对于流处理作业, 如果采用公平资源分配, 可能出现资源不足的现象. 例如, 总的资源数为100个CPU核, 两个流处理作业都需要60个CPU核, 采用公平分配, 每个作业获得50个CPU核, 都无法达到资源需求.
面向流处理作业的资源调度框架和面向批处理作业的资源调度框架具有各自的特点, 分别被应用于流处理和批处理作业的大数据应用中. 但是自从Google的Dataflow[
但是, 现有的集群资源调度框架, 在流处理作业中采用紧耦合方式, 在批处理作业中采用松耦合方式, 因此无法感知作业的类别来提供不同的资源调度策略, 达到批处理/流处理作业的资源共享要求. 如果简单地将这两类作业共享在一个集群基础设施上, 当使用批处理资源调度框架的公平共享策略时, 就无法一次性为流处理作业分配足够的资源, 不能满足SLO; 当采用流处理资源调度框架的弹性调度策略, 就必然会引起流处理作业和批处理作业之间资源分配的冲突问题. 在这种情况下, 就需要一种新的集群资源调度框架, 能够将批处理作业的资源分配策略和流处理作业的资源分配策略统一起来, 实现批处理作业和流处理作业之间的资源共享, 确保两类不同作业的资源分配需求.
文献[
随着大数据处理技术的发展, 大数据应用场景越来越广泛, 大数据分析过程中对于吞吐量和延迟的要求越来越高, 并且随着一些声音、图像等数据的出现, GPU开始在大数据分析中被广泛使用. 但是, GPU主要应用于计算密集型的大数据计算, 不太适合I/O需求巨大的大数据处理场景, 所以还需要CPU进行调度. 现有的集群资源调度框架, 要么针对大规模的CPU集群, 要么针对GPU集群, 对于CPU计算与GPU计算交替存在的大数据计算场合, 无法将CPU计算和GPU计算有机地统一起来, 缺乏CPU-GPU资源之间的解耦, 不能达到全面利用CPU和GPU资源的目的. 当GPU计算运行时, CPU就处于空闲状态. 另外, 当CPU资源全部被使用时, 就可能导致GPU计算缺乏CPU资源而无法运行, 例如Shuffle操作等待CPU资源, 造成GPU资源的浪费.
目前, 面向大数据计算的CPU-GPU异构集群资源调度框架, 基本上通过扩展CPU集群的功能来实现, 还没有真正意义上的CPU-GPU集群资源调度框架. Yarn[
鉴于以上的问题, 面向Dataflow模型, 建立一种适用于批处理作业和流处理作业融合的异构集群资源混合式调度框架就成为本文的核心目标. 通常情况下, 混合式调度框架应该具有以下特征: 一是可以感知作业的特征, 不同类型的作业采用不同的资源调度策略; 二是CPU资源和GPU资源的灵活耦合方式; 三是尽可能共享使用计算资源, 以便达到集群资源的高效使用; 四是调度开销要尽可能的小, 使得调度器能够管理更多的计算资源和调度更多的作业.
本文的主要贡献是提供一种全新的面向CPU-GPU异构集群的混合式资源调度框架(heterogeneous resource management, HRM), 满足批处理和流处理作业的资源调度需求, 实现CPU计算资源和GPU计算资源的合理分配, 提高系统的资源利用率. 其主要工作以及创新点包括以下几个方面.
(1) 提出了基于共享状态的混合资源调度框架. 通过感知用户的作业类型, 采用乐观封锁协议和悲观封锁协议相结合的策略, 实现流处理和批处理作业的一体化资源调度框架;
(2) 提出了资源单元的概念, 可按照CPU-GPU之间的亲和性[
(3) 通过在资源单元上实施队列堆叠, 使得流处理作业的资源可以一次性确保获得, 批处理作业的资源可以排队获得. HRM在同一个资源单元上设置批处理队列和流处理队列, 形成队列的堆叠. 流处理队列具有高的优先级别. 当批处理作业与流处理作业产生资源竞争时, 由于流处理作业运行在流处理队列中, 具有较高的优先级别, 因此可以确保流处理作业的资源需求. 而批处理作业则采取尽力而为的调度策略;
(4) 通过容器在队列排队等待方式, 减少反馈延迟带来的资源浪费, 提高资源利用率. 资源调度过程是一个不断循环的过程, 即资源释放、状态通知、资源分配、资源再释放. 这种循环过程存在两个问题: (a) 作业释放资源, 但是心跳间隔时间还没有到来, 这部分资源就处于空闲状态; (b) 资源管理器收到心跳通知后, 进行资源的分配. 从资源管理器收到心跳信息到新的分配的容器运行之前, 计算节点上的资源空闲. 为了应对这类延迟, HRM为计算节点设置了队列系统, 一旦运行中的容器结束, 队列中的容器立即从等待状态转换为执行状态, 避免了反馈延迟;
(5) 通过多容器在队列中同时运行机制, 实现GPU设备的细粒度资源分配, 使多个任务可以共享同一个资源单元, 提高GPU设备的使用率.
本文实现了HRM与Spark编程框架的对接, 并进行了模拟测试和实际负载的测试. HRM系统在8台包含双CPU以及双GPU设备的集群上进行了实际负载的测试, 通过运行Spark编程框架对应的流处理和批处理的测试基准以及大量的GPU视频流应用, 与集中式资源调度框架进行了比较. HRM的调度延迟只有集中式调度框架的75%左右; 另外, 针对实际负载测试批处理与流处理共享集群资源时, 使用HRM调度框架, CPU资源利用率提高25%以上; 使用细粒度资源分配方法, 不但GPU利用率提高2倍以上, 作业的完成时间也能够减少50%左右.
本文第1节介绍集群计算资源调度框架的现状. 第2节描述HRM的总体结构. 第3节讲述CPU-GPU计算资源的分割模式. 第4节描述批处理作业和流处理作业资源调度过程. 第5节介绍基于Spark框架的应用感知策略. 第6节给出HRM的性能模拟实验以及实际负载的测试结论. 第7节对全文进行总结概括.
集群资源调度框架分为集中式(Yarn, Mesos, Omega)、分布式(Sparrow, Apollo)和集中分布混合式(Mercury, Hawk, Eagle).
Yarn提供3种调度策略, 即先来先服务(FIFO)、容量调度[
Sparrow[
Mercury[
文献[
Storm[
GPU资源调度主要包含多任务在单个GPU设备的共享以及多任务在多个GPU设备的共享. 文献[
这些资源管理框架只考虑GPU资源, 缺乏CPU与GPU资源的解耦, 不考虑CPU与GPU之间的亲和性, 也不考虑GPU执行中对CPU的依赖, 应用于大数据计算时, 其后果是要么CPU资源的浪费, 要么GPU资源浪费. HRM系统中, 将CPU-GPU作为独立的资源单元来管理, 为每个资源单元提供队列机制, 实现CPU任务和GPU任务的单独调度, 适应于大数据分析计算; 同时也实现了细粒度的共享和弹性资源分配, 支持流处理和批处理作业混合的计算环境.
在本节中, 首先给出HRM的总体说明, 然后给出了资源调度的总体框架以及各个组成部分的功能介绍.
HRM总体结构采用共享状态架构, 由多个调度器、一个资源协调器、资源监视器以及队列管理器组成.
HRM调度框架
最上层是用户的应用程序(后面简称作业, job). 对于每个作业, AppMaster是核心, 与集群资源相关的功能包括任务管理以及与集群资源之间的各种消息交互. 任务管理主要是任务的调度以及任务执行中的控制; 与集群计算资源的交互主要是向集群注册、获得集群资源的状态以及容器的提交等.
中间层是资源调度框架. 资源调度框架包括多个分布式调度器(distributed scheduler)、资源协调器(negotiator)、作业状态(job states)以及资源状态(resource states)这4个部分. 作业状态是整个集群上运行的作业的信息, 包括作业的开始时间、作业中的stage、每个stage中的任务数量、未完成任务和正在执行的任务数等. 资源状态是各个计算节点上可用资源状态, 例如可用资源数量、正在运行的容器的状态和数量以及集群计算节点的健康状态等. 资源协调器是接收各个计算节点的心跳信息, 并经过安全检查和一定的预处理后, 更新全局资源状态, 同时还负责将资源仲裁的结果向作业的调度器汇报. 分布式调度器可以感知作业类型, 对于流处理作业执行流处理资源分配策略(data stream policy), 批处理作业则执行批处理资源分配策略(batch policy). 关于这两种策略, 会在第4节进行详细说明.
底层是计算节点的管理. 计算节点管理包括计算节点上的队列管理器(queue system)[
AppMaster是用户作业(job)的一个进程, 运行在某个计算节点上, 拥有自己的状态并管理作业中的全部任务. 当用户提交作业时, AppMaster负责向资源管理器注册并请求资源的分配. 一旦获得资源, AppMaster管理作业中的任务, 监控任务的状态并控制任务的运行. 由于AppMaster掌握作业输入数据的存储信息、分片信息以及作业执行过程中临时数据的存储信息, 因此作业一旦获得计算资源, 就按照一定的调度策略(如数据的本地化访问等)将每个任务发送到计算节点上进行执行.
HRM中包含两类心跳信息: 一类是资源状态的汇报, 另外一类是作业状态的汇报. 第1类是计算节点(node)上资源监控器(ResourceTracker)与协调器之间的通信, 定时向资源管理器汇报计算节点的状态、容器的状态等. 第2类是用户的应用程序(AppMaster)与分布调度器(distributed scheduler)之间的心跳信息交互, 定时更新作业状态的信息(job status), 例如作业的任务数, 已经完成, 未完成任务数量等.
详细的资源分配和容器调度过程
● 1a(1b). 作业的调度请求. 1a为流处理作业向一个分布式调度器发送任务提交请求, 1b为批处理作业向一个分布式调度器发送的任务提交请求;
● 2a(2b). 对于批处理作业, 调度器从全局集群资源状态信息(resource status)获得一个拷贝; 对于流处理作业来说, 是发送一个资源加锁请求, 获得全部资源状态并进行调度;
● 3. 分布式调度器根据全局资源状态信息进行资源分配, 然后将资源分配结果发送到协调器(negotiator);
● 4a(4b). 对于批处理作业, 协调器对分布式调度器的调度结果进行冲突检查: 对于不存在冲突的资源分配信息, 更新全局资源状态(
● 5a(5b). 协调器向调度器发送调度结果;
● 6a(6b). 分布式调度器根据资源分配的最终结果, 向集群的计算节点发送消息, 将容器的启动命令提交到队列中;
● 7a(7b). 计算节点的队列管理器按照资源约束信息, 启动任务执行所需的容器;
● 8a(8b). 作业定期通过调度器向协调器汇报作业的状态信息, 比如运行的任务数量等信息;
● 9a(9b). 节点资源监控器定时向协调器汇报节点的健康状态消息以及节点上容器的状态消息.
HRM系统采用全局资源状态信息来为调度器提供分配依据, 各个分布式调度器调度前, 首先获得全局资源的状态的一个备份, 然后按照其信息进行并行的资源分配, 从而减少了次优分配的概率. 另外, HRM的流处理和批处理作业之间设置不同的优先级别, 保证流处理作业对资源的抢占式调度, 一定程序缓解了资源分配冲突的问题.
对于分布式调度框架, 由于各个调度器独立工作, 调度器之间无法进行消息通信, 因此各个调度器的负载情况就不能被共享, 导致调度器无法决策各个作业在全局资源中应该使用的资源份额. HRM设置作业的状态信息, 用来决策全局资源在各个作业之间的公平共享. 协调器掌握整个集群的负载大小, 确保全局的分配策略, 如资源共享的公平性、任务在队列中重新设置优先级别等等.
计算节点上的队列管理器是任务执行的基础, 任务使用计算节点的物理资源来执行. 因此, 高效地管理集群计算节点的可用资源, 是一个非常重要的环节.
大数据处理中, 对GPU的资源分配完全不同于高性能计算. 高性能计算的每个作业需要强大的算力, 较少涉及数据的I/O. 但是, 基于数据流计算模型的大数据分析作业是由许多操作组成的有向无环图(DAG), 像洗牌操作(shuffle)、网络传输等操作只能在CPU上执行, 而且占有大量的比例.
首先, 本文假设集群中所有CPU核的计算能力是相同的, 但是因为CPU和GPU之间的亲和性不同, 导致GPU在使用不同CPU时所造成的延迟也是不尽相同的. 因此, 在CPU和GPU分配方面, 需要考虑以下几个方面: (1) 如果CPU和GPU分割不合理, 就可能发生CPU饥饿或者GPU饥饿的情况; (2) CPU和GPU之间存在亲和性关系, GPU对应的CPU核在计算过程中不断变化, 必然会导致主机内存到设备内存数据传输性能的下降.
HRM采用资源单元策略(resource unit, RU), 使用Linux提供的容器技术进行隔离, 每个RU是一个独立的资源分配单元. 对于每个RU, 按照亲和性为GPU设备设置一定数量的CPU核. 为此, HRM提供3种方式来划分RU: (1) 计算节点上的每个GPU作为一个RU, 全部CPU形成一个RU, 可以被GPU共享; (2) 每个节点上, 将一定数量的CPU核与GPU绑定在一起, 形成一个RU, 剩余CPU可以分成一个或者多个RU; (3) 依照GPU数量, 将CPU核均匀地划分给GPU设备, 形成不同的RU. 其结果如
CPU-GPU分配方式
CPU-GPU资源分割由用户自己选定, 依据亲和性以及资源需求动态灵活地绑定. 但是可以制定以下启发式原则.
● 如果作业类型主要是CPU作业, GPU作业数量较少时, 选择
● 如果作业类型既包含CPU作业, 也包含一定数量的GPU作业时, 按照亲和性关系和CPU核数量的要求, 选择
● 如果作业全部是GPU作业, 例如大量的图片检测程序时, 只需要将所有CPU核平均分配到各个GPU上即可, 选择
为了高效利用每个资源单元, 需要设置队列来调度任务. 通常, GPU操作的执行离不开CPU的调度, 所以GPU操作执行中, 总是存在CPU的执行, 其执行模型如
大数据分析中GPU操作的执行模型
为了解决CPU和GPU不能完全利用而导致的资源空闲问题, HRM通过队列机制将资源单元与队列进行了映射, 通过Linux的Cgroup子系统以及其层次结构来实现资源单元与队列的映射. 具体过程分为3步: 首先为队列创建控制组, 控制组用队列编号命名; 其次, 根据资源单元为队列上的控制组设置资源参数, 确保资源单元与其他资源单元之间的隔离; 最后, 一旦作业运行在队列上, 将作业的进程ID添加到控制组, 满足作业共享资源单元. 所使用的Cgroup子系统有cpuset, devices, memory和cpuacct.
每个大数据作业的运行时系统包含一个调度器和多个执行器, 调度器管理作业中的操作, 执行器用来执行作业中的各种操作. 为了防止作业之间的干涉, 执行器以容器的方式存在. 由于CPU与GPU操作是并发, 即交替执行, 通过设置队列的同时运行容器数量, 让资源单元同时运行多个操作, 从而提高资源单元的利用率. 当然, 同时运行数量设置太多, 可能会导致资源的竞争, 反而影响作业的性能. 具体策略为:
(1) CPU操作与GPU操作之间的资源共享. 为了提高CPU资源利用率, 将GPU操作的剩余CPU时间分配给其他CPU操作. 为了防止CPU操作与GPU操作对CPU资源的竞争, 通过队列为GPU操作设置较高的优先级别, 为CPU操作设置较低的优先级别, 从而保证GPU操作能够得到足够的CPU计算资源, 避免两种不同类型的操作执行过程中的等待或者干涉;
(2) GPU操作与GPU操作之间的资源共享. 单个GPU操作独占GPU设备时, 会造成GPU资源的浪费.为了提高GPU设备的利用率, 通过队列机制, 当GPU设备的显存还存在剩余的话, 加载其他作业的操作到同一个GPU设备, 实现GPU计算资源在多个操作之间的资源共享, 如
GPU设备资源的细粒度共享
在单个计算节点上, 设置传输队列和执行队列. 如
RU上队列的堆叠模型
每个RU上采用队列堆叠技术, 设置两个执行队列, 分别为批(batch)队列和流(stream)队列, 这两个执行队列共享RU对应的物理资源. 这就意味着: 相同的物理资源上, 虚拟了两个可执行队列. 执行队列具有以下属性.
● 队列类型. 如果RU中包含GPU设备, 那么队列分类为GPU队列; 如果RU中不包含GPU设备, 那么队列分类设置为CPU队列;
● 同时运行数. 每个RU包含有具体的资源数量, 单个容器无法完全使用全部的资源数量时, 就可以提交多个容器同时运行, 提高RU的利用率;
● 排队数量. 容器在队列中排队, 当存在空闲资源时, 可以立即执行排队中的容器, 降低调度过程中的反馈延迟;
● 优先级别. 优先级别决定队列中等待容器的优先执行级别, 高优先级别的流处理队列采用抢占式调度策略, 优先获得资源. 堆叠技术可以确保流处理作业对于资源的弹性分配. 如果流处理作业负载较高, 那么批处理作业获得资源的机会减少, 但是批处理作业可以通过排队来获得需要的资源;
● 资源限制. 容器请求的资源超过限制, 则请求不被调度执行.
作业的容器在队列上由等待状态变为运行状态, 是由队列上的可用计算资源决定的, 容器的资源总和不能超过RU上的可用资源总和.
对于CPU队列, 假如RU中的可用CPU核数为
此时, 只要满足
对于GPU队列, 假如RU中GPU的显存大小为
在HRM中: 对于流处理作业, 使用流处理队列来管理容器的执行; 对于批处理作业, 使用批处理队列来管理容器的执行. 由于流处理作业的运行时间通常是难以预料的, 所以调度器分配流处理作业的容器时, 不设置容器在队列中的排队机制, 即流处理作业的资源分配仍然采用释放、分配、使用、再释放的循环模式. 对于批处理作业, 由于其运行时间相对较短, 因而在批处理作业调度过程中, 为了减少反馈延迟, 采用容器在队列中排队的机制. 当前一个运行的容器结束后, 后续的容器从等待状态直接转换为执行状态, 防止反馈延迟带来的资源空闲, 提高了资源的使用效率.
如果队列排队长度设置太短, 就会存在资源空闲而导致集群资源利用率下降; 反之, 如果队列排队长度设置太长, 就可能导致一些容器等待延迟变大. 实际上, 只要集群中的资源无法满足作业对资源的需求, 那么一些容器就需要在队列中等待.
在实际的应用中, 超过60%的作业是重复发生的. 因此, 可以通过作业完成时间来估计队列的长度. 对于这样的作业, 我们依据历史的作业运行记录来预估容器的运行时间. 如果缺乏这样的估计, 使用默认的作业完成时间, 并扩展了作业调度器动态调整的功能, 即: 通过观察实际的容器持续时间, 并随着作业的运行不断完善初始估计.
所有的队列都存在一个预定义的队列排队长度
(1) 基于固定执行时间的队列长度. 假设所有的容器的运行时间都为1/
(2) 基于延迟的队列长度. 当作业运行时间变化较大时, 维护队列的固定长度就不能很好地利用资源.当多个短时间运行的容器碰巧出现在队列中, 那么就可能造成资源的空闲. 当队列中存在长时间运行的容器, 就可能造成某些容器启动的延迟. 因此, 当容器的运行时间能够确定时, 使用基于延迟的策略来设置队列长度.
在特殊情况下, 指定一个容器在队列中等待的最大时间
作业一旦向资源调度器注册后, 资源调度器就需要为这些作业申请资源. 作业获得资源后, 再启动容器并运行任务. 流处理作业的资源分配频率低, 但是需要确保能够获得足够的资源.
作业请求资源时, 资源调度器需要根据整个集群的可用资源状态来进行决策, 所以全局资源状态信息是调度的依据. 一个计算节点的资源信息一般包含CPU核数量(
HRM使用队列等待矩阵来描述这些信息. 每个队列记录着需要一定CPU、内存大小或者GPU等资源的容器的预期等待时间. 基于正在运行的容器以及等待中的容器, HRM建立一个预期等待时间的矩阵. 对于每一个容器, 也包含一个容器的执行时间, 这个时间是作业的理想运行时间. 队列等待矩阵随着容器的执行不断地被更新着. 资源调度器开始调度时, 获得队列等待矩阵的最新更新值, 并作为调度依据.
为了预测队列的等待时间, 集群资源信息分为资源单元状态表和容器状态表. 资源单元信息表存储可用的RU, 采用关系模型存储, 表示为
当集群的工作负荷较轻时, 各个作业的资源需求都会得到满足, 因此队列的预测等待时间为0; 当集群的工作负荷较重时, 各个队列上同时运行的容器数达到物理资源的最大值, 如果还存在作业的资源请求时, 作业对应的容器就需要在队列上排队. 如果存在多个队列时, 调度器需要选择一个排队时间最短的队列来运行容器, 而计算排队时间最短的依据就是队列的负载信息. 计算节点计算出各个队列的负载并提供给调度器, 调度器依据负载来提供资源. 队列负载小, 被优先选择的机会更高.
在计算队列的预测等待时间方面, 我们将作业分为均匀作业和不均匀作业两类.
(a) 均匀作业的队列等待时间的计算
资源调度器为作业分配计算资源前, 需要给出一个等待时间最少的队列, 作为候选资源来反馈给作业. 当作业的执行时间相对均匀时, 其等待时间就等于队列的排队长度. 均匀作业是计算每个队列排队长度. 队列排队长度较小的计算节点提供优先分配的机会. 这种方式可能导致一个失败的结果, 例如: 对于一些作业异构的场合, 一个计算节点上有2个容器在排队, 每个运行时间是500 s; 而一个计算节点有5个容器在排队, 每个运行时间只有2 s. 那么由于前者队列的长度是2, 具有较高的优先级别, 调度器选择这个计算节点. 实际上, 这个选择不是最好的, 从等待时间上看, 我们应该选择后者.
所以, 通过计算队列中等待作业的长度的方法只能适应于那些任务大小非常规整的场合, 例如图片的实时检测, 每个图片数据大小一样, 花费的检查时间也一样, 资源需求变化不大. 但是, 如果任务大小是变化的, 计算时间也变化, 这种调度方式就不是最好. 因此, 对于任务大小变化、计算时间也变化的场合, 我们通过估计每个RU的预计开始时间来进行调度, 选择最短的开始时间, 提供给需要调度的作业.
(b) 不均匀作业的队列等待时间的预测
当一个作业调度器为自己的容器提交资源申请时, 资源管理器开始计算容器运行在每个RU上的预计开始时间, 最后从这些RU上选择一个预计开始时间最小的队列, 作为容器运行的资源.
算法1给出了队列中容器的等待时间计算方法. 算法的输入是作业的一个容器, 算法的输出是容器到计算节点的资源单元RU的一个预计等待时间最短的映射. 另外, 算法的输入中包括一个作业类型变量, 分别代表均匀作业和不均匀作业. 算法1中, 首先检查计算节点上的全部RU, 如果有可用计算资源满足容器的资源需求且无任何排队的任务, 那么该容器的等待时间为0(算法1中的S2−S7); 否则就需要遍历所有的RU, 计算RU上的运行中的容器以及等待中的容器, 得到最小的等待时间, 从而得到请求资源的容器的预计开始时间.
输入:
输出:
S1:
S2:
S3:
S4:
S5:
S6:
S7:
S8:
S9:
S10:
S11:
S12:
S13:
S14:
S15:
S16:
S17:
S18:
S19:
S20:
S21: 把
S22:
S23:
S24:
S25:
S26:
S27:
S28:
S29:
S30:
S31:
Function:
S1:
S2: 将
S3:
S4:
S5:
S6:
S7:
S8: 把
S9:
S10:
S11:
S12:
S13:
S14:
S15:
S16:
S17:
针对最小等待时间的计算, 算法1在计算的时候将作业区分为均匀作业和不均匀作业. 均匀作业只需要简单地统计队列上的排队长度即可, 无需像不均匀作业计算最小等待时间那样复杂. 算法1中的S9代表均匀作业, S15代表不均匀作业. 对于均匀作业, S10−S14分别统计每个队列中等待作业的数量, 然后选择一个等待作业最小的队列作为输出.
对于不均匀作业, 算法以当前运行中的容器的剩余时间以及排队中的容器的运行时间为输入, 模拟容器的执行过程. 算法1中, S8遍历计算节点上的所有RU. S20−S22计算出运行中的容器的剩余运行时间, 然后将这些容器信息添加到队列
函数
算法1中, 只使用不均匀作业分支, 也适应于均匀作业的场合, 但是会导致算法的复杂性增加. 假如等待状态的任务和运行状态的任务数为
流处理作业的容器运行时间一般也比较长, 但是每个微批次数量不大, 运行中可能只是部分弹性扩展或者缩小. 基于这些信息, 资源调度器分配采用悲观封锁协议来分配资源. 其过程主要包含以下步骤.
(1) 流处理作业向某个分布式调度器注册;
(2) 分布式调度器向协调器发出资源封锁的消息;
(3) 封锁成功后, 协调器将最新的流处理队列状态信息发送给分布式调度器;
(4) 分布式调度器根据流处理作业容器的资源要求, 按照算法1计算出等待时间为0的可用资源;
(5) 分布式调度器向协调器发送调度结果, 并释放封锁;
(6) 分布式调度器根据资源所在计算节点信息, 启动容器.
当存在两个或者两个以上的流处理作业同时请求资源时, 协调器根据先来先服务的策略, 为每个作业分配足够的计算资源. 如果集群的整体资源无法满足两个以上的流处理作业的资源需求, 那么无法获得足够资源的流处理作业将被拒绝运行.
批处理采用运行一次的方式, 每个批次的数据规模较大, 资源调度器采用乐观封锁协议来进行批处理作业的资源分配. 分布式调度器可以获得一个不断更新的批处理队列状态信息的副本, 再利用状态信息副本来进行调度. 一旦调度器决策了作业的资源分配, 就请求协调器进行一次资源状态的更新. 在大多数情况下, 更新操作能够成功进行, 发生冲突的可能性比较小. 不管更新是否成功, 分布式调度器都会再次获得资源最新状态信息, 继续进行下一次的资源分配操作.
HRM的分布式调度器是完全并行运行, 为了避免资源更新时的冲突, 调度器采用增量更新方式, 也就是说发生冲突的资源被放弃, 没有发生冲突的资源正常更新. 批处理作业资源分配流程包含以下步骤.
(1) 各个集群节点向协调器汇报最新状态;
(2) 分布式调度器获得全局计算资源后, 进行资源分配;
(3) 调度器提出资源信息更新, 协调器经过检查后没有冲突, 更新全局资源状态矩阵; 如果发现存在资源冲突, 协调器将冲突资源写入队列中;
(4) 将分配结果通知分布式调度器, 由调度器向计算节点发起容器启动消息.
实现过程中, 主要包括两个部分: 一是分布式调度器根据全局资源的信息对任务的调度, 二是资源协调器对全局资源更新消息的提交两个过程(算法2、算法3).
输入:
输出: 资源分配单位集合.
S1:
S2:
S3:
S4: send
S5:
S6:
S7:
S8:
S9: 将资源发送给作业
S10:
S11:
Function
S1:
S2:
S3: call
S4:
S5:
S6:
S7:
S8:
S9:
输入:
输出: 更新资源.
S1:
S2:
S3: update
S4:
S5:
S6:
S7:
对于分布式调度器, 遍历作业队列中的每个作业, 算法2的S1, S2是从全局资源状态获得一个副本. S3为作业分配资源. S4将分配后的结果提交到协调器. S5为判断返回的结果. 如果需要的资源还没有满足, 当前作业加入作业队列(算法2的S7), 等待下一次调度.
函数
算法用来检测资源分配的冲突. 协调器接收到资源更新请求后, 检查当前集群计算节点的资源是否满足任务分配的资源(算法3的S2): 如果满足, 则不产生冲突, 更新全局集群资源状态(算法3的S3); 如果不满足, 则将当前的分配请求插入等待队列(算法3的S5), 由协调器在下一次心跳信息到达时再分配.
流处理作业依据流处理队列的状态, 采用先来先服务方式, 这里就不详细说明.
● 流处理作业之间的资源策略.
对于流处理作业, 必须保证作业的SLO, 需要一次性分配足够的资源. 为防止流处理作业资源分配冲突, 采用悲观封锁协议, 减少流处理作业之间的资源分配冲突. 按照先来先服务分配计算资源, 资源不足时, 不许可其他作业的注册.
● 批处理作业之间的资源策略.
对于批处理作业, 通过集群系统总的资源数量以及运行中的作业数量, 调度器计算出作业的资源份额. 例如: 假如集群系统中有
如果当前队列有长时间执行的任务, 导致任务长时间等待, 而其他节点有空闲资源的情况, HRM的全局资源策略通过队列之间的负载平衡来实现任务的迁移[
在提交作业时, 如果批处理作业和流处理作业使用相同的编程框架, 那么资源管理器很难选择调度策略.要么人工设置作业的类型, 即设置批处理作业或者流处理作业, 调度器根据作业类型采用不同的调度策略; 另外一种方式是自适应感知作业的类型, 然后使用不同的调度策略. 前者简单, 后者复杂.
为了实现自适应感知, 调度器需要获得作业的一些特征, 例如数据批次大小、作业延迟要求、作业触发时机等特征. 调度器收集这些特征, 然后进行决策.
目前, 论文只实现了对Spark执行引擎的自适应感知. Spark作为数据流计算模型的编程框架, Spark Structure Stream中的无界表结构, 使流处理作业和批处理作业共用一套代码, 因此Spark计算框架不但适合批处理作业, 也适用于流处理作业. 为了实现HRM中调度器的批流感知, 对spark的触发器进行了改造, 当应用程序的触发方式为执行一次时, 向HRM发送批处理特征; 当触发方式为按照窗口执行多次时, 向HRM发送流处理特征. HRM通过计算框架的触发方式来感知作业的类型, 对流处理作业采用悲观封锁协议分配资源, 对于批处理作业采用乐观封锁协议分配资源.
系统在一个集群上进行实验, 集群中包含8台NF5468M5服务器, 作为计算节点; 1台中科曙光服务器620/420, 作为调度节点. 每个服务器节点包含2个Xeon2.1处理器, 每个处理器包含8个核, 32 GB DDR4内存, 2块RTX2080TI GPU卡, 10 GB显存. 集群包含1台AS2150G2磁盘阵列. 服务器操作系统为Ubuntu 7.5.0, CUDA版本为10.1.105, 采用C++11作为编程语言. 为了测试框架的性能, 流处理和批处理作业使用Spark计算框架开发, 采用WordCount, Sort, TPC-H等负载, GPU应用主要使用YOLOv3为模型的视频流检测, 使用Java native来实现kernel函数的调用.
模拟测试中, 使用一台机器模拟Negotiator, 其余7台机器模拟工作节点. 每台工作节点机器设置150个队列, 一个队列模拟一台物理机器, 称为逻辑机器. 队列的runlimit设置为16, 代表每个逻辑机器拥有16个CPU核. 这样的话, 一台物理机器就可以模拟150台逻辑机器, 7台物理机器模拟1 050台逻辑机器, 每个逻辑机器上模拟16个CPU核, 那么整个模拟系统的CPU计算资源为16 800个CPU核.
作业按照一定的时间间隔向集群资源调度器注册并请求资源. 每个作业中设置数量不同的任务数. 当作业注册后, 调度分配计算资源, 并启动执行器. 执行器启动后, 向作业请求任务并执行任务, 任务的执行采用
为了说明HRM的调度性能, 与集中式调度器对比. 集中式调度器一次性提交60个、120个以及180个作业; 使用HRM时, 使用了6个分布式调度器上, 每个调度器提交10个、20个以及30个作业. 其中, 每个作业包含40个容器(执行器), 容器中任务的执行时间使用固定值或者随机值. 两组不同的测试中, 测试作业的调度延迟时间以及作业完成时间. 每一组至少测试3次, 统计平均值, 其结果如
不同作业数量时的性能
从
当大量作业注册到调度器上的时候, 作业首先进入调度器的等待队列, 等待调度器进行资源调度. 因此, 可以用某一时刻下等待调度的作业数量与注册的所有作业数量的比值来评价调度器的繁忙程度, 比值越大, 说明需要该调度器调度的作业的数量越多, 则说明该调度器越繁忙. 为了评价调度器的繁忙程度, 可以加大作业提交的数量, 通过作业数量的不断变化来评价调度器的特性, 包括作业的调度延迟以及调度器繁忙程度的变化情况.
调度器性能
资源分配冲突比例
为了测试真实负载情况下HRM的性能, 使用了4类负载: 第1类是CPU批处理作业, 由WordCount, Sort以及TPC-H查询组成; 第2类是CPU流处理作业, 为Top-
为了测试HRM的基准性能, 与mesos资源调度框架的RDF、Yarn的容量调度以及Yarn的公平调度策略进行了对比, 在集群数量有限的情况下, HRM的性能基本与mesos相当, 比Yarn的两种调度策略有提高.
CPU批/流处理作业完成时间
对于GPU流处理以及批处理, 由于mesos目前只支持整个GPU设备一次性分配给一个作业, 测试的集群只有16块GPU设备, 当HRM也采用粗粒度方式调度批处理与流处理作业时, 其性能基本一致, 所以论文没有给出基准测试数据的对比.
流处理和批处理作业同时运行时, 流处理作业和批处理作业必然同时向集群资源管理器请求计算资源.
流处理与批处理作业同时运行性能
从
● 当同时运行的批处理作业数量较少(1−3个), 流处理作业在HRM和mesos上, 对每个微批次数据的处理时间大致相同;
● 当同时运行的批处理作业数量扩大(5−9个), HRM资源管理下, 流处理作业的每个微批次数据处理时间变化不大; 而mesos资源管理下, 其微批次数据处理时间增加了2−3倍. 其原因在于: 批处理和流处理作业开始竞争资源, 当同时运行的批处理作业再增加, 可以看出, 流处理作业每个微批次数据处理时间增加的更大, 这说明流处理作业获得的计算资源数量大大减少. 由于HRM采用队列堆叠技术, 流处理和批处理作业分别使用各自的队列, 虽然同时运行的批处理作业数量增加, 但是流处理作业的队列的优先级较高, 其能够充分获得计算资源, 所以每个微批次数据处理时间变化不大.
HRM对每个物理资源设置一个排队队列, 当前容器结束, 排队中的容器立即开始执行, 就可以减少资源调度的延迟.
容器排队性能
HRM通过对GPU设备的显存进行管理, 在每个Executor不超过显存最大值的情况下, 能够在不同的作业之间共享同一个GPU设备, 提高GPU利用率.
GPU粗细粒度性能评价表
并行度 | 最大显存占用(MB) | GPU峰值利用率(%) | 处理时间(s) |
1 | 2 865 | 24 | 151.3 |
2 | 5 720 | 61 | 103.2 |
3 | 8 575 | 73 | 69 |
从
为了测试GPU流处理和CPU批处理作业混合时的资源利用率, 测试中, GPU流处理为使用darknet检测1 000张图片, CPU批处理为WordCount作业, 数据大小为25 GB. 测试时间大约为230 s左右, 我们计算了GPU利用率和CPU利用率的平均值, 其结果如
混合作业时资源利用率
● 单独执行detection时, GPU利用率为67%左右; CPU利用率较低, 大约为35%左右;
● 单独执行wordcount时, 其CPU利用率在78%左右;
● 混合执行decection和wordcount时, 其GPU利用率变化不大, 但是CPU利用率提高到93%左右.
因此, 使用HRM资源管理系统, 对于CPU任务与GPU任务混合场景, 在流处理作业的不受影响的情况下, CPU利用率会得到进一步的提高.
随着GPU设备在大数据分析中的广泛应用, 为批处理/流处理融合的应用提供CPU-GPU资源共享也越来越重要. HRM满足这些要求, 采用应用程序可感知的调度策略, 为流处理作业实现低延迟, 为批处理作业提供高通量处理. 在CPU-GPU资源调度方面, 利用队列机制将物理资源虚拟化并提供抢占式调度策略, 满足流处理作业实时性要求. 队列的使用, 也减少了反馈延迟问题.
doi: 10.1145/1327452.1327492]]]>
https://github.com/nathanmarz/storm]]>
et al. Naiad: A timely dataflow system. In: Proc. of the 24th ACM Symp. on Operating Systems Principles (SOSP 2013). New York: Association for Computing Machinery, 2013. 439−455. [doi: 10.1145/2517349.2522738]]]>
et al. Discretized streams: Fault-tolerant streaming computation at scale. In: Proc. of the 24th ACM Symp. on Operating Systems Principles (SOSP 2013). New York: Association for Computing Machinery, 2013. 423−438. [doi: 10.1145/2517349.2522737]]]>
Gates AF, Natkovich O, Chopra S,
https://github.com/GoogleCloudPlatform/DataflowJavaSDK]]>
https://cloud.google.com/dataflow/]]>
Akidau T, Bradshaw R, Chambers C,
Pishgoo B, Azirani AA, Raahemi B. A hybrid distributed batch-stream processing approach for anomaly detection. Information Sciences, 2020, 543: 309−327.
http://flink.apache.org/]]>
et al. Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing. In: Proc. of the 9th USENIX Conf. on Networked Systems Design and Implementation. Berkeley: USENIX Association, 2012. 2. [doi: 10.5555/2228298.2228301]]]>
Karanasos K, Raol S, Curino C,
Delgado P, Dinu F, Zwaenepoel W,
et al. Job-aware scheduling in Eagle: Divide and stick to your probes. In: Proc. of the 7th ACM Symp. on Cloud Computing. New York: ACM, 2016. 497−509. [doi: 10.1145/2987550.2987563]]]>
et al. Apache hadoop Yarn: Yet another resource negotiator. In: Proc. of the 4th Annual Symp. on Cloud Computing. New York: ACM, 2013. 1−16. [doi: 10.1145/2523616.2523633]]]>
et al. Mesos: A platform for fine-grained resource sharing in the data center. In: Proc. of the 8th USENIX Symp. on Networked Systems Design and Implementation. Berkeley: USENIX Association, 2011. 295−308. [doi: 10.5555/1972457.1972488]]]>
Burns B, Grant B, Oppenheimer D,
https://sourceforge.net/projects/cudawrapper/files/]]>
https://hadoop.apache.org/docs/r1.2.1/capacity_scheduler.html]]>
https://hadoop.apache.org/docs/r1.2.1/fair_scheduler.html]]>
et al. Dominant resource fairness: Fair allocation of multiple resource types. In: Proc. of the 8th USENIX Symp. on Networked Systems Design and Implementation. Berkeley: USENIX Association, 2011. 323−336. [doi: 10.5555/1972457.1972490]]]>
et al. Omega: Flexible, scalable schedulers for large compute clusters. In: Proc. of the 8th ACM European Conf. on Computer Systems. New York: ACM, 2013. 351−364. [doi: 10.1145/2465351.2465386]]]>
et al. Sparrow: Distributed, low latency scheduling. In: Proc. of the 24th ACM Symp. on Operating Systems Principles. New York: ACM, 2013. 69−84. [doi: 10.1145/2517349.2522716]]]>
et al. Apollo: Scalable and coordinated scheduling for cloud- scale computing. In: Proc. of the 11th USENIX Symp. on Operating Systems Design and Implementation. Berkeley: USENIX Association, 2014. 285−300. [doi: 10.5555/2685048.2685071]]]>
et al. Alsched: Algebraic scheduling of mixed workloads in heterogeneous clouds. In: Proc. of the 3rd ACM Symp. on Cloud Computing. New York: ACM, 2012. 1−7. [doi: 10.1145/2391229.2391254]]]>
Yao Y, Tai JZ, Sheng B,
Goder A, Spiridonov A, Wang Y. Bistro: Scheduling data-parallel jobs against live production systems. In: Proc. of the 2015 USENIX Annual Technical Conf. Berkeley: USENIX Association, 2015. 459−471.
Grandl R, Chowdhury M, Akella A,
et al. Choosy: Max-min fair sharing for datacenter jobs with constraints. In: Proc. of the 8th ACM European Conf. on Computer Systems. New York: ACM, 2013. 365−378. [doi: 10.1145/2465351.2465387]]]>
et al. Learning scheduling algorithms for data processing clusters. In: Proc. of the ACM Special Interest Group on Data Communication. New York: ACM, 2019. 270−288. [doi: 10.1145/3341302.3342080]]]>
et al. Delay scheduling: A simple technique for achieving locality and fairness in cluster scheduling. In: Proc. of the 5th ACM European Conf. on Computer Systems. New York: ACM, 2010. 265−278. [doi: 10.1145/1755913.1755940]]]>
et al. Effective straggler mitigation: Attack of the clones. In: Proc. of the 10th USENIX Symp. on Networked Systems Design and Implementation. Berkeley: USENIX Association, 2013. 185−198. [doi: 10.5555/2482626.2482645]]]>
et al. Quincy: Fair scheduling for distributed computing clusters. In: Proc. of the 22nd ACM Symp. on Operating Systems Principles. New York: ACM, 2009. 261−276. [doi: 10.1145/1629575.1629601]]]>
Gog I, Schwarzkopf M, Gleave A,
et al. Scheduling large jobs by abstraction refinement. In: Proc. of the 6th ACM European Conf. on Computer Systems. New York: ACM, 2011. 329−342. [doi: 10.1145/1966445.1966476]]]>
et al. Fuxi: A fault-tolerant resource management and job scheduling system at Internet scale. Proc. of the VLDB Endowment, 2014. 1393−1404. [doi: 10.14778/2733004.2733012]]]>
et al. Graphene: Packing and dependency-aware scheduling for data-parallel clusters. In: Proc. of the 12th USENIX Symp. on Operating Systems Design and Implementation. Berkeley: USENIX Association, 2016. 81−97. [doi: 10.5555/3026877.3026885]]]>
et al. Jockey: Guaranteed job latency in data parallel clusters. In: Proc. of the 7th ACM European Conf. on Computer Systems. New York: ACM, 2012. 99−112. [doi: 10.1145/2168836.2168847]]]>
Venkataraman S, Panda A, Ananthanarayanan G,
et al. Improving MapReduce performance in heterogeneous environments. In: Proc. of the 8th USENIX Symp. on Operating Systems Design and Implementation. Berkeley: USENIX Association, 2008. 29−42. [doi: 10.5555/1855741.1855744]]]>
et al. Morpheus: Towards automated SLOs for enterprise clusters. In: Proc. of the 12th USENIX Conf. on Operating Systems Design and Implementation (OSDI 2016). USENIX Association, 2016. 117−134. [doi: 10.5555/3026877.3026887]]]>
doi: 10.1145/2644865.2541941]]]>
Suzuki Y, Kato S, Yamada H,
Kato S, McThrow M, Maltzahn C,
et al. Design and evaluation of the gemtc framework for GPU-enabled many-task computing. In: Proc. of the 23rd Int'l Symp. on High-Performance Parallel and Distributed Computing (HPDC 2014). New York: Association for Computing Machinery, 2014. 153−164. [doi: 10.1145/2600212.2600228]]]>
et al. Pagoda: Fine-grained GPU resource virtualization for narrow tasks. In: Proc. of the 22nd ACM SIGPLAN Symp. on Principles and Practice of Parallel Programming (PPoPP 2017). New York: Association for Computing Machinery, 2017. 221−234. [doi: 10.1145/3155284.3018754]]]>
Shao J, Ma J, Li Y,
doi: 10.1109/CloudCom.2010.55]]]>
et al. GPU cluster for high performance computing. In: Proc. of the 2004 ACM/IEEE Conf. on Supercomputing (SC 2004). IEEE Computer Society, 2004. 47. [doi: 10.1109/SC.2004.26]]]>
et al. Large-scale cluster management at Google with Borg. In: Proc. of the 10th ACM European Conf. on Computer Systems, Vol. 18. New York: ACM, 2015. 1−17. [doi: 10.1145/2741948.2741964]]]>
Fukutomi D, Iida Y, Azumi T,
https://issues.apache.org/jira/browse/YARN-5517]]>
Gu JZ, Liu H, Zhou YF,
Gu JC, Chowdhury M, Shin KG,
Kshiteej M, Arjun S, Arjun B,
doi: 10.1109/IPDPS.2011.102]]]>
doi: 10.1145/2925426.2926261]]]>
et al. Efficient queue management for cluster scheduling. In: Proc. of the 11th European Conf. on Computer Systems (EuroSys 2016). New York: Association for Computing Machinery, 2016. 1−15. [doi: 10.1145/2901318.2901354]]]>
Tang XC. The research and implementation of job managementsystem based on cluster computing technology [Ph. D. Thesis]. Xi'an: Northwestern Polytechnical University, 2002. (in Chinese with English abstract).
汤小春. 基于集群技术的作业管理系统的研究与实现[博士学位论文]. 西安: 西北工业大学, 2002.