百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Spark源码阅读:DataFrame.collect 作业提交流程思维导图

liuian 2025-04-09 17:52 45 浏览

本文分为两个部分:

  1. 作业提交流程思维导图
  2. 关键函数列表

作业提交流程思维导图

collect后Job的提交流程

点击「链接」查看DataFrame.collect触发的作业提交流程思维导图。

关键函数列表

Dataset.collect

def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)

Dataset.withAction

Dataset.collectFromPlan

触发物理计划的执行,其中 plan 的类型是 SparkPlan

private def collectFromPlan(plan: SparkPlan): Array[T] = {
  val fromRow = resolvedEnc.createDeserializer()
  plan.executeCollect().map(fromRow)
}

Spark 有很多action 函数,比如:

  • collect
  • count
  • show

最终都是通过 collectFromPlan 去创建 Job

SparkPlan.executeCollect


executeCollect

这个函数分为三部:

  • getByteArrayRdd函数 将UnsafeRow RDD 转化为 byte array RDD,加速序列化
  • 然后调用了 RDD.collect
  • 解析 collect 结果,并返回

RDD.collect

Resilient Distributed Dataset (RDD), 是一种不可变、支持分区的数据集合。由于支持分区,该数据集支持并行访问。

class RDD是一个基类,它有很多子类:

  • ShuffledRDD:存储shuffle结果数据,parent RDD 是 Java key-value 对
  • ShuffledRowRDD:存储shuffle结果数据,parent RDD 是 InternalRow,SparkSQL使用
  • MapPartitionsRDD:算子会被应用到 parent RDD 的所有分区
  • UnionRDD:存储 union 的结果数据
  • 其他 RDD 子类

collect 方法的主要职能是提交 Spark 作业,该功能代理给了 SparkContext 去支持:

SparkContext.runJob

runJob 方法有很多重载,我们只关心最复杂的一个:

从功能上来说,它实现了

  • 准备 callSite,以便出问题知道是哪一行代码出错了
  • 通过 DAGScheduler.runJob提交作业
  • progressBar: 命令行里 stage的进度条显示
  • doCheckpoint 将 RDD的中间和最后结果缓存下来

从代码上来说,方法声明如下:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit 

它有两个泛型类型参数:

  • T: ClassTag 输入RDD的类型
  • U: ClassTag 输出数据的类型

参数列表:

  • rdd: RDD[T] 指输入RDD类型,比如 RDD[(Long, Array[Byte])]
  • func: (TaskContext, Iterator[T]) => U。func会被作用到 rdd的每个分区,返回U
  • partitions: Seq[Int]。分区下标列表
  • resultHandler: (Int, U)。这是一个回调函数。处理func执行完返回的数据,第一个参数是分区index,第二个是func的返回值

返回值: Unit 表示没有任何返回值

DAGScheduler.runJob

对于 DAGScheduler 而言,Stage是最小的调度单元。它会

  • 给Job生成以Stage为调度单位的DAG图
  • 追踪RDD和Stage的输出状态,比如哪些已经被物化,并基于这些信息提供一个最优的调度方案
  • 提交Stage,以TaskSet的形式提交给 TasksetManager

DAGScheduler 对Job的调度是围绕
DAGSchedulerEventProcessLoop 展开的。这是一个经典的EventLoop使用场景。runJob 方法的执行流程如下:

  1. 提交任务本质上是向 EventLoop 发送一个 JobSubmitted 事件
  2. 通过一个JobWaiter对象等待结果

在 EventLoop 的另一端,onReceive 接收到 JobSubmitted事件,交给成员函数 handleJobSubmitted 处理该事件。

JobWaiter 内部有一个 Promise 对象,它会不停接收到 taskSucceeded,增加计数,知道成功task的数量等于task的总数量,将promise置为成功。

DAGSchedulerEventProcessLoop.onReceive

onReceive 负责接收各类事件,并分发给特定的 handler 函数处理,具体可以看思维导图或spark代码。

这里我们只看 handleJobSubmitted,它做了五件事情:

  • 创建Stage:递归式地创建,先创建parent stage
  • 注册Stage
  • 创建Job
  • 注册Job
  • 提交Stage

由于 stage 是一个有向无环图,所以创建和执行都遵循 topological order。

DAGScheduler.createResultStage

在 SparkPlan 对象调用 execute 时,会递归地生成 RDD,从而构成了 RDD Lineage Graph,它是一个有向无环图。那么在 RDD Lineage 上如何切分 stage 呢?

RDD依赖分为宽依赖和窄依赖,代码体现为两个类ShuffleDependency和NarrowDependency。在构建 RDD Lineage时,相邻的两个RDD必须有其中一种依赖关系。Spark通过这种依赖关系划分 Stage。根节点的RDD必须分配到 ResultStage里,而之前所有的Stage,不管有多少级依赖,都是 ShuffleMapStage。

DAGScheduler.getShuffleDependenciesAndResourceProfiles

方法中,通过一个栈来记录分配到当前stage中的 RDD(窄依赖中的rdd都会被push到栈里),碰到宽依赖,则加到 shuffleDeps 中。

