昆明制作网站费用,常德网络公司,网址域名注册多少钱,长链接转短链接简介#xff1a; 介绍 Flink 1.12 资源管理的一些特性#xff0c;包括内存管理、资源调度、扩展资源框架。 本文由社区志愿者陈政羽整理#xff0c;Apache Flink Committer、阿里巴巴技术专家宋辛童#xff0c;Apache Flink Contributor、阿里巴巴高级开发工程师郭旸泽分享…简介 介绍 Flink 1.12 资源管理的一些特性包括内存管理、资源调度、扩展资源框架。 本文由社区志愿者陈政羽整理Apache Flink Committer、阿里巴巴技术专家宋辛童Apache Flink Contributor、阿里巴巴高级开发工程师郭旸泽分享主要介绍 Flink 1.12 资源管理的一些特性。内容主要分为 4 部分 内存管理资源调度扩展资源框架未来规划一、内存管理
首先回顾 Flink 的内存模型变迁。下图左边分别为 Flink 1.10、Flink 1.11 引入的新的内存模型。尽管涉及的模块较多但 80% - 90% 的用户仅需关注真正用于任务执行的 Task Heap Memory、Task Off-Heap Memory、Network Memory、Managed Memory 四部分。
其它模块大部分是 Flink 的框架内存正常不需要调整即使遇到问题也可以通过社区文档来解决。除此之外“一个作业究竟需要多少内存才能满足实际生产需求” 也是大家不得不面临的问题比如其他指标的功能使用、作业是否因为内存不足影响了性能是否存在资源浪费等。 针对上述内容社区在 Flink 1.12 版本提供了一个全新的 关于 Task manager 和 Job manager 的 Web UI。 在新的 Web UI 中可以直接将每一项监控指标配置值、实际使用情况对应到内存模型中进行直观的展示。在此基础上可以更清楚的了解到作业的运行情况、该如何调整、用哪些配置参数调整等 (社区也有相应的文档提供支持)。通过新的 Web UI大家能更好的了解作业的使用情况内存管理也更方便。
1. 本地内存Managed Memory
Flink 托管内存实际上是 Flink 特有的一种本地内存不受 JVM 和 GC 的管理而是由 Flink 自行进行管理。
本地内存的特点主要体现在两方面
一方面是 slot 级别的预算规划它可以保证作业运行过程中不会因为内存不足造成某些算子或者任务无法运行也不会因为预留了过多的内存没有使用造成资源浪费。 同时 Flink 能保证当任务运行结束时准确将内存释放确保 Task Manager 执行新任务时有足够的内存可用。另一方面资源适应性也是托管内存很重要的特性之一指算子对于内存的需求是动态可调整的。具备了适应性算子就不会因为给予任务过多的内存造成资源使用上的浪费也不会因为提供的内存相对较少导致整个作业无法运行使内存的运用保持在一定的合理范围内。 当然在内存分配相对比较少情况下作业会受到一定限制例如需要通过频繁的落盘保证作业的运行这样可能会影响性能。
当前针对托管内存Flink 的使用场景如下
RocksDB 状态后端在流计算的场景中每个 Slot 会使用 State 的 Operator从而共享同一底层 的 RocksDB 缓存Flink 内置算子包含批处理、Table SQL、DataSet API 等算子每个算子有独立的资源预算不会相互共享Python 进程用户使用 PyFlink使用 Python 语言定义 UDF 时需要启动 Python 的虚拟机进程。
2. Job Graph 编译阶段
Flink 对于 management memory 的管理主要分为两个阶段。
2.1 作业的 Job Graph 编译阶段
在这个阶段需要注意三个问题 第一个问题是slot 当中到底有哪些算子或者任务会同时执行。这个问题关系到在一个查询作业中如何对内存进行规划是否还有其他的任务需要使用 management memory从而把相应的内存留出来。 在流式的作业中这个问题是比较简单的因为我们需要所有的算子同时执行才能保证上游产出的数据能被下游及时的消费掉这个数据才能够在整个 job grep 当中流动起来。 但是如果我们是在批处理的一些场景当中实际上我们会存在两种数据 shuffle 的模式 一种是 pipeline 的模式这种模式跟流式是一样的也就是我们前面说到的 bounded stream 处理方式同样需要上游和下游的算子同时运行上游随时产出下游随时消费。 另外一种是所谓的 batch 的 blocking的方式它要求上游把数据全部产出并且落盘结束之后下游才能开始读数据。这两种模式会影响到哪些任务可以同时执行。目前在 Flink 当中根据作业拓扑图中的一个边的类型 (如图上)。我们划分出了定义的一个概念叫做 pipelined region也就是全部都由 pipeline 的边锁连通起来的一个子图我们把这个子图识别出来用来判断哪些 task 会同时执行。 第二个问题是slot 当中到底有哪些使用场景我们刚才介绍了三种 manage memory 的使用场景。在这个阶段对于流式作业可能会出现 Python UDF 以及 Stateful Operator。这个阶段当中我们需要注意的是这里并不能肯定 State Operator 一定会用到 management memory因为这跟它的状态类型是相关的。 如果它使用了 RocksDB State Operator是需要使用 manage memory 的但是如果它使用的是 Heap State Backend则并不需要。然而作业在编译的阶段其实并不知道状态的类型这里是需要去注意的地方。 第三个问题对于 batch 的作业我们除了需要清楚有哪些使用场景还需要清楚一件事情就是前面提到过 batch 的 operator。它使用 management memory 是以一种算子独享的方式而不是以 slot 为单位去进行共享。我们需要知道不同的算子应该分别分配多少内存这个事情目前是由 Flink 的计划作业来自动进行设置的。
2.2 执行阶段 第一个步骤是根据 State Backend 的类型去判断是否有 RocksDB。如上图所示比如一个 slot有 ABC 三个算子B 跟 C 都用到了 PythonC 还用到了 Stateful 的 Operator。这种情况下如果是在 heap 的情况下我们走上面的分支整个 slot 当中只有一种在使用就是Python。之后会存在两种使用方式
其中一个是 RocksDB State Backend有了第一步的判断之后第二步我们会根据用户的配置去决定不同使用方式之间怎么样去共享 slot 的 management memory。 在这个 Steaming 的例子当中我们定义的 Python 的权重是 30%State Backend 的权重是 70%。在这样的情况下如果只有 PythonPython 的部分自然是使用 100% 的内存Streaming 的 Heap State Backend 分支 而对于第二种情况Streaming 的 RocksDB State Backend 分支B、C 的这两个 Operator 共用 30% 的内存用于 Python 的 UDF另外 C 再独享 70% 的内存用于 RocksDB State Backend。最后 Flink 会根据 Task manager 的资源配置一个 slot 当中有多少 manager memory 来决定每个 operator 实际可以用的内存的数量。批处理的情况跟流的情况有两个不同的地方首先它不需要去判断 State Backend 的类型这是一个简化 其次对于 batch 的算子上文提到每一个算子有自己独享的资源的预算这种情况下我们会去根据使用率算出不同的使用场景需要多少的 Shared 之后还要把比例进一步的细分到每个 Operator。
3. 参数配置
配置参数默认值备注大小taskmanager.memory.managed.size/绝对大小权重taskmanager.memory.managed.fraction0.4相对大小占用Flink总内存比例taskmanager.memory.managed.consumer-weightDATAPROC:70,PYTHON:30多种用途并存时候分配权重
上方图表展示了我们需要的是 managermemory 大小有两种配置方式
一种是绝对值的配置方式还有一种是作为 Task Manager 总内存的一个相对值的配置方式。
taskmanager.memory.managed.consumer-weight 是一个新加的配置项它的数据类型是 map 的类型也就是说我们在这里面实际上是给了一个 key 冒号 value然后逗号再加上下一组 key 冒号 value 的这样的一个数据的结构。这里面我们目前支持两种 consumer 的 key
一个是 DATAPROC DATAPROC 既包含了流处理当中的状态后端 State Backend 的内存也包含了批处理当中的 Batch Operator另外一种是 Python。
二、 资源调度
部分资源调度相关的 Feature 是其他版本或者邮件列表里面大家询问较多的这里我们也做对应的介绍。
1. 最大 Slot 数 Flink 在 1.12 支持了最大 slot 数的一个限制slotmanager.number-of-slots.max在之前我们也有提到过对于流式作业我们要求所有的 operator 同时执行起来才能够保证数据的顺畅的运行。在这种情况下作业的并发度决定了我们的任务需要多少个 slot 和资源去执行作业。
然而对于批处理其实并不是这样的批处理作业往往可以有一个很大的并发度但实际并不需要这么多的资源批处理用很少的资源跑完前面的任务腾出 Slot 给后续的任务使用。通过这种串行的方式去执行任务能避免 YARN/K8s 集群的资源过多的占用。目前这个参数支持在 yarn/mesos/native k8 使用。
2. TaskManager 容错
在我们实际生产中有可能会有程序的错误、网络的抖动、硬件的故障等问题造成 TaskManager 无法连接甚至直接挂掉。我们在日志中常见的就是 TaskManagerLost 这样的报错。对于这种情况需要进行作业重启在重启的过程中需要重新申请资源和重启 TaskManager 进程这种性能消耗代价是非常高昂的。
对于稳定性要求相对比较高的作业Flink1.12 提供了一个新的 feature能够支持在 Flink 集群当中始终持有少量的冗余的 TaskManager这些冗余的 TaskManager 可以用于在单点故障的时候快速的去恢复而不需要等待一个重新的资源申请的过程。 通过配置 slotmanager.redundant-taskmanager-num 可以实现冗余 TaskManager。这里所谓的冗余 TaskManager 并不是完完全全有两个 TaskManager 是空负载运行的而是说相比于我所需要的总共的资源数量会多出两个 TaskManager。
任务可能是相对比较均匀的分布在上面在能够在利用空闲 TaskManager 的同时也能够达到一个相对比较好的负载。 一旦发生故障的时候可以去先把任务快速的调度到现有的还存活的 TaskManager 当中然后再去进行新一轮的资源申请。目前这个参数支持在 yarn/mesos/native k8 使用。
3. 任务平铺分布
任务平铺问题主要出现在 Flink Standalone 模式下或者是比较旧版本的 k8s 模式部署下的。在这种模式下因为事先定义好了有多少个 TaskManager每个 TaskManager 上有多少 slot这样会导致经常出现调度不均的问题可能部分 manager 放的任务很满有的则放的比较松散。
在 1.11 的版本当中引入了参数 cluster.evenly-spread-out-slots这样的参数能够控制它去进行一个相对比较均衡的调度。 注意
第一这个参数我们只针对 Standalone 模式因为在 yarn 跟 k8s 的模式下实际上是根据你作业的需求来决定起多少 task manager 的所以是先有了需求再有 TaskManager而不是先有 task manager再有 slot 的调度需求。 在每次调度任务的时候实际上只能看到当前注册上来的那一个 TaskManagerFlink 没办法全局的知道后面还有多少 TaskManager 会注册上来这也是很多人在问的一个问题就是为什么特性打开了之后好像并没有起到一个很好的效果这是第一件事情。 第二个需要注意的点是这里面我们只能决定每一个 TaskManager 上有多少空闲 slot然而并不能够决定每个 operator 有不同的并发数Flink 并不能决定说每个 operator 是否在 TaskManager 上是一个均匀的分布因为在 flink 的资源调度逻辑当中在整个 slot 的 allocation 这一层是完全看不到 task 的。
三、扩展资源框架
1. 背景
近年来随着人工智能领域的不断发展深度学习模型已经被应用到了各种各样的生产需求中比较典型的场景如推荐系统广告推送智能风险控制。这些也是 Flink 一直以来被广泛使用的场景因此支持人工智能一直以来都是 Flink 社区的长远目标之一。针对这个目标目前已经有了很多第三方的开源扩展工作。由阿里巴巴开源的工作主要有两个
一个是 Flink AI Extended 的项目是基于 Flink 的深度学习扩展框架目前支持 TensorFlow、PyTorch 等框架的集成它使用户可以将 TensorFlow 当做一个算子放在 Flink 任务中。另一个是 Alink它是一个基于 Flink 的通用算法平台里面也内置了很多常用的机器学习算法。
以上的两个工作都是从功能性上对 Flink 进行扩展然而从算力的角度上讲深度学习模型亦或机器学习算法通常都是整个任务的计算瓶颈所在。GPU 则是这个领域被广泛使用用来加速训练或者预测的资源。因此支持 GPU 资源来加速计算是 Flink 在 AI 领域的发展过程中必不可少的功能。
2. 使用扩展资源
目前 Flink 支持用户配置的资源维度只有 CPU 与内存而在实际使用中不仅是 GPU我们还会遇到其他资源需求如 SSD 或 RDMA 等网络加速设备。因此我们希望提供一个通用的扩展资源框架任何扩展资源都可以以插件的形式来加入这个框架GPU 只是其中的一种扩展资源。
对于扩展资源的使用可以抽象出两个通用需求
需要支持该类扩展资源的配置与调度。用户可以在配置中指明对这类扩展资源的需求如每个 TaskManager 上需要有一块 GPU 卡并且当 Flink 被部署在 Kubernetes/Yarn 这类资源底座上时需要将用户对扩展资源的需求进行转发以保证申请到的 Container/Pod 中存在对应的扩展资源。需要向算子提供运行时的扩展资源信息。用户在自定义算子中可能需要一些运行时的信息才能使用扩展资源以 GPU 为例算子需要知道它内部的模型可以部署在那一块 GPU 卡上因此需要向算子提供这些信息。
3. 扩展资源框架使用方法
使用资源框架我们可以分为以下这 3 个步骤
首先为该扩展资源设置相关配置然后为所需的扩展资源准备扩展资源框架中的插件最后在算子中从 RuntimeContext 来获取扩展资源的信息并使用这些资源
3.1 配置参数
# 定义扩展资源名称“gpu”
external-resources: gpu
# 定义每个 TaskManager 所需的 GPU 数量
external-resource.gpu.amount: 1
# 定义Yarn或Kubernetes中扩展资源的配置键
external-resource.gpu.yarn.config-key: yarn.io/gpu
external-resource.gpu.kubernetes.config-key: nvidia.com/gpu
# 定义插件 GPUDriver 的工厂类。
external-resource.gpu.driver-factory.class:
org.apache.flink.externalresource.gpu.GPUDriverFactory
以上是使用 GPU 资源的配置示例
对于任何扩展资源用户首先需要将它的名称加入 external-resources 中这个名称也会被用作该扩展资源其他相关配置的前缀来使用。示例中我们定义了一种名为 gpu 的资源。在调度层目前支持用户在 TaskManager 的粒度来配置扩展资源需求。示例中我们定义每个 TaskManager 上的 GPU 设备数为 1。将 Flink 部署在 Kubernetes 或是 Yarn 上时我们需要配置扩展资源在对应的资源底座上的配置键以便 Flink 对资源需求进行转发。示例中展示了 GPU 对应的配置。如果提供了插件则需要将插件的工厂类名放入配置中。
3.2 前置准备
在实际使用扩展资源前还需要做一些前置准备工作以 GPU 为例
在 Standalone 模式下集群管理员需要保证 GPU 资源对 TaskManager 进程可见。在 Kubernetes 模式下需要集群支持 Device Plugin[6]对应的 Kubernetes 版本为 1.10并且在集群中安装了 GPU 对应的插件。在 Yarn 模式下GPU 调度需要集群 Hadoop 版本在 2.10 或 3.1 以上并正确配置了 resource-types.xml 等文件。
3.3 扩展资源框架插件
完成了对扩展资源的调度后用户自定义算子可能还需要运行时扩展资源的信息才能使用它。扩展资源框架中的插件负责完成该信息的获取它的接口如下
public interface ExternalResourceDriverFactory {/*** 根据提供的设置创建扩展资源的Driver*/ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
}public interface ExternalResourceDriver {/*** 获取所需数量的扩展资源信息*/Set? extends ExternalResourceInfo retrieveResourceInfo(long amount) throws Exception;
}
ExternalResourceDriver 会在各个 TaskManager 上启动扩展资源框架会调用各个 Driver 的 retrieveResourceInfo 接口来获得 TaskManager 上的扩展资源信息并将得到的信息传到算子的 RuntimeContext。ExternalResourceDriverFactory 则为插件的工厂类。
4. GPU 插件
Flink 目前内置了针对 GPU 资源的插件其内部通过执行名为 Discovery Script 的脚本来获取当前环境可用的 GPU 信息目前信息中包含了 GPU 设备的 Index。
Flink 提供了一个默认脚本位于项目的 plugins/external-resource-gpu/ 目录用户也可以实现自定义的 Discovery Script 并通过配置来指定使用自定义脚本。该脚本与 GPU 插件的协议为
当调用脚本时所需要的 GPU 数量将作为第一个参数输入之后为用户自定义参数列表。若脚本执行正常则输出 GPU Index 列表以逗号分隔。若脚本出错或执行结果不符合预期则脚本以非零值退出这会导致 TaskManager 初始化失败并在日志中打印脚本的错误信息。
Flink 提供的默认脚本是通过 nvidia-smi 工具来获取当前的机器中可用的 GPU 数量以及 index并根据所需要的 GPU 数量返回对应数量的 GPU Index 列表。当无法获取到所需数量的 GPU 时脚本将以非零值退出。
GPU 设备的资源分为两个维度流处理器与显存其显存资源只支持独占使用。因此当多个 TaskManager 运行在同一台机器上时若一块 GPU 被多个进程使用可能导致其显存 OOM。因此Standalone 模式下需要 TaskManager 级别的资源隔离机制。
默认脚本提供了 Coordination Mode 来支持单机中多个 TaskManager 进程之间的 GPU 资源隔离。该模式通过使用文件锁来实现多进程间 GPU 使用信息同步协调同一台机器上多个 TaskManager 进程对 GPU 资源的使用。
5. 在算子中获取扩展资源信息
在用户自定义算子中可使用在 external-resources 中定义的资源名称来调用 RuntimeContext 的 getExternalResourceInfos 接口获取对应扩展资源的信息。以 GPU 为例得到的每个 ExternalResourceInfo 代表一块 GPU 卡而其中包含名为 index 的字段代表该 GPU 卡的设备 Index。
public class ExternalResourceMapFunction extends RichMapFunctionString, String {private static finalRESOURCE_NAMEgpu;Overridepublic String map(String value) {SetExternalResourceInfo gpuInfos getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME);ListString indexes gpuInfos.stream().map(gpuInfo - gpuInfo.getProperty(index).get()).collect(Collectors.toList());// Map function with GPU// ... }
}
6. MNIST Demo
下图以 MNIST 数据集的识别任务来演示使用 GPU 加速 Flink 作业。 MNIST 如上图所示为手写数字图片数据集每个图片可表示为为 28*28 的矩阵。在该任务中我们使用预训练好的 DNN 模型图片输入经过一层全连接网络得到一个 10 维向量该向量最大元素的下标即为识别结果。
我们在一台拥有两块 GPU 卡的 ECS 上启动一个有两个 TaskManager 进程的 Standalone 集群。借助默认脚本提供的 Coordination Mode 功能我们可以保证每个 TaskManager 各使用其中一块 GPU 卡。
该任务的核心算子为图像识别函数 MNISTClassifier核心实现如下所示
class MNISTClassifier extends RichMapFunctionListFloat, Integer {Overridepublic void open(Configuration parameters) {//获取GPU信息并且选择第一块GPUSetExternalResourceInfo externalResourceInfos getRuntimeContext().getExternalResourceInfos(resourceName);final OptionalString firstIndexOptional externalResourceInfos.iterator().next().getProperty(index);// 使用第一块GPU的index初始化JCUDA组件JCuda.cudaSetDevice(Integer.parseInt(firstIndexOptional.get()));JCublas.cublasInit();}
}
在 Open 方法中从 RuntimeContext 获取当前 TaskManager 可用的 GPU并选择第一块来初始化 JCuda 以及 JCublas 库。
class MNISTClassifier extends RichMapFunctionListFloat, Integer {Overridepublic Integer map(ListFloat value) {// 使用Jucblas做矩阵算法JCublas.cublasSgemv(n, DIMENSIONS.f1, DIMENSIONS.f0, 1.0f,matrixPointer, DIMENSIONS.f1, inputPointer, 1, 0.0f, outputPointer, 1);// 获得乘法结果并得出该图所表示的数字JCublas.cublasGetVector(DIMENSIONS.f1, Sizeof.FLOAT, outputPointer, 1, Pointer.to(output), 1);JCublas.cublasFree(inputPointer);JCublas.cublasFree(outputPointer);int result 0;for (int i 0; i DIMENSIONS.f1; i) {result output[i] output[result] ? i : result;}return result;}
}
在 Map 方法中将预先训练好的模型参数与输入矩阵放入 GPU 显存使用 JCublas 进行 GPU 中的矩阵乘法运算最后将结果向量从 GPU 显存中取出并得到识别结果数字。
具体案例演示流程可以前往观看视频或者参考 Github 上面的链接动手尝试。
四、未来计划
除了上文介绍的这些已经发布的特性外Apache Flink 社区也正在积极准备更多资源管理方面的优化特性在未来的版本中将陆续和大家见面。
被动资源调度模式托管内存使得 Flink 任务可以灵活地适配不同的 TaskManager/Slot 资源充分利用可用资源为计算任务提供给定资源限制下的最佳算力。但用户仍需指定计算任务的并行度Flink 需要申请到满足该并行度数量的 TaskManager/Slot 才能顺利执行。被动资源调度将使 Flink 能够根据可用资源动态改变并行度在资源不足时能够 best effort 进行数据处理同时在资源充足时恢复到指定的并行度保障处理性能。细粒度资源管理Flink 目前基于 Slot 的资源管理与调度机制认为所有的 Slot 都具有相同的规格。对于一些复杂的规模化生产任务往往需要将计算任务拆分成多个子图每个子图单独使用一个 Slot 执行。当子图间的资源需求差异较大时使用相同规格的 Slot 往往难以满足资源效率方面的需求特别是对于 GPU 这类成本较高的扩展资源。细粒度资源管理允许用户为作业的子图指定资源需求Flink 会根据资源需求使用不同规格的 TaskManager/Slot 执行计算任务从而优化资源效率。
五、总结
通过文章的介绍相信大家对 Flink 内存管理有了更加清晰的认知。
首先从本地内存、Job Graph 编译阶段、执行阶段来解答每个流程的内存管理以及内存分配细节通过新的参数配置控制 TaskManager的内存分配然后从大家平时遇到资源调度相关问题包括最大 Slot 数使用如何进行 TaskManager 进行容错任务如何通过任务平铺均摊任务资源最后在机器学习和深度学习领域常常用到 GPU 进行加速计算通过解释 Flink 在 1.12 版本如何使用扩展资源框架和演示 Demo 给我们展示了资源扩展的使用。再针对资源利用率方面提出 2 个社区未来正在做的计划包括被动资源模式和细粒度的资源管理。
原文链接
本文为阿里云原创内容未经允许不得转载。