Flink从入门到放弃之源码解析系列-第8章 任务调度与负载均衡

Flink从入门到放弃之源码解析系列
- Flink组件和逻辑计划
- Flink执行计划生成
- JobManager中的基本组件(1)
- JobManager中的基本组件(2)
- JobManager中的基本组件(3)
- TaskManager
- 算子
- 网络
- 水印WaterMark
- CheckPoint
- 任务调度与负载均衡
- 异常处理
- Alibaba Blink新特性
1前言
前面已经介绍了一系列的 flink 任务抽象、网络传输、可靠性机制等细节,有了这些铺垫,终于可以开心的介绍 flink 的任务调度机制了,也是不易^_^
因为没有这些铺垫,就无法明白 flink 为什么要设计这样的一套调度机制!
2资源组
资源组模型
flink 的一个 Instance 可以被划分出多个 Slot,通过初始参数可以指定,他们既可以是 SimpleSlot,也可以是同时跑多个 task 的 SharedSlot,为了约束 task 之间的运行时的绑定关系,flink 抽象出了 SlotSharingGroup 和 CoLocationGroup 的概念。
一个 SlotSharingGroup 规定了一个 Job 的 DAG 图中的哪些 JobVertex 的 sub task 可以部署到一个 SharedSlot 上,这是一个软限制,并不是一定会满足,只是调度的时候有位置偏好,而 CoLocationGroup 是在 SlotSharingGroup 的基础上的硬限制,它限定了 CoLocationGroup 中的 JobVertex 中的 sub task 运行必须是一一对应的:假如 CoLocationGrou 限定了 JobVertex A 和 B ,那么 A 的编号为 i 的 sub task 必须和 B 的编号为 i 的 sub task 跑在一起。假如一个 job 的运算逻辑包括 source -> head -> tail -> sink,那么它的 task 运行时限制关系见下图:

