百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT知识 > 正文

Spark源码阅读:DataFrame.collect 作业提交流程思维导图

liuian 2025-04-09 17:52 23 浏览

本文分为两个部分:

  1. 作业提交流程思维导图
  2. 关键函数列表

作业提交流程思维导图

collect后Job的提交流程

点击「链接」查看DataFrame.collect触发的作业提交流程思维导图。

关键函数列表

Dataset.collect

def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)

Dataset.withAction

Dataset.collectFromPlan

触发物理计划的执行,其中 plan 的类型是 SparkPlan

private def collectFromPlan(plan: SparkPlan): Array[T] = {
  val fromRow = resolvedEnc.createDeserializer()
  plan.executeCollect().map(fromRow)
}

Spark 有很多action 函数,比如:

  • collect
  • count
  • show

最终都是通过 collectFromPlan 去创建 Job

SparkPlan.executeCollect


executeCollect

这个函数分为三部:

  • getByteArrayRdd函数 将UnsafeRow RDD 转化为 byte array RDD,加速序列化
  • 然后调用了 RDD.collect
  • 解析 collect 结果,并返回

RDD.collect

Resilient Distributed Dataset (RDD), 是一种不可变、支持分区的数据集合。由于支持分区,该数据集支持并行访问。

class RDD是一个基类,它有很多子类:

  • ShuffledRDD:存储shuffle结果数据,parent RDD 是 Java key-value 对
  • ShuffledRowRDD:存储shuffle结果数据,parent RDD 是 InternalRow,SparkSQL使用
  • MapPartitionsRDD:算子会被应用到 parent RDD 的所有分区
  • UnionRDD:存储 union 的结果数据
  • 其他 RDD 子类

collect 方法的主要职能是提交 Spark 作业,该功能代理给了 SparkContext 去支持:

SparkContext.runJob

runJob 方法有很多重载,我们只关心最复杂的一个:

从功能上来说,它实现了

  • 准备 callSite,以便出问题知道是哪一行代码出错了
  • 通过 DAGScheduler.runJob提交作业
  • progressBar: 命令行里 stage的进度条显示
  • doCheckpoint 将 RDD的中间和最后结果缓存下来

从代码上来说,方法声明如下:

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit 

它有两个泛型类型参数:

  • T: ClassTag 输入RDD的类型
  • U: ClassTag 输出数据的类型

参数列表:

  • rdd: RDD[T] 指输入RDD类型,比如 RDD[(Long, Array[Byte])]
  • func: (TaskContext, Iterator[T]) => U。func会被作用到 rdd的每个分区,返回U
  • partitions: Seq[Int]。分区下标列表
  • resultHandler: (Int, U)。这是一个回调函数。处理func执行完返回的数据,第一个参数是分区index,第二个是func的返回值

返回值: Unit 表示没有任何返回值

DAGScheduler.runJob

对于 DAGScheduler 而言,Stage是最小的调度单元。它会

  • 给Job生成以Stage为调度单位的DAG图
  • 追踪RDD和Stage的输出状态,比如哪些已经被物化,并基于这些信息提供一个最优的调度方案
  • 提交Stage,以TaskSet的形式提交给 TasksetManager

DAGScheduler 对Job的调度是围绕
DAGSchedulerEventProcessLoop 展开的。这是一个经典的EventLoop使用场景。runJob 方法的执行流程如下:

  1. 提交任务本质上是向 EventLoop 发送一个 JobSubmitted 事件
  2. 通过一个JobWaiter对象等待结果

在 EventLoop 的另一端,onReceive 接收到 JobSubmitted事件,交给成员函数 handleJobSubmitted 处理该事件。

JobWaiter 内部有一个 Promise 对象,它会不停接收到 taskSucceeded,增加计数,知道成功task的数量等于task的总数量,将promise置为成功。

DAGSchedulerEventProcessLoop.onReceive

onReceive 负责接收各类事件,并分发给特定的 handler 函数处理,具体可以看思维导图或spark代码。

这里我们只看 handleJobSubmitted,它做了五件事情:

  • 创建Stage:递归式地创建,先创建parent stage
  • 注册Stage
  • 创建Job
  • 注册Job
  • 提交Stage

由于 stage 是一个有向无环图,所以创建和执行都遵循 topological order。

DAGScheduler.createResultStage

在 SparkPlan 对象调用 execute 时,会递归地生成 RDD,从而构成了 RDD Lineage Graph,它是一个有向无环图。那么在 RDD Lineage 上如何切分 stage 呢?

RDD依赖分为宽依赖和窄依赖,代码体现为两个类ShuffleDependency和NarrowDependency。在构建 RDD Lineage时,相邻的两个RDD必须有其中一种依赖关系。Spark通过这种依赖关系划分 Stage。根节点的RDD必须分配到 ResultStage里,而之前所有的Stage,不管有多少级依赖,都是 ShuffleMapStage。

