RDD到底是什么?RDD的API_rdd bank-程序员宅基地

技术标签: Spark  分布式  大数据  

RDD到底是什么?RDD的API

大家好,我是W

今天给大家带来一篇关于Spark和RDD的博客,由于我也是初学者,所以没法带来那么深刻的东西,但是我希望用我的感性认知带给大家一点灵感,毕竟刚开始学习Spark的时候我对RDD概念、Spark流程是有很多困惑的,我觉得大家也可能存在这种问题。OK,接下来我将从以下几个角度来讲RDD和Spark:1、 Spark简介、对比hadoop、生态,2、 RDD概念

1、 Spark简介、对比hadoop、生态

1.1 Spark简介

Spark官网,可以看到官方对Spark的概述:

Spark Overview
Apache Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for incremental computation and stream processing.
Apache的Spark是一个用于大规模数据处理的统一分析引擎。它提供了一系列Java、Scaala、Python的高级API以及优化引擎,所以支持统一的操作。它同样的提供了一系列丰富的高阶工具,包括用于SQL查询、结构化数据处理的Spark SQL,用于机器学习的MLlib库,用于图处理的GraphX库,以及用于增量计算和流处理的Streaming库。

可以看到官网对Spark的定义就是一个大一统的框架,其中存在做结构化数据处理的组件Spark SQL,有用于机器学习的MLlib组件等等。在我实际学习的过程中可以感觉到组件间的关系就好像积木一样,需要的时候插上即可。

1.2 Spark对比Hadoop

Spark对比hadoop最大的特点就是快,在官网上第一张图就摆出来Spark比hadoop快了百倍,Spark的运算是基于内存的,而hadoop则需要通过HDFS将数据持久化到磁盘,所以显然是快的,但是快多少还是要看实际生产环境吧。

可是除了这点就没了吗?其实还有的,在《大数据基础:Spark工作原理及基础概念》中给大家罗列出来了:

特点 说明
spark 计算速度快 spark将每个任务构建成DAG进行计算,内部的计算过程通过弹性式分布式数据集RDD在内存在进行计算,相比于hadoop的mapreduce效率提升了100倍。
易于使用 spark 提供了大量的算子,开发只需调用相关api进行实现无法关注底层的实现原理。相较于以前离线任务采用mapreduce实现,实时任务采用storm实现,目前这些都可以通过spark来实现,降低来开发的成本。同时spark 通过spark SQL降低了用户的学习使用门槛,还提供了机器学习,图计算引擎等。
支持多种的资源管理模式 学习使用中可以采用local 模型进行任务的调试,在正式环境中又提供了standalone,yarn等模式,方便用户选择合适的资源管理模式进行适配。
社区支持 spark 生态圈丰富,迭代更新快,成为大数据领域必备的计算引擎。

1.3 Spark生态圈

其实刚刚介绍Spark的时候已经讲了一点了,大家请看图:

在这里插入图片描述

这是我找到比较合理的一张图,它把不同的工作内容分层,结构比较清晰。

说明
资源调度层 因为我们的任务是要提交到集群上运行的,不同的结点有不同的工作,所以需要对计算资源进行调度,而在这一层的资源调度方式就有很多:local模式、StandAlone模式、yarn模式、mesos模式等等。
计算层 计算层主要使用的是spark-core这个spark的核心库,其面向的是离线的计算,而R、Python这些就是所支持的语言。
存储层 存储层包括一系列的存储组件,最常见的比如有hadoop-HDFS、MySQL、HBASE、MongoDB、Redis等等,这些均是spark生态可以对接的存储组件,而右边的sparkSQL显然是支持这些数据源的,而下方的MLlib等等显然需要数据的支持。
数据流 在做实时计算的时候streaming可以对接flume、kafka等组件。

2、 RDD的概念(RDD到底是什么)、Spark的工作流程

这两个话题涉及了很多因素,我感觉这一篇文章还是不可能讲的很清楚,但是我会用我能做到的最朴素的语言给大家感性的讲一讲。同时,我建议大家多做几个小案例来加深认识。

2.1 RDD的概念

2.1.1 官方的定义

