百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Spark源码阅读:DataFrame.collect 作业提交流程思维导图

liuian 2025-04-09 17:52 48 浏览

本文分为两个部分:

  1. 作业提交流程思维导图
  2. 关键函数列表

作业提交流程思维导图

collect后Job的提交流程

点击「链接」查看DataFrame.collect触发的作业提交流程思维导图。

关键函数列表

Dataset.collect

def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)

Dataset.withAction

Dataset.collectFromPlan

触发物理计划的执行,其中 plan 的类型是 SparkPlan

private def collectFromPlan(plan: SparkPlan): Array[T] = {
  val fromRow = resolvedEnc.createDeserializer()
  plan.executeCollect().map(fromRow)
}

Spark 有很多action 函数,比如:

  • collect
  • count
  • show

最终都是通过 collectFromPlan 去创建 Job

SparkPlan.executeCollect


executeCollect

这个函数分为三部:

  • getByteArrayRdd函数 将UnsafeRow RDD 转化为 byte array RDD,加速序列化
  • 然后调用了 RDD.collect
  • 解析 collect 结果,并返回

RDD.collect

Resilient Distributed Dataset (RDD), 是一种不可变、支持分区的数据集合。由于支持分区,该数据集支持并行访问。

class RDD是一个基类,它有很多子类:

  • ShuffledRDD:存储shuffle结果数据,parent RDD 是 Java key-value 对
  • ShuffledRowRDD:存储shuffle结果数据,parent RDD 是 InternalRow,SparkSQL使用
  • MapPartitionsRDD:算子会被应用到 parent RDD 的所有分区
  • UnionRDD:存储 union 的结果数据
  • 其他 RDD 子类

collect 方法的主要职能是提交 Spark 作业,该功能代理给了 SparkContext 去支持:

SparkContext.runJob

runJob 方法有很多重载,我们只关心最复杂的一个:

从功能上来说,它实现了

  • 准备 callSite,以便出问题知道是哪一行代码出错了
  • 通过 DAGScheduler.runJob提交作业
  • progressBar: 命令行里 stage的进度条显示
  • doCheckpoint 将 RDD的中间和最后结果缓存下来

从代码上来说,方法声明如下:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit 

它有两个泛型类型参数:

  • T: ClassTag 输入RDD的类型
  • U: ClassTag 输出数据的类型

参数列表:

  • rdd: RDD[T] 指输入RDD类型,比如 RDD[(Long, Array[Byte])]
  • func: (TaskContext, Iterator[T]) => U。func会被作用到 rdd的每个分区,返回U
  • partitions: Seq[Int]。分区下标列表
  • resultHandler: (Int, U)。这是一个回调函数。处理func执行完返回的数据,第一个参数是分区index,第二个是func的返回值

返回值: Unit 表示没有任何返回值

DAGScheduler.runJob

对于 DAGScheduler 而言,Stage是最小的调度单元。它会

  • 给Job生成以Stage为调度单位的DAG图
  • 追踪RDD和Stage的输出状态,比如哪些已经被物化,并基于这些信息提供一个最优的调度方案
  • 提交Stage,以TaskSet的形式提交给 TasksetManager

DAGScheduler 对Job的调度是围绕
DAGSchedulerEventProcessLoop 展开的。这是一个经典的EventLoop使用场景。runJob 方法的执行流程如下:

  1. 提交任务本质上是向 EventLoop 发送一个 JobSubmitted 事件
  2. 通过一个JobWaiter对象等待结果

在 EventLoop 的另一端,onReceive 接收到 JobSubmitted事件,交给成员函数 handleJobSubmitted 处理该事件。

JobWaiter 内部有一个 Promise 对象,它会不停接收到 taskSucceeded,增加计数,知道成功task的数量等于task的总数量,将promise置为成功。

DAGSchedulerEventProcessLoop.onReceive

onReceive 负责接收各类事件,并分发给特定的 handler 函数处理,具体可以看思维导图或spark代码。

这里我们只看 handleJobSubmitted,它做了五件事情:

  • 创建Stage:递归式地创建,先创建parent stage
  • 注册Stage
  • 创建Job
  • 注册Job
  • 提交Stage

由于 stage 是一个有向无环图,所以创建和执行都遵循 topological order。

DAGScheduler.createResultStage

在 SparkPlan 对象调用 execute 时,会递归地生成 RDD,从而构成了 RDD Lineage Graph,它是一个有向无环图。那么在 RDD Lineage 上如何切分 stage 呢?

RDD依赖分为宽依赖和窄依赖,代码体现为两个类ShuffleDependency和NarrowDependency。在构建 RDD Lineage时,相邻的两个RDD必须有其中一种依赖关系。Spark通过这种依赖关系划分 Stage。根节点的RDD必须分配到 ResultStage里,而之前所有的Stage,不管有多少级依赖,都是 ShuffleMapStage。

DAGScheduler.getShuffleDependenciesAndResourceProfiles

方法中,通过一个栈来记录分配到当前stage中的 RDD(窄依赖中的rdd都会被push到栈里),碰到宽依赖,则加到 shuffleDeps 中。

getShuffleDependenciesAndResourceProfiles

相关推荐

电脑重装系统后没有声音怎么解决

电脑重装系统后没有声音,可能是声卡驱动未安装、声卡驱动不兼容或者声音相关服务未开启等原因。解决方法可以尝试重新安装声卡驱动、更新驱动程序软件或者打开声音相关设置。如果问题仍然存在,建议寻求专业人士的...