资源组
SlotSharingGroup
上面已经提到 SlotSharingGroup 具有绑定 JobVertex 的 sub task 运行的作用,用户可以自己为 JobVertex 定义一个 SlotSharingGroup,如果不定义的话使用名为 default 的 SlotSharingGroup,定义的接口如下:
someStream.filter(...).slotSharingGroup("name");
ColocationGroup
ColocationGroup 通过 CoLocationConstraint 来管理一个 SharedSlot 上的 sub task
用户同样可以通过 api 定义 ColocationGroup:
资源Slot
一个 TaskManager 在初始化时可以指定自己最大持有的 Slot 数,包括 SharedSlot 和 SimpleSlot。
flink 使用 slot 作为资源抽象【主要是 cpu 和 memory】,一个 Instance 可以持有多个 SharedSlot,一个 SharedSlot 可以并行执行多个 sub task,对于 PIPELINED 来说,一种典型的模式就是一个 SharedSlot 同时执行一个 job 每个 JobVertex 上的一个并行 task,这样不仅可以尽量保证每个 Instance 上的任务负载尽量均匀,也能最大化的利用 PIPELINED 的流水线处理特性优化网络传输。
flink 的 slot 有两种:SharedSlot 和 SimpleSlot,前者可以绑定执行多个 sub task,后者代表一个 task 的资源占用。
SharedSlot
一个 SharedSlot 可以拥有多个 SimpleSlot,也可以包含嵌套的 SharedSlot【ColocationConstraint】,这样便形成了树形结构,SimpleSlot 和 SharedSlot 继承自共同的接口:Slot,它们都包含如下的关键信息:
- jobID:被哪个 job 占有
- groupID:属于哪个 SlotSharingGroup
- instance:属于哪个 TaskManager,或者属于哪个物理节点
- status:当前分配状态,共有四种状态 ALLOCATED_AND_ALIVE、CANCELLED、RELEASED、unknown
只有定义了 SlotSharingGroup 时才会通过 SharedSlot 来绑定 sub task 的执行
SimpleSlot
SimpleSlot 是执行单个 task 的 slot 抽象,它既可以在 TaskManager 上独立存在,也可以作为 SharedSlot 的子节点,内部封装了一个 task 的一次 Execution
继承关系下如下图:
SharedSlot 可以视作管理 SimpleSlot 的工具,那么 SharedSlot 自身又由什么方式管理呢?
SlotSharingGroupAssignment
flink 通过抽象 SlotSharingGroupAssignment 来管理 SharedSlot,这里的资源以 JobVertex 微粒度划分 group,也就是一个 JobVertex 占有一个资源 group。
Slot初始划分
SlotSharingGroupAssignment 是如何添加一个初始的 SharedSlot 节点的呢?
//SlotSharingGroupAssignment line174
private SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot sharedSlot, Locality locality, JobVertexID groupId, CoLocationConstraint constraint) {
// sanity checks
if (!sharedSlot.isRootAndEmpty()) {
throw new IllegalArgumentException("The given slot is not an empty root slot.");
}
总结其逻辑:
- 这里必须是一个根节点 SharedSlot【没有父亲节点和子节点】,也就是 TaskManager 被配置的一个 Slot
- 如果没有强制的 task 位置绑定【ColocationConstraint】,从根 SharedSlot 上分配一个 SImpleSlot,编号为递增的 simple slot 个数
- 如果有 ColocationConstraint 限制,为传入的 SharedSlot 生成一个子 SharedSlot 并分配 SimpleSlot 注册到 ColocationConstraint 中
- 如果申请到 slot【simple or shared】,设置位置偏好:LOCAL、NON_LOCAL、UNCONSTRAINED【相对于持有的 Instance 来说】,并且将这个 SharedSlot 加入到其它 JobVertex 的可调度资源队列中,也就是说其它的 JobVertex 都可以讲在这个 SharedSlot 上部署自己的 sub task
为Task分配 SharedSlot
最底层的分配策略:
//SlotSharingGroupAssignment
private Pair<SharedSlot, Locality> getSlotForTaskInternal(AbstractID groupId,
Iterable<Instance> preferredLocations,
boolean localOnly)
{
// check if there is anything at all in this group assignment
if (allSlots.isEmpty()) {
return null;
}
// get the available slots for the group
Map<Instance, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
总结其逻辑:
- 判断对应的 group 是否有资源,如果没有将 SlotSharingGroupAssignment 目前所有的 SharedSlot 槽位视为可用资源
- 如果对偏好位置有要求,从 group 里筛选是否有满足的 SharedSlot,如果有设置 Locality 为 LOCAL 并返回,同时从该 group 的资源组中移除该 SharedSlot
- 如果对位置有要求但是没有找到符合的 SharedSlot,则从资源组里选择第一个可用的 SharedSlot,并将 Locality 设置为 Locality.NON_LOCA
- 如果对位置没要求,则从资源组里选择第一个可用的 SharedSlot,并将 Locality 设置为 Locality.UNCONSTRAINED
- 如果没资源,返回 null
- 一旦一个 group 中的 SharedSlot 被分出去就会被从资源池中删除,也就是说一个 SharedSlot 不可能分配一个 JobVertex 的两个 sub tasks,这一点非常重要
无 CoLocationConstraint 限制的资源划分策略
主要是从走上面的逻辑,细节这里就不说了
有 CoLocationConstraint 限制的资源划分策略
有 CoLocationConstraint 限制的时候,优先考虑 CoLocationConstraint 中的 SharedSlot【如果之前 CoLocationGroup 中的其它 task 分配过】,如果 CoLocationConstraint 中还没有分配 SharedSlot 则重新分配,并且再分配一个 SharedSlot 子节点,再这个节点上划出 SimpleSlot 供 task 使用
//SlotSharingGroupAssignment
SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<Instance> locationPreferences) {
synchronized (lock) {
if (constraint.isAssignedAndAlive()) {
// the shared slot of the co-location group is initialized and set we allocate a sub-slot
final SharedSlot shared = constraint.getSharedSlot();
SimpleSlot subslot = shared.allocateSubSlot(null);
subslot.setLocality(Locality.LOCAL);
return subslot;
}
else if (constraint.isAssigned()) {
// we had an assignment before.
- 如果之前 CoLocationGroup 中的其它 sub task 有分配过资源,直接复用这个资源,对应 Locality 属性为 Locality.LOCAL
- 如果之前分配过,但是该 SharedSlot 但是却被标记死亡,那么依据之前的 SharedSlot 的所在节点重新分配一次 SharedSlot,再此基础上再分配一个 SharedSlot,后分配 SimpleSlot 返回,对应 Locality 属性为 Locality.LOCAL
- 如果是第一次分配,依据节点的偏好位置为参考,并再此基础上再分配一个 SharedSlot,后分配 SimpleSlot 返回,对应 Locality 属性为 Locality.LOCAL
3调度器
flink 调度器的调度单位被抽象为一个 ScheduledUnit,一个 ScheduledUnit 封装了以下信息:Execution、SlotSharingGroup、CoLocationConstraint
flink 的关于调度的细节全部集成于 Scheduler。
调度细节
首先来明确下 Scheduler 的调度核心:
//Scheduler
private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
if (task == null) {
throw new NullPointerException();
}
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling task " + task);
}
总结其逻辑:
- 先判断该类型的 ExecutionVertex 是否已强制指定执行节点
- 如果有设置 SlotSharingGroup ,拿到对应的 SlotSharingGroupAssignment 和 ColocationConstraint,走上面描述的接口拿到分配的 SimpleSlot A,如果有 ColocationConstraint,会锁定位置,表示已经获取 SharedSlot 并分配完成,如果这个 SimpleSlot 的 Locality 是 LOCAL【预期的位置】,则立即返回,否则走下面的流程
- 如果上面的过程没有获取到 SimpleSlot,那么表示当前已经没有符合要求的 SharedSlot,这时候会重新分配一个新的 SharedSlot,方式是先遍历当前有资源的 Instance 依据偏好位置找到其中一个,并在 SlotSharingGroupAssignment 中注册返回一个新的 SimpleSlot B【细节上面介绍 SlotSharingGroupAssignment 时已说明】
- 如果已走到这一步,比较 A 与 B 的优劣,主要是调度位置是否符合预期的比较,选择更优的,释放掉另一个
- 如果没有 SlotSharingGroup 的约束,直接从 Instance 上申请一个根 SimpleSlot 来执行这个 task
- 以上调度器在分配 SharedSlot 的时候维护了一个队列:instancesWithAvailableResources,每次有 Slot 资源的 Instance 被加入对列尾部,消费过的 Slot 会被 remove,这样可以轮询机器,可以使机器的 SharedSlot 分配尽量均衡
约束信息的生成
影响上面调度预期位置有三个重要因素:SlotSharingGroup、ColocationConstraint、prefferedLocations,我们逐一分析它们的生成逻辑:
SlotSharingGroup
向上追溯我们发现,Scheduler 的调度逻辑由 Execution 触发:
//Execution
public boolean scheduleForExecution(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
if (scheduler == null) {
throw new IllegalArgumentException("Cannot send null Scheduler when scheduling execution.");
}
final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();
// sanity check
if (locationConstraint != null && sharingGroup == null) {
throw new RuntimeException("Trying to schedule with co-location constraint but without slot sharing allowed.");
}
//StreamingJobGraphGenerator
private void setSlotSharing() {
Map<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();
for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
String slotSharingGroup = streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();
SlotSharingGroup group = slotSharingGroups.get(slotSharingGroup);
if (group == null) {
group = new SlotSharingGroup();
slotSharingGroups.put(slotSharingGroup, group);
}
entry.getValue().setSlotSharingGroup(group);
}
//StreamGraphGenerator
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
if (specifiedGroup != null) {
return specifiedGroup;
} else {
String inputGroup = null;
for (int id: inputIds) {
String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
if (inputGroup == null) {
inputGroup = inputGroupCandidate;
} else if (!inputGroup.equals(inputGroupCandidate)) {
return "default";
}
}
return inputGroup == null ? "default" : inputGroup;
}
}
总结其逻辑:
- 如果用户没有为 JobVertex 指定 SlotSharingGroup ,则生成名为 'default‘ 的 SlotSharingGroup,否则为每个用户指定的名字定义一个 SlotSharingGroup
- 同一个 SlotSharingGroup 中的节点的 sub task 会共享 SharedSlot 资源
ColocationConstraint
//CoLocationGroup
public CoLocationConstraint getLocationConstraint(int subtask) {
ensureConstraints(subtask + 1);
return constraints.get(subtask);
}
private void ensureConstraints(int num) {
if (constraints == null) {
constraints = new ArrayList<CoLocationConstraint>(num);
} else {
constraints.ensureCapacity(num);
}
if (num > constraints.size()) {
constraints.ensureCapacity(num);
for (int i = constraints.size(); i < num; i++) {
constraints.add(new CoLocationConstraint(this));
}
}
}
总结其逻辑:
- CoLocationGroup 的生成逻辑和 SlotSharingGroup 类似,不过这个没有默认,需要用户手动指定,并且前提是需要有 SlotSharingGroup
- 从 CoLocationGroup 内 sub task 的 index 获取一个 ColocationConstraint,这样便实现了一一对应关系
prefferedLocations
这是调度位置信息的关键,不管是 CoLocationGroup 还是 SlotSharingGroup,都会优先参考节点偏好来申请资源,那么 flink 是依据什么信息来生成偏好位置的呢?
//ExecutionVertex
public Iterable<Instance> getPreferredLocations() {
// if we have hard location constraints, use those
List<Instance> constraintInstances = this.locationConstraintInstances;
if (constraintInstances != null && !constraintInstances.isEmpty()) {
return constraintInstances;
}
// otherwise, base the preferred locations on the input connections
if (inputEdges == null) {
return Collections.emptySet();
}
总结其逻辑:
- flink 计算节点的上游所有生产者所在节点,并作为自己的偏好位置
- 清空之前旧的偏好位置
这样就形成了最开始【资源组模型】一节中的调度模式,因为一开始的 source task 显然没有 prefferedLocations,由调度细节可以知道 flink 会轮询集群的不同 Instance,将 source task 分配在这些机器上,后面的 source task 的 consumer task 会优先调度到 source task 的节点上,这样便形成了一开始的调度模式!
触发调度
- 第一次提交任务时
- 有新的 Instance 或者新的 Slot 获取时,会轮询排对的调度任务进行调度
调度流程
这里只介绍 streaming 的流程,批处理类似,有兴趣的童鞋自行研究!
先梳理代码逻辑:
//ExecutionGraph
//schedule from source JobVertex first
public void scheduleForExecution(Scheduler scheduler) throws JobException {
if (scheduler == null) {
throw new IllegalArgumentException("Scheduler must not be null.");
}
if (this.scheduler != null && this.scheduler != scheduler) {
throw new IllegalArgumentException("Cannot use different schedulers for the same job");
}
if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
this.scheduler = scheduler;
switch (scheduleMode) {
case FROM_SOURCES:
// simply take the vertices without inputs.
for (ExecutionJobVertex ejv : this.tasks.values()) {
if (ejv.getJobVertex().isInputVertex()) {
ejv.scheduleAll(scheduler, allowQueuedScheduling);
}
}
break;
//NetworkEnvironment
//For PIPELINED, eagerly notify is true, when source tasks is deployed and registered in NetworkEnvironment, will trigger consumer task to deploy
for (ResultPartition partition : producedPartitions) {
// Eagerly notify consumers if required.
if (partition.getEagerlyDeployConsumers()) {
jobManagerNotifier.notifyPartitionConsumable(
partition.getJobId(), partition.getPartitionId());
}
}
//NetworkEnvironment
public void notifyPartitionConsumable(JobID jobId, final ResultPartitionID partitionId) {
final ScheduleOrUpdateConsumers msg = new ScheduleOrUpdateConsumers(jobId, partitionId);
Future<Object> futureResponse = jobManager.ask(msg, jobManagerMessageTimeout);
...
//Explain why for PIPELINED, eagerlyDeployConsumers is always true here.
//JobVertex
public JobEdge connectNewDataSetAsInput(
JobVertex input,
DistributionPattern distPattern,
ResultPartitionType partitionType,
boolean eagerlyDeployConsumers) {
IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
dataSet.setEagerlyDeployConsumers(eagerlyDeployConsumers);
JobEdge edge = new JobEdge(dataSet, this, distPattern);
this.inputs.add(edge);
dataSet.addConsumer(edge);
return edge;
}
//StreamingJobGraphGenerator
if (partitioner instanceof ForwardPartitioner) {
downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED,
true);
} else if (partitioner instanceof RescalePartitioner){
downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED,
true);
} else {
downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.ALL_TO_ALL,
ResultPartitionType.PIPELINED,
true);
}
//ResultPartition
//Also for a ResultPartition, when is has first buffer produced, it will notify JobManager to deploy it's consumer tasks
public void add(Buffer buffer, int subpartitionIndex) throws IOException {
boolean success = false;
try {
checkInProduceState();
final ResultSubpartition subpartition = subpartitions[subpartitionIndex];
synchronized (subpartition) {
success = subpartition.add(buffer);
// Update statistics
totalNumberOfBuffers++;
totalNumberOfBytes += buffer.getSize();
}
}
finally {
if (success) {
notifyPipelinedConsumers();
}
else {
buffer.recycle();
}
}
}
//ResultPartition
//Only PIPELINED will work here
private void notifyPipelinedConsumers() throws IOException {
if (partitionType.isPipelined() && !hasNotifiedPipelinedConsumers) {
partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionId);
hasNotifiedPipelinedConsumers = true;
}
}
简单总结:
- 对于 PIPELINED,在 source tasks 部署完成后立马会触发一次下游 consumer tasks 的部署
- 在生产者 task 产生第一个 buffer 数据的时候也会触发一次 consumer tasks 的部署
附一张图解释该流程:

经典调度图:




