
Flink Batch Processing: Generating the Best Execution Plan


Preface

Generating the best execution plan is a recursive computation: the recursion starts at the DataSinkNode and descends to the DataSourceNodes, computing the best candidate plans for each OptimizerNode, and then, on the way back, the entire OptimizerNode DAG is gradually converted into a PlanNode DAG, yielding an optimal plan. PlanNode has its own class inheritance hierarchy, with concrete subclasses such as SourcePlanNode, SingleInputPlanNode, DualInputPlanNode, and SinkPlanNode.

Compared with the corresponding OptimizerNode class hierarchy, PlanNode sits one level of abstraction higher and is more concerned with the data exchange strategy between Operators. The entry point for generating the best execution plan can be seen in the Optimizer class, as the following code shows:

// the final step is now to generate the actual plan alternatives

List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);

This is a recursive process: each OptimizerNode obtains the best execution plans of its children and processes them recursively, which converts the OptimizerNode DAG into a PlanNode DAG. Below, we look in detail at how SourcePlanNode, SinkPlanNode, SingleInputPlanNode, and DualInputPlanNode are created.
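Before walking through the individual node types, it may help to see the overall shape of this recursion. The following is a simplified, hypothetical outline assembled from the snippets quoted later in this article (caching of results, recursion into the predecessor, candidate construction, costing and pruning); it is not the literal body of any single Flink method:

// Simplified outline of getAlternativePlans() on a single-input node (sketch, not Flink source).
List<PlanNode> getAlternativePlans(CostEstimator estimator) {
    if (this.cachedPlans != null) {
        // a node can be reached via several paths, so the result is cached
        return this.cachedPlans;
    }
    // 1. recurse: ask the predecessor for its candidate sub-plans
    List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);

    // 2. for each child candidate, build one or more PlanNode candidates for this node
    //    by choosing ship strategies / local strategies that satisfy the requested properties
    List<PlanNode> outputPlans = new ArrayList<>();
    // ... candidate construction, see the SingleInputNode and DataSinkNode code below ...

    // 3. cost every candidate and prune dominated alternatives
    for (PlanNode node : outputPlans) {
        estimator.costOperator(node);
    }
    prunePlanAlternatives(outputPlans);

    this.cachedPlans = outputPlans;
    return outputPlans;
}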

Basic Data Structures and Definitions

First, we briefly introduce the data structures related to generating the best execution plan, together with some commonly used processing logic, to make the detailed description of the plan-generation process easier to follow.

The most important fields of PlanNode, and of the Channel objects that connect PlanNodes in the plan DAG, are the following:

(1) For a Channel, the main properties are:

source: the reference to the current PlanNode's immediate upstream PlanNode in the PlanNode DAG
target: the reference to the current (receiving) PlanNode in the PlanNode DAG
partitioner: computes the partition for a given key; the corresponding interface requires implementing int partition(K key, int numPartitions)
dataDistribution: configuration describing the data distribution, mainly how many buckets the data is split into and what the bucket boundaries are; currently only used with PartitioningProperty.RANGE_PARTITIONED
tmpMode: the mode for temporarily materializing data between two connected Operators; the enum has four values: NONE, PIPELINE_BREAKER, CACHED, CACHING_PIPELINE_BREAKER
dataExchangeMode: the data exchange mode, one of PIPELINED, PIPELINED_FORCED, BATCH, BATCH_FORCED
localProps: local properties, e.g. properties that hold within a single partition
globalProps: global properties, e.g. properties that hold across partitions

(2) For a PlanNode, the main properties are:

template: the OptimizerNode used as a template to create this PlanNode
broadcastInputs: describes which broadcast DataSet inputs the current PlanNode has, establishing connections from those broadcast DataSets to the current PlanNode's Operator
outChannels: describes which downstream Operators the current PlanNode's Operator feeds, establishing connections to the immediate downstream Operators
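To make these fields concrete, here is a small illustrative sketch that connects an upstream candidate to the current node and configures the shipping side of the channel. It only uses calls that also appear in the optimizer snippets quoted later in this article; 'child' stands for a candidate PlanNode produced by the upstream operator, and the chosen strategy and field index are made up for illustration:

// 'child' is an upstream candidate PlanNode; the new Channel has 'child' as its source
Channel c = new Channel(child);

// hash-partition the records on field 0 and derive the matching data exchange mode
DataExchangeMode em = DataExchangeMode.select(ExecutionMode.PIPELINED,
        ShipStrategyType.PARTITION_HASH, false /* breakPipeline */);
c.setShipStrategy(ShipStrategyType.PARTITION_HASH, new FieldList(0), em);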

During the recursive calls to getAlternativePlans(), a class named Costs is used. It defines the cost attributes associated with an Operator, as shown below:

private double networkCost;           // network cost, in transferred bytes
private double diskCost;              // bytes to be written and read, in bytes
private double cpuCost;               // CPU costs
private double heuristicNetworkCost;
private double heuristicDiskCost;
private double heuristicCpuCost;

Currently, three main kinds of cost are computed: network, disk I/O, and CPU. They are derived from the estimates attached to each Operator while the OptimizerNode DAG was being generated.
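As a toy illustration of how these fields might be filled in for one candidate, the fragment below accumulates network and disk cost for a hash-repartitioned, sort-based channel. It assumes the Costs class exposes add*Cost accumulators (which Flink's CostEstimator uses internally); the concrete numbers are made up and not taken from any real estimator:

// illustrative only -- NOT Flink's actual CostEstimator logic
double estimatedOutputSize = 1024L * 1024 * 1024;   // assume the input is estimated at 1 GB

Costs costs = new Costs();
costs.addNetworkCost(estimatedOutputSize);           // a hash repartition ships every byte once
costs.addDiskCost(2 * estimatedOutputSize);          // an external sort may spill and re-read the data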

Before looking at the node-creation logic, we introduce a few concepts related to choosing the data exchange mode:

(1) Execution mode (ExecutionMode): describes whether a batch program executes in a batched or a pipelined fashion, seen from the data-exchange point of view. It is defined by the ExecutionMode enum, whose values are PIPELINED, PIPELINED_FORCED, BATCH, and BATCH_FORCED.

(2) Ship strategy (ShipStrategyType): describes how an Operator's output is shipped to its consumer, for example local FORWARD, HASH repartitioning, or RANGE repartitioning. It is defined by the ShipStrategyType enum, whose values are NONE, FORWARD, PARTITION_RANDOM, PARTITION_HASH, PARTITION_RANGE, PARTITION_FORCED_REBALANCE, BROADCAST, and PARTITION_CUSTOM. NONE and FORWARD mean the data is passed on locally without going over the network.

(3) Breaking the pipeline: breakPipeline is a flag indicating that the pipeline must be broken during the data exchange, mainly to avoid potential deadlocks. Setting it effectively decouples the connection between the two Operators, for example by exchanging data in Batch mode instead of Pipelined mode, which still gets the data across.

The data exchange mode is then chosen by the following call:

DataExchangeMode em = DataExchangeMode.select(exchangeMode, shipStrategy, breakPipeline);

How is the DataExchangeMode derived from exchangeMode, shipStrategy, and breakPipeline? DataExchangeMode defines four groups of mappings, one per ExecutionMode: depending on the shipStrategy and breakPipeline values encountered while translating an Operator, the configured ExecutionMode is mapped directly to the corresponding DataExchangeMode, as the following code fragment shows:

private static final DataExchangeMode[] FORWARD = new DataExchangeMode[ExecutionMode.values().length];
private static final DataExchangeMode[] SHUFFLE = new DataExchangeMode[ExecutionMode.values().length];
private static final DataExchangeMode[] BREAKING = new DataExchangeMode[ExecutionMode.values().length];

// initialize the mapping between execution modes and exchange modes
static {
    FORWARD[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
    SHUFFLE[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
    BREAKING[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;

    FORWARD[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
    SHUFFLE[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
    BREAKING[ExecutionMode.PIPELINED.ordinal()] = BATCH;

    FORWARD[ExecutionMode.BATCH.ordinal()] = PIPELINED;
    SHUFFLE[ExecutionMode.BATCH.ordinal()] = BATCH;
    BREAKING[ExecutionMode.BATCH.ordinal()] = BATCH;

    FORWARD[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
    SHUFFLE[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
    BREAKING[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
}

These are pre-initialized lookup tables for the three connection scenarios FORWARD, SHUFFLE, and BREAKING, each mapping an ExecutionMode to a DataExchangeMode. Laid out as a table, the mapping looks like this:

ExecutionMode       FORWARD      SHUFFLE      BREAKING
PIPELINED_FORCED    PIPELINED    PIPELINED    PIPELINED
PIPELINED           PIPELINED    PIPELINED    BATCH
BATCH               PIPELINED    BATCH        BATCH
BATCH_FORCED        BATCH        BATCH        BATCH

Combining the three connection scenarios with the four execution modes makes it easy to read off the resulting DataExchangeMode for any situation. The code in Flink that picks the value for a concrete channel looks like this:

if (breakPipeline) {
    return getPipelineBreakingExchange(executionMode);
}
else if (shipStrategy == ShipStrategyType.FORWARD) {
    return getForForwardExchange(executionMode);
}
else {
    return getForShuffleOrBroadcast(executionMode);
}
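As a quick sanity check against the table above, the following usage sketch shows what select() returns for a few combinations; the expected values follow directly from the FORWARD / SHUFFLE / BREAKING tables quoted above:

DataExchangeMode m1 = DataExchangeMode.select(ExecutionMode.BATCH,
        ShipStrategyType.FORWARD, false);          // -> PIPELINED (a local forward stays pipelined)
DataExchangeMode m2 = DataExchangeMode.select(ExecutionMode.BATCH,
        ShipStrategyType.PARTITION_HASH, false);   // -> BATCH     (a shuffle is batched in BATCH mode)
DataExchangeMode m3 = DataExchangeMode.select(ExecutionMode.PIPELINED,
        ShipStrategyType.PARTITION_HASH, true);    // -> BATCH     (pipeline breaker forces batching)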

Creating the SourcePlanNode

Because the OptimizerNode DAG is built by traversing the Operators from back to front, the plan-generation recursion bottoms out at the sources: the first plan nodes to be created and returned are the SourcePlanNodes. The corresponding code is shown below:

SourcePlanNode candidate = new SourcePlanNode(this,
        "DataSource (" + this.getOperator().getName() + ")", this.gprops, this.lprops);

if (!replicatedInput) {
    candidate.updatePropertiesWithUniqueSets(getUniqueFields());

    final Costs costs = new Costs();
    if (FileInputFormat.class.isAssignableFrom(getOperator().getFormatWrapper().getUserCodeClass()) &&
            this.estimatedOutputSize >= 0) {
        estimator.addFileInputCost(this.estimatedOutputSize, costs);
    }
    candidate.setCosts(costs);
} else {
    // replicated input
    final Costs costs = new Costs();
    InputFormat<?, ?> inputFormat = ((ReplicatingInputFormat<?, ?>)
            getOperator().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
    if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) &&
            this.estimatedOutputSize >= 0) {
        estimator.addFileInputCost(this.estimatedOutputSize * this.getParallelism(), costs);
    }
    candidate.setCosts(costs);
}

The code above first creates a SourcePlanNode. Then, depending on whether the source is a replicatedInput, the data exchange cost towards the downstream Operators differs: with a ReplicatedInput, several parallel downstream Operators all consume the full replicated DataSet, so the cost estimate has to be multiplied by the parallelism (this.estimatedOutputSize * this.getParallelism()). Finally, a List containing this single SourcePlanNode is returned.
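For instance (numbers purely illustrative), if a replicated file source is estimated at 1 GB (estimatedOutputSize) and the consuming operators run with a parallelism of 4, the file input cost is charged as if 4 GB were read, because each of the 4 parallel instances reads the complete input; a non-replicated source of the same size is charged for 1 GB only.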

Creating a SingleInputPlanNode

If an OptimizerNode is a SingleInputNode, whose concrete implementations include MapNode, FilterNode, FlatMapNode, ReduceNode, GroupCombineNode, GroupReduceNode, MapPartitionNode, PartitionNode, and SortPartitionNode, then a SingleInputPlanNode (which is a PlanNode) is created to represent it. Because every SingleInputNode's Operator may have broadcast DataSet inputs, the broadcast DataSets are handled first, as follows:

// calculate alternative sub-plans for broadcast inputs
final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>();
List<DagConnection> broadcastConnections = getBroadcastConnections();
List<String> broadcastConnectionNames = getBroadcastConnectionNames();

for (int i = 0; i < broadcastConnections.size(); i++) {
    DagConnection broadcastConnection = broadcastConnections.get(i);
    String broadcastConnectionName = broadcastConnectionNames.get(i);
    List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator);

    // wrap the plan candidates in named channels
    HashSet<NamedChannel> broadcastChannels = new HashSet<NamedChannel>(broadcastPlanCandidates.size());
    for (PlanNode plan : broadcastPlanCandidates) {
        NamedChannel c = new NamedChannel(broadcastConnectionName, plan);
        DataExchangeMode exMode = DataExchangeMode.select(broadcastConnection.getDataExchangeMode(),
                ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline());
        c.setShipStrategy(ShipStrategyType.BROADCAST, exMode);
        broadcastChannels.add(c);
    }
    broadcastPlanChannels.add(broadcastChannels);
}

The core data structure in the code above is NamedChannel, a Channel that carries a name, which is exactly what ties the broadcast DataSets registered in the user program to their string names. In a NamedChannel, the name is the broadcast DataSet's string name and the value is a PlanNode candidate for that broadcast DataSet; the channel also carries the other channel properties, such as the data shipping strategy ShipStrategyType (always BROADCAST here).
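To see where these names come from at the user level, here is a small DataSet API sketch (user code, not optimizer code; 'env' and 'lines' are assumed to be an existing ExecutionEnvironment and DataSet<String>). The broadcast DataSet is registered with withBroadcastSet() under a name and looked up under the same name inside the rich function:

// register a DataSet as a broadcast input under the name "filterWords"
DataSet<String> filterWords = env.fromElements("foo", "bar");
DataSet<String> result = lines
        .filter(new RichFilterFunction<String>() {
            private List<String> words;

            @Override
            public void open(Configuration parameters) throws Exception {
                // the same name is used to look the broadcast DataSet up at runtime
                words = getRuntimeContext().getBroadcastVariable("filterWords");
            }

            @Override
            public boolean filter(String value) {
                return !words.contains(value);
            }
        })
        .withBroadcastSet(filterWords, "filterWords");

Back in the optimizer: with the broadcast channels prepared, the List of candidate PlanNodes returned by the recursive getAlternativePlans() call on the regular input is processed next, and candidates for the current node are built from each of them: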

// create all candidates
for (PlanNode child : subPlans) {

    if (child.getGlobalProperties().isFullyReplicated()) {
        // fully replicated input is always locally forwarded if the parallelism is not changed
        if (parallelismChange) {
            // can not continue with this child
            childrenSkippedDueToReplicatedInput = true;
            continue;
        } else {
            this.inConn.setShipStrategy(ShipStrategyType.FORWARD);
        }
    }

    if (this.inConn.getShipStrategy() == null) {
        // pick the strategy ourselves
        for (RequestedGlobalProperties igps : intGlobal) {
            final Channel c = new Channel(child, this.inConn.getMaterializationMode());
            igps.parameterizeChannel(c, parallelismChange, executionMode, breaksPipeline);

            // if the parallelism changed, make sure that we cancel out properties, unless the
            // ship strategy preserves/establishes them even under changing parallelisms
            if (parallelismChange && !c.getShipStrategy().isNetworkStrategy()) {
                c.getGlobalProperties().reset();
            }

            // check whether we meet any of the accepted properties
            // we may remove this check, when we do a check to not inherit
            // requested global properties that are incompatible with all possible
            // requested properties
            for (RequestedGlobalProperties rgps : allValidGlobals) {
                if (rgps.isMetBy(c.getGlobalProperties())) {
                    c.setRequiredGlobalProps(rgps);
                    addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
                    break;
                }
            }
        }
    } else {
        // hint fixed the strategy
        final Channel c = new Channel(child, this.inConn.getMaterializationMode());
        final ShipStrategyType shipStrategy = this.inConn.getShipStrategy();
        final DataExchangeMode exMode = DataExchangeMode.select(executionMode, shipStrategy, breaksPipeline);

        if (this.keys != null) {
            c.setShipStrategy(shipStrategy, this.keys.toFieldList(), exMode);
        } else {
            c.setShipStrategy(shipStrategy, exMode);
        }

        if (parallelismChange) {
            c.adjustGlobalPropertiesForFullParallelismChange();
        }

        // check whether we meet any of the accepted properties
        for (RequestedGlobalProperties rgps : allValidGlobals) {
            if (rgps.isMetBy(c.getGlobalProperties())) {
                addLocalCandidates(c, broadcastPlanChannels, rgps, outputPlans, estimator);
                break;
            }
        }
    }
}

If the input is fully replicated (a ReplicatedInput), the current PlanNode is one of several parallel downstream Operators of that replicated input. In that case the candidate only works if the parallelism does not change between the replicated input and the consumer: if the parallelism does change, the child candidate is skipped; otherwise the local ShipStrategyType.FORWARD strategy is used directly. If no ShipStrategyType has been set on the SingleInputNode's input DagConnection, the optimizer picks one itself based on the RequestedGlobalProperties the connection holds (if a ShipStrategyType has been fixed by a hint, the handling is largely the same and is not repeated here):

final Set<RequestedGlobalProperties> intGlobal = this.inConn.getInterestingProperties().getGlobalProperties();

A Channel object is created for each candidate as well; the call igps.parameterizeChannel(c, parallelismChange, executionMode, breaksPipeline) above sets the Channel's properties so that the Channel ends up with the GlobalProperties it needs. The main cases are:

(1) If the Channel's source is a ReplicatedInput, the partitioning property must be PartitioningProperty.FULL_REPLICATION or PartitioningProperty.ANY_DISTRIBUTION.

(2) If the PartitioningProperty is PartitioningProperty.RANDOM_PARTITIONED (or no PartitioningProperty has been set), or it is PartitioningProperty.ANY_DISTRIBUTION, the ShipStrategyType depends on whether the parallelism changes between the upstream (sending) and downstream (receiving) side: if the parallelism is unchanged it is ShipStrategyType.FORWARD, otherwise ShipStrategyType.PARTITION_RANDOM.

(3) If the parallelism is unchanged and the channel's GlobalProperties already satisfy the requested properties, ShipStrategyType.FORWARD is used.

(4) In all other cases, the ShipStrategyType is derived from the PartitioningProperty value, using the following mapping (summarized in the sketch after this list):

PartitioningProperty.FULL_REPLICATION → ShipStrategyType.BROADCAST

PartitioningProperty.ANY_PARTITIONING || PartitioningProperty.HASH_PARTITIONED → ShipStrategyType.PARTITION_HASH

PartitioningProperty.RANGE_PARTITIONED → ShipStrategyType.PARTITION_RANGE

PartitioningProperty.FORCED_REBALANCED → ShipStrategyType.PARTITION_FORCED_REBALANCE

PartitioningProperty.CUSTOM_PARTITIONING → ShipStrategyType.PARTITION_CUSTOM
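The same mapping written out as code, as a standalone sketch that mirrors the list above. It is not the body of parameterizeChannel(), which additionally handles partition keys, ordering, and custom partitioners; partitioningProperty and parallelismChange are illustrative local variables:

// sketch: derive the ship strategy from the requested partitioning property
switch (partitioningProperty) {
    case FULL_REPLICATION:
        return ShipStrategyType.BROADCAST;
    case ANY_PARTITIONING:
    case HASH_PARTITIONED:
        return ShipStrategyType.PARTITION_HASH;
    case RANGE_PARTITIONED:
        return ShipStrategyType.PARTITION_RANGE;
    case FORCED_REBALANCED:
        return ShipStrategyType.PARTITION_FORCED_REBALANCE;
    case CUSTOM_PARTITIONING:
        return ShipStrategyType.PARTITION_CUSTOM;
    default:
        // RANDOM_PARTITIONED / ANY_DISTRIBUTION: FORWARD or PARTITION_RANDOM,
        // depending on whether the parallelism changes (cases (2) and (3) above)
        return parallelismChange ? ShipStrategyType.PARTITION_RANDOM : ShipStrategyType.FORWARD;
}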

With the Channel configured as described above, the last step is to check that the GlobalProperties set on the Channel are compatible with the RequestedGlobalProperties, as in the following fragment:

for (RequestedGlobalProperties rgps : allValidGlobals) {
    if (rgps.isMetBy(c.getGlobalProperties())) {
        c.setRequiredGlobalProps(rgps);
        addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
        break;
    }
}

If rgps is compatible with c.getGlobalProperties(), the addLocalCandidates() method above goes on to handle the RequestedLocalProperties. For this, a new Channel (derived from the Channel built above) is created and its LocalProperties are configured. This mainly means setting the LocalStrategy enum, which has three values: NONE (no local strategy), SORT (the input is sorted), and COMBININGSORT (the input is sorted, and a Combiner is applied during the sort).
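A minimal sketch of what addLocalCandidates() conceptually does is shown below. It is simplified and not the actual Flink method; names follow the snippets quoted in this article where possible, and the exact signatures should be treated as illustrative:

// for every requested local property, derive a channel variant from the one configured
// above and let the requested local properties set the local strategy on it
for (RequestedLocalProperties rlp : this.inConn.getInterestingProperties().getLocalProperties()) {
    Channel localChannel = new Channel(c.getSource(), this.inConn.getMaterializationMode());
    localChannel.setShipStrategy(c.getShipStrategy(), exMode);   // same shipping setup as 'c'
    rlp.parameterizeChannel(localChannel);                       // configures the LocalStrategy (e.g. NONE or SORT)
    // ... a SingleInputPlanNode candidate is then built from 'localChannel'
    //     and added to outputPlans ...
}

Once addLocalCandidates() has filled outputPlans with candidate PlanNodes, each candidate's Operator cost is computed and the set of alternatives is pruned: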

// cost and prune the plans
for (PlanNode node : outputPlans) {
    estimator.costOperator(node);
}
prunePlanAlternatives(outputPlans);
outputPlans.trimToSize();

this.cachedPlans = outputPlans;
return outputPlans;

We do not go into the cost calculation and pruning here; they will be analyzed separately later. At this point, for the current SingleInputNode, we have obtained the list of candidate execution plans, outputPlans.

Creating the SinkPlanNode

When the whole OptimizerNode DAG is traversed, the last OptimizerNode to finish is the DataSinkNode. It also implements getAlternativePlans(), which ultimately creates the SinkPlanNode, as the following code shows:

// calculate alternative sub-plans for predecessor
List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
List<PlanNode> outputPlans = new ArrayList<PlanNode>();

// ...

InterestingProperties ips = this.input.getInterestingProperties();
for (PlanNode p : subPlans) {
    for (RequestedGlobalProperties gp : ips.getGlobalProperties()) {
        for (RequestedLocalProperties lp : ips.getLocalProperties()) {
            Channel c = new Channel(p);
            gp.parameterizeChannel(c, dopChange, executionMode, breakPipeline);
            lp.parameterizeChannel(c);
            c.setRequiredLocalProps(lp);
            c.setRequiredGlobalProps(gp);
            outputPlans.add(new SinkPlanNode(this, "DataSink (" + this.getOperator().getName() + ")", c));
        }
    }
}

// cost and prune the plans
for (PlanNode node : outputPlans) {
    estimator.costOperator(node);
}
prunePlanAlternatives(outputPlans);

As the code shows, here too the cost of each PlanNode's Operator in outputPlans is computed and the generated plans are pruned, which yields the final execution plan list outputPlans.

Creating a DualInputPlanNode

Handling a TwoInputNode is broadly similar to handling a SingleInputNode, except that a TwoInputNode has two DagConnections to its two input OptimizerNodes, so the processing is somewhat more involved; it likewise ends with cost calculation and pruning, so we do not repeat the details here. (This article is by Yanjun Shi; original link: http://shiyanjun.cn)
