Flink 的网络栈在相互通信时(例如在 keyBy() 要求的网络混洗期间)对子任务的逻辑视图如下所示。
它是以下三个概念的不同设置的抽象:
子任务输出类型(ResultPartitionType):
流水线的(有界的或无界的): 一旦产生数据就向下游发送,可能是逐个发送的,有界或无界的记录流。
阻塞: 仅在生成完整结果时向下游发送数据。
调度类型:
一次性(即刻): 同时部署一个作业的所有子任务(用于流应用)。
第一个输出的下一阶段(懒惰): 任一生产者生成输出后就部署下游任务。
完整输出的下一阶段: 任一或所有生产者生成完整输出后部署下游任务。
传输:
高吞吐量:Flink 不会逐个发送记录,而是将一堆记录缓冲到网络缓冲区中并同时发送它们。这会降低每个记录的开销并带来更高的吞吐量。
减少缓冲超时实现低延迟: 缓冲区未填满就发送记录,从而牺牲吞吐量来换取低延迟。
下面详细说明输出和调度类型。首先要知道,子任务输出类型和调度类型是紧密相关的,只有特定的组合才能正常工作。
流水线式结果分区是流式输出,需要向实时子任务目标发送数据。可以在生成结果之前或首次输出时安排目标。批处理作业生成有界结果分区,而流式处理作业生成无界结果。
批处理作业也可能以阻塞方式生成结果,具体取决于所使用的运算符和连接模式。在这种情况下必须先生成完整的结果,然后才能安排接收任务。这样一来批处理作业的效率会更高,需要的资源更少。
下表总结了所有有效组合:
输出类型 | 调度类型 | 应用到…… |
---|---|---|
流水线,无界 | 一次性 | 流式作业 |
第一个输出的下一阶段 | N/A(1) | |
流水线,有界 | 一次性 | N/A(2) |
第一个输出的下一阶段 | 批处理作业 | |
阻塞 | 完整输出的下一阶段 | 批处理作业 |
目前 Flink 尚未使用。
批量 / 流式统一完成后可能对流式作业可用。
此外,对于具有多个输入的子任务而言调度以两种方式启动:在 所有输入 或 任一输入 的生产者生成记录 / 其完整数据集之后启动。要调整批处理作业中的输出类型和调度决策,请查看 ExecutionConfig #setExecutionMode(),特别是 ExecutionMode 以及 ExecutionConfig #setDefaultInputDependencyConstraint()。
先回忆一下,在 Flink 中不同的任务可以通过插槽共享组(https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/operators/#task-chaining-and-resource-groups)共享相同的插槽。TaskManager 还可以提供多个插槽来将同一任务的多个子任务安排到同一个 TaskManager 上。
在下图的示例中,我们假设并行度为 4,部署中有两个 TaskManager,各有两个插槽。TaskManager 1 执行子任务 A.1、A.2、B.1 和 B.2,TaskManager 2 执行子任务 A.3、A.4、B.3 和 B.4。在任务 A 和任务 B 之间的混洗类型连接(例如通过 keyBy())中,在每个 TaskManager 上有 2x4 个逻辑连接,其中一些是本地的,一些是远程的:
B.1/B.2 | B.3/B.4 | |
---|---|---|
A.1/A.2 | 本地 | 远程 |
A.3/A.4 | 远程 | 本地 |
不同任务之间的每个(远程)网络连接将在 Flink 的网络栈中获得自己的 TCP 通道。但是,如果同一任务的不同子任务被安排到了同一个 TaskManager,则它们与同一个 TaskManager 的网络连接将被多路复用,并共享一个 TCP 信道以减少资源占用。在这个示例中是 A.1→B.3、A.1→B.4 以及 A.2→B.3 和 A.2→B.4 的情况,如下图所示:
每个子任务的结果称为结果分区(ResultPartition),每个结果拆分到单独的子结果分区(ResultSubpartitions)中——每个逻辑通道有一个。在堆栈的这一部分中,Flink 不再处理单个记录,而是将一组序列化记录组装到网络缓冲区中。每个子任务可用于其自身的本地缓冲池中的缓冲区数量(每次发送方和接收方各一个)上限符合下列规则:
#channels * buffers-per-channel + floating-buffers-per-gate
单个 TaskManager 上的缓冲区总数通常不需要配置。需要配置时请参阅配置网络缓冲区文档(https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/config.html#configuring-the-network-buffers)。
每当子任务的发送缓冲池耗尽时——也就是缓存驻留在结果子分区的缓存队列中或更底层的基于 Netty 的网络栈中时——生产者就被阻塞了,无法继续工作,并承受背压。接收器也是类似:较底层网络栈中传入的 Netty 缓存需要通过网络缓冲区提供给 Flink。如果相应子任务的缓冲池中没有可用的网络缓存,Flink 将在缓存可用前停止从该通道读取。这将对这部分多路传输链路发送的所有子任务造成背压,因此也限制了其他接收子任务。下图中子任务 B.4 过载了,它会对这条多路传输链路造成背压,还会阻止子任务 B.3 接收和处理新的缓存。
为了防止这种情况发生,Flink 1.5 引入了自己的流量控制机制。
基于信用的流量控制可确保“线上”的任何内容都能被接收器处理。它是 Flink 原有机制的自然拓展,基于网络缓冲区的可用性实现。每个远程输入通道现在都有自己的一组 独占缓冲区,而非使用共享的本地缓冲池。而本地缓冲池中的缓存称为 浮动缓存,因为它们会浮动并可用于所有输入通道。
接收器将缓存的可用性声明为发送方的 信用(1 缓存 = 1 信用)。每个结果子分区将跟踪其 通道信用值。如果信用可用,则缓存仅转发到较底层的网络栈,并且发送的每个缓存都会让信用值减去一。除了缓存外,我们还发送有关当前 backlog 大小的信息,从而指定在此子分区的队列中等待的缓存数量。接收器将使用它来请求适当数量的浮动缓冲区,以便更快处理 backlog。它将尝试获取与 backlog 大小一样多的浮动缓冲区,但有时并不会如意,可能只获取一点甚至获取不到缓冲。接收器将使用检索到的缓存,并将继续监听可用的缓存。
基于信用的流量控制将使用每通道缓冲区来指定本地缓冲池(可选(3))的独占(强制)缓存数和每个门的浮动缓冲区,从而实现与没有流量控制时相同的缓冲区上限。这两个参数的默认值会使流量控制的最大(理论)吞吐量至少与没有流量控制时一样高,前提是网络的延迟处于一般水平上。你可能需要根据实际的网络延迟和带宽来调整这些参数。
如果没有足够的缓存,每个缓冲池将从全局可用缓冲池中获取相同份额(±1)。
相比没有流量控制的接收器的背压机制,信用机制提供了更直接的控制逻辑:如果接收器能力不足,其可用信用将减到 0,并阻止发送方将缓存转发到较底层的网络栈上。这样只在这个逻辑信道上存在背压,并且不需要阻止从多路复用 TCP 信道读取内容。因此,其他接收器在处理可用缓存时就不受影响了。
通过流量控制,多路复用链路中的信道就不会阻塞链路中的另一个逻辑信道,提升了整体资源利用率。此外,我们还能通过完全控制“在线”数据的数量来改善检查点对齐情况:如果没有流量控制,通道需要一段时间才能填满网络堆栈的内部缓冲区,并广播接收器已经停止读取的消息。这段时间里会多出很多缓存。所有检查点障碍都必须在这些缓存后面排队,因此必须等到所有这些缓存处理完毕后才能启动(“障碍永远不会越过记录!”)。
但是,来自接收器的附加通告消息可能会产生一些额外开销,尤其是在使用 SSL 加密通道的设置中更是如此。此外,单个输入通道不能使用缓冲池中的所有缓存,因为独占缓存不能共享。它也不能立即开始发送尽可能多的数据,所以在加速期间(生成数据的速度比计算信用的速度更快时)可能需要更长时间才能发送数据。虽然这可能会影响你的作业性能,但这些代价相比收益来说还是值得的。你可能希望通过每个通道的缓冲区增加独占缓存的数量,但代价是使用更多内存。但与之前的实现相比总体内存占用可能还是要少一些,因为较底层的网络栈不再需要缓存大量数据了,我们总是可以立即将其传输到 Flink 中。
还有一件事要注意:由于我们在发送方和接收方之间缓存的数据更少了,你可能会更早地遇到背压。但这也在预料之中,而且缓存的数据再多也没什么用。如果你想要缓存更多数据,同时还要有流量控制,可以考虑通过每个门的浮动缓冲区来提升浮动缓存的数量。
优势
通过多路复用连接中的数据倾斜提升资源利用率
改善了检查点对齐
减少内存占用(较底层网络层中的数据更少)
劣势
额外的信用通知消息
额外的 backlog 通知消息(缓存消息附带,几乎没有开销)
潜在的往返延迟
背压出现得更早
注意:如果你需要关闭基于信用的流量控制,可以将下列代码添加到 flink-conf.yaml:taskmanager.network.credit-model: false。但此参数已弃用,最终将与不基于信用的流控制代码一起被移除。
下面的视图比之前的级别更高一些,其中包含网络栈及其周围组件的更多详细信息:
一个记录被创建并传递之后(例如通过 Collector #colle()),它会被递交到 RecordWriter,其将来自 Java 对象的记录序列化为一个字节序列,后者最终成为网络缓存,然后像前文提到的那样被处理。RecordWriter 首先使用 SpanningRecordSerializer 将记录序列化为一个灵活的堆上字节数组。然后,它尝试将这些字节写入目标网络通道的关联网络缓存。
在接收方,较底层的网络栈(netty)将接收到的缓存写入适当的输入通道。最后(流式)任务的线程从这些队列中读取并尝试在 RecordReader 的帮助下,通过 SpillingAdaptiveSpanningRecordDeserializer 将积累的数据反序列化为 Java 对象。与序列化器类似,这个反序列化器还必须处理特殊情况,例如跨越多个网络缓冲区的记录——这可能是因为记录大于网络缓冲区(默认为 32KiB,通过 taskmanager.memory.segment-size 设置);或者是因为序列化记录被添加到了没有足够剩余空间的网络缓冲区中。不管怎样,Flink 将使用这些数据,并继续将剩余数据写入新的网络缓冲区。
在上图中,基于信用的流量控制机制实际上位于“Netty 服务器”(和“Netty 客户端”)组件内部,RecordWriter 写入的缓存始终以空状态添加到结果子分区中,然后逐渐填满(序列化)记录。但是什么时候 Netty 真的得到了缓存呢?显然,只要它们可用时就不能接收数据了,因为这不仅会因为跨线程通信和同步而增加大量成本,而且还会让整个缓存都过时。
在 Flink 中,有三种情况下 Netty 服务器可以消费缓存:
写入记录时缓冲区变满
缓存超时命中
发送特殊事件,例如检查点障碍
RecordWriter 与本地序列化缓冲区一起使用当前记录,并将这些数据逐渐写入位于相应结果子分区队列的一个或多个网络缓冲区。虽然 RecordWriter 可以处理多个子分区,但每个子分区只有一个 RecordWriter 向其写入数据。另一方面,Netty 服务器正在从多个结果子分区读取并将适当的分区复用到单个信道中,如上所述。这是一个典型的生产者——消费者模式,网络缓冲区位于中间位置,如下图所示。在(1)序列化和(2)将数据写入缓冲区之后,RecordWriter 相应地更新缓冲区的写入器索引。一旦缓冲区被完全填满,记录写入器将(3)从其本地缓冲池中获取当前记录(或下一个记录)的所有剩余数据生成新的缓存,并将新的缓存添加到子分区队列。这将(4)通知 Netty 服务器还有数据可用(注 4)。每当 Netty 有能力处理此通知时,它将(5)获取缓存并沿适当的 TCP 通道发送它。
如果队列中有更多处理完的缓存,我们可以假设 Netty 已经收到了通知
为了降低延迟,我们不能在缓冲区填满之后才向下游发送数据。有些情况下某个通信信道没有流过那么多记录,这样会带来无意义的延迟。为此,一个名为输出刷新器的定期进程将刷新堆栈中可用的任何数据。可以通过 StreamExecutionEnvironment#setBufferTimeout 配置周期间隔,这个间隔对于低吞吐量通道来说就是延迟上限(注 5)。下图显示了它与其他组件的交互方式:RecordWriter 还是会序列化并写入网络缓冲区,但同时,如果 Netty 服务器尚未知晓,输出刷新器可以(3,4)通知 Netty 服务器有数据可用(类似上面的“缓冲区已满”场景)。当 Netty 处理此通知(5)时,它将使用缓冲区中的可用数据并更新缓冲区的读取器索引。缓存保留在队列中——从 Netty 服务器端对此缓存做进一步操作后,将在下次继续读取读取器索引。
严格来说,输出刷新器没法给出任何保证——它只会向 Netty 发送通知而已,后者是否响应通知则要取决于其意愿和能力。这也意味着如果通道在经受背压,输出刷新器就没用了。
某些特殊事件如果通过 RecordWriter 发送,也会触发立即刷新。最重要的特殊事件是检查点障碍或分区结束事件,显然它们应该快速执行,而不是等待输出刷新器启动。
相比 Flink 1.5 之前的版本,请注意(a)网络缓冲区现在直接放在子分区队列中,(b)我们不会在每次刷新时关闭缓冲区。这也带来了一些好处:
同步开销较少(输出刷新和 RecordWriter 是各自独立的)
在高负载场景中,当 Netty 是瓶颈时(因为背压或直接原因),我们仍然可以在不完整的缓冲区中积累数据
Netty 通知明显减少
但在低负载情况下 CPU 使用率和 TCP 包速率可能会增加。这是因为新版 Flink 将使用所有可用的 CPU 周期来维持所需的延迟。当负载增加时它将通过填充更多的缓冲区来自我调整。由于同步开销减少了,高负载场景不会受到影响,甚至可以获得更大的吞吐量。
如果你想更深入地了解如何在 Flink 中实现生产者——消费者机制,请仔细查看 Flink 1.5 中引入的 BufferBuilder 和 BufferConsumer 类。虽然读取可能是按缓存逐个进行的,但写入是按记录进行的这样 Flink 中的所有网络通信都走热路径。因此,我们非常清楚我们需要在任务的线程和 Netty 线程之间建立轻量连接,这不会导致过多的同步开销。详细信息可以参阅源代码(https://github.com/apache/flink/tree/release-1.8/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer)。
引入网络缓冲区能获得更高的资源利用率和吞吐量,代价是让一些记录在缓冲区中等待一段时间。虽然可以通过缓冲区超时设置来限制这个延迟,但你很可能想要知道延迟和吞吐量之间的权衡关系——显然它们不可兼得。下图显示了缓冲区超时设置的不同值——从 0 开始(每个记录都刷新)到 100 毫秒(默认值)——以及在有 100 个节点,每个节点 8 个插槽各运行一个作业的集群上对应的吞吐量;作业没有业务逻辑,只用来测试网络栈。为了对比,我们还加入了 Flink 1.4 版本的情况。
如你所见,使用 Flink 1.5+ 版本时即使是非常低的缓冲区超时(例如 1ms,适合低延迟场景)也设置也只比默认超时设置高出最多 75%的吞吐量。
现在你了解了结果分区、批处理和流式传输的各种网络连接和调度类型。你还了解了基于信用的流量控制以及网络栈的内部工作机制,知道怎样调整网络相关的参数,知道怎样判断某些作业行为。本系列的后续文章将基于这些知识探讨更多操作细节,包括需要查看的相关指标、进一步的网络栈调整以及要避免的常见反模式。敬请期待。
原文链接:
https://flink.apache.org/2019/06/05/flink-network-stack.html
你也「在看」吗?👇