getShuffleDependenciesAndResourceProfiles

相关推荐

注册qq号免费立即申请不用手机号

1、先在网页上搜索安装谷歌浏览器2、安装完成后,不要在电脑上登录任何一个QQ,打开QQ登录的界面,点击注册账号。3、在注册账号的页面,填上相应要求的信息,手机号码的部分先不要填。4、点击谷歌浏览器右上...

office2016破解版安装教程(office2016下载破解版)

microsoftoffice2016激活与破解的区别是版本不同。①尽量使用官方的原版程序、原版软件,避免使用来路不明的"XX"版。②若程序对非正版授权用户有功能或使用期限制,但仍能满...

tenda登录密码(tenda登录入口密码)

腾达路由器登录的密码和用户名为ADMIN。老版的兴化的都使用的是这一个用户名和密码新出的版本,登录的地址和用户名密码都在路由器背面,标签上的是随机产生的,没有规律,只需查看按照地址输入用户名和密码,就...

android系统更新(android系统更新opengl版本)

1.1、打开手机页面,点击进入oppo的官网;2、进入页面后,点击下载系统包的按钮,系统自动升级。2.1、持手机卡去oppo手机专卖店;2、刷卡在专卖店里直接升级。3.1、打开电脑,开机进入页面;2、...

tplink路由器登录名和密码(tp link无线路由器用户名和密码)

1、tp-link无线路由器,上网账号就是宽带账号,口令就是宽带密码,设置方法如下:一、接线方法,外网进线接入路由器wan口,路由器lan口接线到电脑网线接口。二、路由器设置,打开浏览器http://...

win10硬盘格式mbr还是guid(won10硬盘格式)

作为人类的我回答你的问题。在选择WIN10分区类型时,我建议使用GUID分区表(GPT)。原因如下:1.GPT支持更大容量的硬盘,可以处理大型数据存储需求,而MBR分区表限制了最大可用空间为2TB。...

台式电脑网线怎么插(台式电脑网线插在猫上还是路由器上)
台式电脑网线怎么插(台式电脑网线插在猫上还是路由器上)

1、如果你家里没有用路由器,那么电脑主机上的网线,需要插在猫的网口/LAN口。温馨提示:没有用路由器的情况下,电脑要上网的话,你需要打开电脑中的“宽带连接”程序,然后填写你家的宽带账号、宽带密码,就能连接上网了。如果你不知道如何用“宽带连...

2025-11-08 03:05 liuian

电脑开机进入不了系统怎么办

电脑开机正常但无法进入系统,一般是系统故障或硬件故障。硬件故障:通常是电压不稳定导致,安装稳压器能解决。或者是主机机箱内灰尘过多,导致容易产生静电,清理机箱灰尘,重新拔插内存条可解决。把电脑关机之后重...

电脑不能正常关机(电脑强制关机后无法正常启动)

1解决电脑无法关机的方法2电脑无法关机可能是由于软件冲突、系统故障或者硬件问题等原因造成的。可以尝试以下几种解决方法:a)强制关机:按住电脑主机上的电源按钮直到电脑完全关闭,但这种方法可能会...

qq对战平台下载官网(qq对战平台安卓版)

1.在左边游戏分类上选择你想要玩的游戏,双击游戏名称(cs,魔兽,星际);2.右边房间列表出现不同游戏版本的房间,请对应你安装的游戏的版本选择房间,双击进入;3.点击“设置”按钮,弹出QQ对战平台...

分区助手专业版下载(分区助手6.0中文版)

区别主要有以下几点:1.功能差异:傲梅分区助手绿色版相对于专业版功能较少,仅提供基本的分区操作,如创建、删除、合并、移动、调整分区大小等,而专业版则提供更多的高级功能,如转换磁盘类型、拷贝分区、修复...

驱动程序在哪里找(驱动程序在哪里找出来)

驱动程序在电脑中可以这样查找:1.打开设备管理器:在Windows系统中,你可以通过“控制面板”>“设备管理器”来打开设备管理器。2.查找驱动程序:在设备管理器中,你可以看到你的电脑中安装的...

用光盘怎么重装系统(用光盘怎么重装系统win7)

惠普笔记本有系统光盘重装系统的具体步骤如下:1、当我们用光盘来进行系统重装的时候,我们需要准备好微软系统的系统盘。2、首先我们打开电脑机箱上的光驱,直接放入光碟,此时电脑会自动重启进入读取系统光盘操作...

质量管理体系有哪些(永辉质量管理体系有哪些)

   常见4种。见下:  质量管理体系常用的包括ISO9000质量管理体系、精益生产管理体系、六西格玛质量管理体系、资质体系等。ISO9000...

联想电脑如何截屏截图(联想电脑上怎样截图)

用lenovo电脑如果想截屏,我们可以采用了以下几个方法。一个方法就是用笔记本电脑截屏的快捷键来进行截屏。我们在浏览网页的时候,如果想把网页截屏下来,可以用笔记本电脑的Prtsc键。这个键就是截屏的...