word2007安装产品密钥(安装office2010产品密钥)

可以通过以下方式获取Word文档的产品密钥:购买正版Word软件,从官方渠道获得产品密钥。下载并安装MicrosoftOffice,从安装过程中获取产品密钥。请注意,任何未经授权的方式获取...

绝地求生电脑配置要求(绝地求生电脑配置要求2024)
绝地求生电脑配置要求(绝地求生电脑配置要求2024)

绝地求生的最低配置国服官方给出的最低配置是内存6G,CPUInteli3-4340/AMDFX-630,显卡GTX660/HD7850,所有效果最低,但是说实话,这个配置玩起来,体验太差,而且没几局就需要重启客户端,而且GTX6...

2025-12-23 21:05 liuian

安卓仿苹果ios14主题桌面(安卓仿ios14主题全套免费)
  • 安卓仿苹果ios14主题桌面(安卓仿ios14主题全套免费)
  • 安卓仿苹果ios14主题桌面(安卓仿ios14主题全套免费)
  • 安卓仿苹果ios14主题桌面(安卓仿ios14主题全套免费)
  • 安卓仿苹果ios14主题桌面(安卓仿ios14主题全套免费)
亲手自己重装系统win7(自己怎么重装win7)

要一键安装重装系统Win7,您可以使用Windows7安装盘或USB驱动器。首先,将安装盘或USB插入计算机,并重启计算机。然后,在计算机启动时按下相应的按键(通常是F12或Del键)进入启动菜单。...

windows7旗舰版临时激活(win7暂时激活)

关于这个问题,目前,有几种方法可以激活Windows7旗舰版,以下是最简单的几种方法:1.使用激活工具:可以使用一些第三方激活工具,如KMSpico、MicrosoftToolkit等工具来激活...

免费查序列号入口(免费查序列号入口平板)

苹果查序列号入口可登陆苹果官网checkcoverage.apple.com进行查询,具体步骤如下:1、打开手机设置,点击“通用”;2、进入页面后点击“关于本机”;3、页面跳转后,我们就可以看到本机的...

磁盘被保护了如何取消保护(磁盘被保护了如何取消保护设置)

1、打开磁盘分区管理窗口中选择要去掉被写保护的磁盘。2、选中磁盘后单击鼠标右键可显示出选项列表下选择属性。3、打开磁盘属性对话框中选择点击硬件选项卡。4、然后在硬件页面中选中所有磁盘后再单击属性按钮。...

win7系统硬盘分区教程(win7如何对硬盘分区)

在Win7中,你可以使用磁盘管理工具来给硬盘分区。首先,打开控制面板,点击“系统和安全”,然后选择“管理工具”。在管理工具中,找到“计算机管理”,点击打开。在计算机管理窗口中,选择“磁盘管理”。在磁盘...

电脑不识别移动硬盘怎么办(笔记本电脑不识别移动硬盘怎么办)

电脑无法识别移动硬盘的原因有很多,以下是一些可能的原因和相应的解决方法:1.USB供电不足:移动硬盘功率较大,可能需要更多的电压。前置USB接口可能无法提供足够的电压。解决方法是将移动硬盘接到...

cf穿越火线烟雾头盔怎么调(cf的最新烟雾头盔怎么调)

cf新版烟雾保护头盔调置:?cf这款游戏中,更新后调整烟雾头的方法是打开NVIDIA控制面板,在NVIDIA控制面板中选择调整视频颜色设置,接着点击通过NVIDIA设置选项,然后将亮度调整到79%,对...

u盘怎样格式化最安全(u盘怎么格式化最干净)

只需将U盘插入到电脑之后,然后在我的电脑中找到U盘的盘符,使用鼠标右键点击打开菜单,其中就可以看到【格式化】的选项,根据需要选择然后点击【快速格式化】即可U盘格式是FAT32格式,那么其传输速度会明显...

移动路由器怎么改wifi密码(移动网络路由器怎么改密码wifi密码)
移动路由器怎么改wifi密码(移动网络路由器怎么改密码wifi密码)

1.打开手机设置,找到wifi点击进入,点击已连接的wifi。2.里面有一个路由器的选项,记住路由器后面一串数字。3.打开手机网页,在地址栏输入刚记住的那串数字,点击进入,选择继续访问网页版,输入管理员密码,点击确定。4.点击路由设置,点击...

2025-12-23 16:05 liuian

手机五笔输入法怎么使用(手机五笔怎么用新手教程)
手机五笔输入法怎么使用(手机五笔怎么用新手教程)

手机使用五笔输入法操作方法1、打开手机,点击应用商店,下载搜狗输入法,并安装。2、打开下载后的搜狗输入法,在打开页面勾选搜狗输入法,然后点击启用。3、在弹出页面,点击搜狗输入法。在编辑文字页面,点击如图所示按钮。4、在弹出页面,点击其他输入...

2025-12-23 15:55 liuian

天翼网关怎么设置wifi(天翼网关怎么设置wifi密码)

天翼网关设置wifi的方法步骤如下:首先打开浏览器,在地址栏中输入198.168.1.1,确定。输入用户名、密码,如果以前没改过,这个密码就在网关背面的标签上面,可以去查看一下,输入后点击“确定”。点...