RDD是Spark中最重要的概念,其全称叫做Resilient Distributed Dataset (RDD),即弹性分布式数据集,是一种可容错的、可以被并行操作元素集合,是Spark中处理所有数据的一种基本抽象。

光是看这一句还是不够的,我在源码中找来注释给大家看一下,我建议大家仔细看下源码的注释

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`.
 * 一个弹性分布式数据集(RDD),是Spark里的基本抽象。
 * 它代表了可以被并行操作的不可变的分区元素集合。这个类包含了各种RDD都支持的基本操作,比如map、filter、persist等。
 * 
 * In addition,[[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such as `groupByKey` and `join`;
 * 此外,org.apache.spark.rdd.PairRDDFunctions里还包含了只有键值对(key-value)类型RDD可用的操作,比如groupByKey、join等。
 * 
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of Doubles; 
 * org.apache.spark.rdd.DoubleRDDFunctions 里包含了只有Double数据类型的RDD可用的操作。
 * 
 * and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can be saved as SequenceFiles.
 * org.apache.spark.rdd.SequenceFileRDDFunctions 里包含了可以被序列化成文件的RDD所包含的操作。
 * 
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit.
 * 所有的操作都可以通过implicit来赋予。
 * 
 * Internally, each RDD is characterized by five main properties:
 * 在RDD内部,每一个RDD都由这五个主要特征来描述:
 * 
 *  - A list of partitions
 *  - 一系列分区
 *  
 *  - A function for computing each split
 *  - 对每一个分片做计算的函数
 *  
 *  - A list of dependencies on other RDDs
 *  - 一系列对其他RDD的依赖
 *  
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - 视情况而定,一个作用于键值对RDD的分区器
 *  
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
 *  - 视情况而定, 要计算每个分片的首选位置的列表
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
 * for more details on RDD internals.
 * Spark里的所有scheduling和execution都是基于这些方法(通过赋予RDD操作的方式)来实现其自身的计算方式,当然用户可以通过重写方法自定义RDD。
 */

最后注释中还贴心的给出了RDD的提出的论文:《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》

RDD的操作分为两大类,Transformation、Action。

Transformation是对已有的RDD进行转换(记录下一步操作)然后生成新的RDD,采用的是lazy策略,不会立即计算出结果。

Action是让已有的RDD对数据执行它的操作。

表格来自:大数据之Spark简介及RDD说明

Transformation
方法(算子) 说明
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
sample(withReplacement, fraction, seed) 根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
cartesian(otherDataset) 笛卡尔积
pipe(command, [envVars])
coalesce(numPartitions)
repartition(numPartitions)
repartitionAndSortWithinPartitions(partitioner)
Action
方法(算子) 说明
reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
takeOrdered(n, [ordering])
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path)
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func进行更新。
2.1.2 我的感性认识

刚开始我对RDD也是很迷惑,它是在哪里体现了并行化计算的?但是当我真正正正做一个完整的案例时,我才对他有那么一点理解。

大家可以想一个完整的离线计算案例,比如:

我们需要计算美团上外卖的标签,那么我们会有类似以下数据集:

商品ID 用户ID 评价(String)
109283 yyyyxxx 味道还不错,就是有点贵
109283 swssim 虽然有点贵,但是分量足
109284 swssim 好难吃!

我们的目标是针对商品做标签,依据是商品出现最多的5个评价标签。

  • 1、 首先我们通过sparkContext读取数据
  • 2、 因为我们拿到的是评价String,所以做分词,这里假设分词调包成功,评价此时不再是一个长长的话,而是:评价1,评价2
  • 3、 接下来,提取出商品ID,评价
  • 4、 根据商品ID聚类,即groupByKey
  • 5、 对后面标签做操作…

请大家注意第3步,我们的程序放到集群中,而集群中显然不止一台worker,即显然不止一个executor,所以我们整个spark集群中每一个executor拿到的只是整个数据集的一部分(第一台拿0 - n-1行,第二台拿n - 2n-1行类似这样),但是我们的操作是写在一份程序里面,如何对不同机器中的数据集做统一的操作呢?

这显然就是RDD的作用,程序提交时会经过cluster manager分配资源、通过driver提交代码到executor,然后经过各种scheduler把程序进行分析,分成多个stage每一个stage代表了不需要跨机器执行的操作的集合(比如map、filter),而当出现要跨机器操作(比如collect、reduce)时,则会把数据集中到一台机器去操作。

说了那么多,RDD到底是什么呢?

解释1 : 因为每一台机器都知道哪几步本机器不需要依靠别人可以自己做(stage),所以可以先做,不需要看别人脸色,而遇到大家统一的操作时通过网络把数据合并由一台机器做。RDD就是定义这些操作的对象,RDD操作的对象就是分布在不同机器上的同一格式的数据集。

解释2 : 数据集分布在不同机器中,RDD定义了各个机器对这份数据的同一操作(先做什么再做什么)。就好像你安排你的小弟,去不同银行,插入银行卡,输入密码,取5000块钱,然后拿回来,最后给你汇总一样。

参考

总结

Spark毫无疑问是个非常优秀的框架,其中的组件就仿佛积木一般随时插拔。RDD作为Spark的最重要的概念,对Spark整个框架起着至关重要的作用。RDD的操作分为Transoformation和Action两种,其核心理念是定义一个抽象的数据操作,从而方便每个分区针对各自所管理的数据做统一的操作。今天这篇博客可能还有很多没法讲清楚的地方,接下来我会继续把Spark的其他概念、RDD涉及的相关概念更详细的给大家理清楚。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/Alian_W/article/details/109770644

智能推荐

python opencv resize函数_python opencv 等比例调整(缩放)图片分辨率大小代码 cv2.resize()...-程序员宅基地

文章浏览阅读1.3k次。# -*- coding: utf-8 -*-"""@File : 200113_等比例调整图像分辨率大小.py@Time : 2020/1/13 13:38@Author : Dontla@Email : [email protected]@Software: PyCharm"""import cv2def img_resize(image):height, width = image...._opencv小图等比例缩放

【OFDM、OOK、PPM、QAM的BER仿真】绘制不同调制方案的误码率曲线研究(Matlab代码实现)-程序员宅基地

文章浏览阅读42次。对于这些调制技术的误码率(BER)研究是非常重要的,因为它们可以帮助我们了解在不同信道条件下系统的性能表现。通过以上步骤,您可以进行OFDM、OOK、PPM和QAM的误码率仿真研究,并绘制它们的误码率曲线,以便更好地了解它们在不同信道条件下的性能特点。针对这些调制技术的BER研究是非常重要的,可以帮助我们更好地了解这些技术在不同信道条件下的性能表现,从而指导系统设计和优化。6. 分析结果:根据误码率曲线的比较,分析每种调制方案在不同信噪比条件下的性能,包括其容忍的信道条件和适用的应用场景。_ber仿真

【已解决】Vue的Element框架,日期组件(el-date-picker)的@change事件,不会触发。_el-date-picker @change不触发-程序员宅基地

文章浏览阅读2.5w次,点赞3次,收藏3次。1、场景照抄官方的实例,绑定了 myData.Age 这个值。实际选择某个日期后,从 vuetool(开发工具)看,值已经更新了,但视图未更新。2、尝试绑定另一个值: myData,可以正常的触发 @change 方法。可能是:值绑定到子对象时,组件没有侦测到。3、解决使用 @blur 代替 @change 方法。再判断下 “值有没有更新” 即可。如有更好的方法,欢迎评论!..._el-date-picker @change不触发

PCL学习:滤波—Projectlnliers投影滤波_projectinliers-程序员宅基地

文章浏览阅读1.5k次,点赞2次,收藏8次。Projectlnliersclass pcl: : Projectlnliers< PointT >类 Projectlnliers 使用一个模型和一组的内点的索引,将内点投影到模型形成新的一个独立点云。关键成员函数 void setModelType(int model) 通过用户给定的参数设置使用的模型类型 ,参数 Model 为模型类型(见 mo..._projectinliers

未处理System.BadImageFormatException”类型的未经处理的异常在 xxxxxxx.exe 中发生_“system.badimageformatexception”类型的未经处理的异常在 未知模块。 -程序员宅基地

文章浏览阅读2.4k次。“System.BadImageFormatException”类型的未经处理的异常在 xxxx.exe 中发生其他信息: 未能加载文件或程序集“xxxxxxx, Version=xxxxxx,xxxxxxx”或它的某一个依赖项。试图加载格式不正确的程序。此原因是由于 ” 目标程序的目标平台与 依赖项的目标编译平台不一致导致,把所有的项目都修改到同一目标平台下(X86、X64或AnyCPU)进行编译,一般即可解决问题“。若果以上方式不能解决,可采用如下方式:右键选择配置管理器,在这里修改平台。_“system.badimageformatexception”类型的未经处理的异常在 未知模块。 中发生

PC移植安卓---2018/04/26_电脑软件移植安卓-程序员宅基地

文章浏览阅读2.4k次。记录一下碰到的问题:1.Assetbundle加载问题: 原PC打包后的AssetBundle导入安卓工程后,加载会出问题。同时工程打包APK时,StreamingAssets中不能有中文。解决方案: (1).加入PinYinConvert类,用于将中文转换为拼音(多音字可能会出错,例如空调转换为KongDiao||阿拉伯数字不支持,如Ⅰ、Ⅱ、Ⅲ、Ⅳ(IIII)、Ⅴ、Ⅵ、Ⅶ、Ⅷ、Ⅸ、Ⅹ..._电脑软件移植安卓

随便推点

聊聊线程之run方法_start 是同步还是异步-程序员宅基地

文章浏览阅读2.4k次。话不多说参考书籍 汪文君补充知识:start是异步,run是同步,start的执行会经过JNI方法然后被任务执行调度器告知给系统内核分配时间片进行创建线程并执行,而直接调用run不经过本地方法就是普通对象执行实例方法。什么是线程?1.现在几乎百分之百的操作系统都支持多任务的执行,对计算机来说每一个人物就是一个进程(Process),在每一个进程内部至少要有一个线程实在运行中,有时线..._start 是同步还是异步

制作非缘勿扰页面特效----JQuery_单击标题“非缘勿扰”,<dd>元素中有id属性的<span>的文本(主演、导演、标签、剧情-程序员宅基地

文章浏览阅读5.3k次,点赞9次,收藏34次。我主要用了层次选择器和属性选择器可以随意选择,方便简单为主大体CSS格式 大家自行构造网页主体<body> <div class='main' > <div class='left'> <img src="images/pic.gif" /> <br/><br/> <img src="images/col.gif" alt="收藏本片"/&_单击标题“非缘勿扰”,元素中有id属性的的文本(主演、导演、标签、剧情

有了这6款浏览器插件,浏览器居然“活了”?!媳妇儿直呼“大开眼界”_浏览器插件助手-程序员宅基地

文章浏览阅读901次,点赞20次,收藏23次。浏览器是每台电脑的必装软件,去浏览器搜索资源和信息已经成为我们的日常,我媳妇儿原本也以为浏览器就是上网冲浪而已,哪有那么强大,但经过我的演示之后她惊呆了,直接给我竖起大拇指道:“原来浏览器还能这么用?大开眼界!今天来给大家介绍几款实用的浏览器插件,学会之后让你的浏览器“活过来”!_浏览器插件助手

NumPy科学数学库_数学中常用的环境有numpy-程序员宅基地

文章浏览阅读101次。NumPy是Python中最常用的科学数学计算库之一,它提供了高效的多维数组对象以及对这些数组进行操作的函数NumPy的核心是ndarray(N-dimensional array)对象,它是一个用于存储同类型数据的多维数组Numpy通常与SciPy(Scientific Python)和 Matplotlib(绘图库)一起使用,用于替代MatLabSciPy是一个开源的Python算法库和数学工具包;Matplotlib是Python语言及其Numpy的可视化操作界面'''_数学中常用的环境有numpy

dind(docker in docker)学习-程序员宅基地

文章浏览阅读1.1w次。docker in docker说白了,就是在docker容器内启动一个docker daemon,对外提供服务。优点在于:镜像和容器都在一个隔离的环境,保持操作者的干净环境。想到了再补充 :)一:低版本启动及访问启动1.12.6-dinddocker run --privileged -d --name mydocker docker:1.12.6-dind在其他容器访问d..._dind

推荐文章

热门文章

相关标签