DAGScheduler.getShuffleDependenciesAndResourceProfiles

方法中,通过一个栈来记录分配到当前stage中的 RDD(窄依赖中的rdd都会被push到栈里),碰到宽依赖,则加到 shuffleDeps 中。

getShuffleDependenciesAndResourceProfiles

相关推荐

深入解析 MySQL 8.0 JSON 相关函数:解锁数据存储的无限可能

引言在现代应用程序中,数据的存储和处理变得愈发复杂多样。MySQL8.0引入了丰富的JSON相关函数,为我们提供了更灵活的数据存储和检索方式。本文将深入探讨MySQL8.0中的JSON...

MySQL的Json类型个人用法详解(mysql json类型对应java什么类型)

前言虽然MySQL很早就添加了Json类型,但是在业务开发过程中还是很少设计带这种类型的表。少不代表没有,当真正要对Json类型进行特定查询,修改,插入和优化等操作时,却感觉一下子想不起那些函数怎么使...

MySQL的json查询之json_array(mysql json_search)

json_array顾名思义就是创建一个数组,实际的用法,我目前没有想到很好的使用场景。使用官方的例子说明一下吧。例一selectjson_array(1,2,3,4);json_array虽然单独...

头条创作挑战赛#一、LSTM 原理 长短期记忆网络

#头条创作挑战赛#一、LSTM原理长短期记忆网络(LongShort-TermMemory,LSTM)是一种特殊类型的循环神经网络(RNN),旨在解决传统RNN在处理长序列数据时面临的梯度...

TensorBoard最全使用教程:看这篇就够了

机器学习通常涉及在训练期间可视化和度量模型的性能。有许多工具可用于此任务。在本文中,我们将重点介绍TensorFlow的开源工具套件,称为TensorBoard,虽然他是TensorFlow...

图神经网络版本的Kolmogorov Arnold(KAN)代码实现和效果对比

本文约4600字,建议阅读10分钟本文介绍了图神经网络版本的对比。KolmogorovArnoldNetworks(KAN)最近作为MLP的替代而流行起来,KANs使用Kolmogorov-Ar...

kornia,一个实用的 Python 库!(python kkb_tools)

大家好,今天为大家分享一个实用的Python库-kornia。Github地址:https://github.com/kornia/kornia/Kornia是一个基于PyTorch的开源计算...

图像分割掩码标注转YOLO多边形标注

Ultralytics团队付出了巨大的努力,使创建自定义YOLO模型变得非常容易。但是,处理大型数据集仍然很痛苦。训练yolo分割模型需要数据集具有其特定格式,这可能与你从大型数据集中获得的...

[python] 向量检索库Faiss使用指北

Faiss是一个由facebook开发以用于高效相似性搜索和密集向量聚类的库。它能够在任意大小的向量集中进行搜索。它还包含用于评估和参数调整的支持代码。Faiss是用C++编写的,带有Python的完...

如何把未量化的 70B 大模型加载到笔记本电脑上运行?

并行运行70B大模型我们已经看到,量化已经成为在低端GPU(比如Colab、Kaggle等)上加载大型语言模型(LLMs)的最常见方法了,但这会降低准确性并增加幻觉现象。那如果你和你的朋友们...

ncnn+PPYOLOv2首次结合!全网最详细代码解读来了

编辑:好困LRS【新智元导读】今天给大家安利一个宝藏仓库miemiedetection,该仓库集合了PPYOLO、PPYOLOv2、PPYOLOE三个算法pytorch实现三合一,其中的PPYOL...

人工智能——图像识别(人工智能图像识别流程)

概述图像识别(ImageRecognition)是计算机视觉的核心任务之一,旨在通过算法让计算机理解图像内容,包括分类(识别物体类别)、检测(定位并识别多个物体)、分割(像素级识别)等,常见的应用场...

PyTorch 深度学习实战(15):Twin Delayed DDPG (TD3) 算法

在上一篇文章中,我们介绍了DeepDeterministicPolicyGradient(DDPG)算法,并使用它解决了Pendulum问题。本文将深入探讨TwinDelayed...

大模型中常用的注意力机制GQA详解以及Pytorch代码实现

分组查询注意力(GroupedQueryAttention)是一种在大型语言模型中的多查询注意力(MQA)和多头注意力(MHA)之间进行插值的方法,它的目标是在保持MQA速度的同时...

pytorch如何快速创建具有特殊意思的tensor张量?

专栏推荐正文我们通过值可以看到torch.empty并没有进行初始化创建tensor并进行随机初始化操作,常用rand/rand_like,randint正态分布(0,1)指定正态分布的均值还有方差i...