rdd 内生分组_04、常用RDD操作整理-程序员宅基地

技术标签: rdd 内生分组  

常用Transformation

注:某些函数只有PairRDD只有,而普通的RDD则没有,比如gropuByKey、reduceByKey、sortByKey、join、cogroup等函数要根据Key进行分组或直接操作

RDD基本转换:

RDD[U]

map(f: T => U)

T:原RDD中元素类型

U:新RDD中元素类型

函数将T元素转换为新的U元素

rdd.map(x

=> x + 1)

{1, 2, 3, 3}

=>{2,

3, 4, 4}

RDD[U]

flatMap(f: T => TraversableOnce[U])

TraversableOnce:集合与迭代器的父类

函数将T元素转换为含有新类型U元素的集合,并将这些集合展平(两层转换成一层)后的元素形成新的RDD

rdd.flatMap(x

=> x.to(3))

{1, 2, 3, 3}

=>{1,

2, 3, 2, 3, 3, 3}

RDD[T]

filter(f: T => Boolean)

函数对每个元素进行过滤,通过的元素形成新的RDD

rdd.filter(x

=> x != 1)

{1, 2, 3, 3}

=>{2,

3, 3}

RDD[T]

distinct()

去重

rdd.distinct()

{1, 2, 3, 3}

=>{1,

2, 3}

RDD[U]

mapPartitions(f: Iterator[T] =>

Iterator[U])

与map一样,只是转换时是以分区为单位,将一个分区所有元素包装成Iterator一次性传入函数进行处理,而不像map函数那样每个元素都会调用一个函数,即这里有几个分区则才调用几次函数

假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次

valarr= Array(1,2,3,4,5)

valrdd=sc.parallelize(arr,2)

rdd.mapPartitions((it:

Iterator[Int]) => {varl = List[Int]();

it.foreach((e: Int) => l = e *2:: l); l.iterator })

=>{2,4,6,8,10}

RDD[U]

mapPartitionsWithIndex(f: (Int, Iterator[T]) => Iterator[U])

与mapPartitions类似,不同的时函数多了个分区索引的参数

RDD[T]

union(other: RDD[T])

两个RDD并集,包括重复的元素

rdd.union(otherRdd)

{ 1, 2, 2, 3, 3}

{ 3, 4, 5}

=>{1,

2, 2, 3, 3, 3, 4, 5}

RDD[T]

intersection(other: RDD[T])

两个RDD交集

rdd.intersection(otherRdd)

{ 1, 2, 2, 3, 3}

{ 3, 4, 5}

=>{3}

RDD[T]

subtract(other: RDD[T])

两个RDD相减

rdd.subtract(otherRdd)

{ 1, 2, 2, 3, 3}

{ 3, 4, 5}

=>{1,

2, 2}

RDD[(T,

U)] cartesian(other: RDD[U])

两个RDD相减笛卡儿积

rdd.cartesian(otherRdd)

{ 1, 2 }

{ 3, 4}

=>{(1,3),(1,4),(2,3),(2,4)}

RDD[T]

sortBy( f: (T) => K, ascending:

Boolean,numPartitions: Int)

根据转换后的值进行排序,传入的是一个(T) => K转换函数

rdd.sortBy(_._2,

false, 1)

这里根据value进行降序排序

{("leo", 65), ("tom", 50), ("marry", 100),

("jack", 80)}

=>{("marry",

100),("jack", 80),("leo", 65), ("leo", 65)}

RDD[Array[T]]

glom()

将RDD的每个分区中的类型为T的元素转换换数组Array[T]

valarr= Array(1,2,3,4,5)

valrdd=sc.parallelize(arr,2)

valarrRDD=rdd.glom()arrRDD.foreach {

(arr: Array[Int]) => { println("[ "+ arr.mkString("

") +" ]"); } }

=>[ 1 2 ],[ 3 4 5 ]

键-值RDD转换:

RDD[(K,

U)] mapValues[U](f: V => U)

K:key类型

V:value类型

将value转换为新的U元素,Key不变

rdd.mapValues(_

+ 1)

{"class1", 80), ("class2", 70)}

=>{"class1",

81), ("class2", 71)}

RDD[(K,

U)] flatMapValues(f: V =>

TraversableOnce[U])

对[K,V]型数据中的V值flatmap操作

rdd.flatMapValues(_.toCharArray())

{ (1, "ab"), (2, "bc")}

=>{(1,

'a'), (1, 'b'), (2, 'b'), (2, 'c')}

RDD[(K,

Iterable[V])] groupByKey()

根据key进行分组,同一组的元素组成Iterable,并以(key, Iterable)元组类型为元素作为新的RDD返回

rdd.groupByKey()

{("class1", 80), ("class2", 75),

("class1", 90), ("class2", 60)}

=>{("class1",[80,90]),("class2",[75,60])}

RDD[(K,

Iterable[T])] groupBy(f: T => K)

T:原RDD元素类型

K:新RDD中元素Key的类型

根据函数将元素T映射成相应K后,以此K进行分组

rdd.groupBy({

case 1 => 1; case 2 => 2; case "二" => 2 })

{ 1, 2, "二"

}

=>{(1,[1]),(2,[2,

"二"])}

RDD[(K,

V)] reduceByKey(func: (V, V) => V)

先根据key进行分组,再对同一组中的的value进行reduce操作:第一次调用函数时传入的是两个Key所对应的value,从第二次往后,传入的两个参数中的第一个为上次函数计算的结果,第二个参数为其它Key的value

rdd.

reduceByKey(_ + _)

{("class1", 80), ("class2", 75),

("class1", 90), ("class2", 60)}

=>{("class1",

170),("class2", 135)}

RDD[(K,

V)] sortByKey()

根据key的大小进行排序(注:并不是先以Key进行分组,再对组类进行排序,而是直接根据Key的值进行排序)

rdd.sortByKey(false)

{(65, "leo"), (50, "tom"),(100,

"marry"), (85, "jack")}

=>{(100,

"marry"),(85, "jack"),(65, "eo"),(50,

"tom")}

RDD[(K,

V)] foldByKey(zeroValue: V)(func: (V,

V) => V):

zeroValue:每个分区相同Key累计时的初始值,以及不同分区相同Key合并时的初始值

e.g., Nilfor list concatenation, 0

for addition, or 1 for multiplication

对每个value先进行func操作,且funcfoldByKey函数是通过调用函数实现的。

zeroVale:对V进行初始化,实际上是通过CombineByKey的createCombiner实现的V =>

(zeroValue,V),再通过func函数映射成新的值,即func(zeroValue,V)

func: Value将通过func函数按Key值进行合并(实际上是通过CombineByKey的mergeValue,mergeCombiners函数实现的,只不过在这里,这两个函数是相同的)

valpeople= List(("Mobin",1), ("Lucy",2), ("Amy",3), ("Amy",4), ("Lucy",5))

valrdd=sc.parallelize(people,2)

valfoldByKeyRDD=rdd.foldByKey(10)((v1, v2)

=> { println(v1 +" + "+ v2 +" =

"+ (v1 + v2)); v1 + v2 })//先对每个V都加10,再对相同Key的value值相加

foldByKeyRDD.foreach(println)

//处理第一个分区数据

10+ 1 = 11 // ("Mobin",

1)

10+ 2 = 12 // ("Lucy",

2)

=====================

//处理第二个分区数据

10+ 3 = 13 // ("Amy", 3)

13 + 4

= 17 // ("Amy", 4)同分区同Key的Val先合并

10+ 5 = 15 // ("Lucy",

5)

=====================

//将不同分区相同Key的Value合并起来

12 +

15 = 27 // "Lucy"跨分区,所以需合并

(Amy,17)

(Mobin,11)

(Lucy,27)

RDD[(K,

(V, Option[W]))] leftOuterJoin[W](other:

RDD[(K, W)]):

左外连接,包含左RDD的所有数据,如果右边没有与之匹配的用None表示

valarr= List(("A",1), ("A",2), ("B",1))

valarr1= List(("A","A1"), ("A","A2"))

valrdd=sc.parallelize(arr,2)

valrdd1=sc.parallelize(arr1,2)

valleftOutJoinRDD=rdd.leftOuterJoin(rdd1)

leftOutJoinRDD.foreach(println)

=>

(B,(1,None))

(A,(1,Some(A1)))

(A,(1,Some(A2)))

(A,(2,Some(A1)))

(A,(2,Some(A2)))

RDD[(K,

(Option[V], W))] rightOuterJoin[W](other:

RDD[(K, W)])

右外连接,包含右RDD的所有数据,如果左边没有与之匹配的用None表示

valarr= List(("A",1), ("A",2))

valarr1= List(("A","A1"), ("A","A2"), ("B",1))

valrdd=sc.parallelize(arr,2)

valrdd1=sc.parallelize(arr1,2)

valleftOutJoinRDD=rdd.rightOuterJoin(rdd1)

leftOutJoinRDD.foreach(println)

(B,(None,1))

(A,(Some(1),A1))

(A,(Some(1),A2))

(A,(Some(2),A1))

(A,(Some(2),A2))

RDD[(K,

(V, W))] join(other: RDD[(K, W))

W:另一RDD元素的value的类型

对两个包含对的RDD根据key进行join操作,返回类型

rdd.join(otherRdd)

{(1, "leo"),(2, "jack"),(3, "tom")}

{(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)}

=>{(1,("leo",100)),(1,("leo",70)),(2,

("jack",90),(2, ("jack",80),(3, ("tom",60),(3,

("tom",50))}

RDD[(K,

(Iterable[V], Iterable[W]))] cogroup(other:

RDD[(K, W)])

同join,也是根据key进行join,只不过相同key的value分别存放到Iterable中

rdd.cogroup(otherRdd)

{(1, "leo"),(2, "jack"),(3, "tom")}

{(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)}

=>{(1,(["leo"],[100,70])),(2,

(["jack"],[90,80])),(3,

(["tom","lily"],[60,50]))}

常用Action

T reduce(f: (T, T) => T)

对所有元素进行reduce操作

rdd.reduce(_

+ _)

{1, 2, 2, 3, 3, 3}

=>14

Array[T]

collect()

将RDD中所有元素返回到一个数组里

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.collect()

{1, 2, 3, 3}

=>[1,

2, 3, 3]

Map[K,

V] collectAsMap()

作用于K-V类型的RDD上,作用与collect不同的是collectAsMap函数不包含重复的key,对于重复的key,后面的元素覆盖前面的元素

rdd.collectAsMap()

{ ("leo", 65), ("tom", 50), ("tom",

100)}

=>{

("leo", 65), ("tom", 100)}

Long count()

统计RDD中的元素个数

rdd.count()

{1, 2, 3, 3}

=>4

Map[T,

Long] countByValue()

各元素在RDD中出现的次数

注意:This method should only

be used if the resulting map is expected to be small, as the whole thing is

loaded into the driver's memory.

To handle

very large results, consider usingrdd.map(x => (x, 1L)).reduceByKey(_ + _), which

returns anRDD[T, Long]instead of amap.

rdd.countByValue()

{1, 2, 3, 3}

=>Map(1

-> 1, 3 -> 2, 2 -> 1)

Map[K,

Long] countByKey()

先根据Key进行分组,再对每组里的value分别进行计数统计

注意:This method should only

be used if the resulting map is expected to be small, as the whole thing is

loaded into the driver's memory.

To handle

very large results, consider usingrdd.mapValues(_ => 1L).reduceByKey(_ + _), whichreturns

anRDD[T, Long]instead of amap.

{ ("leo", 65), ("tom", 50), ("tom", 100),

("tom", 100) }

=>Map(leo

-> 1, tom -> 3)

T first()

取第一个元素,实质上是调用take(1)实现的

rdd.first()

{3, 2,

1, 4}

=>3

Array[T]

take(num: Int)

从RDD中返回前num个元素

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.take(2)

{3, 2, 1, 4}

=>[3,

2]

Array[T]

top(num: Int ) (implicit ord:

Ordering[T])

如果没有传递ord参数,则使用隐式参数,且提供的默认隐式参数为升序排序,可以传递一个自定义的Ordering来覆盖默认提供。top实现是将Ordering反序后再调用takeOrdered的:takeOrdered(num)(ord.reverse)

默认从RDD中返回最最大的num个元素

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.top(2)

{3, 2, 1, 4}

=>[4,

3]

Array[T]

takeOrdered(num: Int)(implicit ord:

Ordering[T])

如果没有传递ord参数,则使用隐式参数,且提供的默认隐式参数为升序排序,可以传递一个自定义的Ordering来覆盖默认提供

与top相反,默认取的是前面最小的num个元素

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.takeOrdered(2)(myOrdering)

{3, 2, 1, 4}

=>[1,

2]

T fold(zeroValue: T)(op: (T, T) => T)

zeroValue:为每个分区累计的初始值,以及不同分区累计的初始值

e.g., Nilfor list concatenation, 0

for addition, or 1 for multiplication

和reduce()一

样, 但 是 需 要

提供初始值。注意:每个分区应用op函数时,都会以zeroValue为初始值进行计算,然后将每个分区的结果合并时,还是会以zeroValue为初始值进行合并计算

valarr= Array(1,2,3,4,5);

valrdd=sc.parallelize(arr,2)//分成两分区[1,

2] [3, 4, 5]

println(rdd.fold(10)((v1, v2)

=> { println(v1 +" + "+ v2 +" =

"+ (v1 + v2)); v1 + v2 }))

//处理第一个分区数据

10+ 1 = 11

11 + 2

= 13 //从第二个元素起,每分区内先累加

=====================

//处理第一个分区数据

10+ 3 = 13

13 + 4

= 17 //从第二个元素起,每分区内先累加

17 + 5

= 22 //从第二个元素起,每分区内先累加

=====================

//将各分区汇总起来

10+ 13 = 23 //汇总时还会使用初始值来作起始

23 +

22 = 45

45

U aggregate (zeroValue: U)(seqOp: (U, T) => U,

combOp: (U, U) => U)

初始值类型与原始数据类型可以不同,但初始值类型决定了返回值类型

与fold一样,计算时需要提供初始值,不同的是,分区的计算函数(seqOp)与分区合并计算函数(combOp)是不同的,但fold分区计算函数与分区合并计算函数是同一函数

rdd.fold(5)(_

+ _, _ + _)

val

arr = Array(1, 2, 3, 4);

val

rdd = sc.parallelize(arr, 2)

println(rdd.aggregate(5)(

(v1,

v2) => { println("v1 = " + v1 + " ; v2 = " + v2); v1 +

v2 },

(v1,

v2) => { println("v1 = " + v1 + " ; v2 = " + v2); v1 +

v2 })

)

过程与结果与上面的fold函数一样

Unit saveAsTextFile(path: String)

将RDD元素保存到文件中,对每个元素调用toString方法

Unit foreach(f: T => Unit)

遍历RDD中的每个元素

rdd.foreach(println(_))

comineByKey

defcombineByKey[C](

createCombiner: V => C,

mergeValue: (C, V) => C,

mergeCombiners: (C, C) => C,

partitioner: Partitioner,

mapSideCombine: Boolean =true,

serializer: Serializer =null): RDD[(K, C)]

createCombiner:在第一次遇到Key时创建组合器函数,将RDD数据集中的V类型值转换C类型值(V => C),

mergeValue:合并值函数,再次遇到相同的Key时,将createCombiner道理的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=>C

mergeCombiners:合并组合器函数,将C类型值两两合并成一个C类型值

partitioner:使用已有的或自定义的分区函数,默认是HashPartitioner

mapSideCombine:是否在map端进行Combine操作,默认为true

例:统计男性和女生的个数,并以(性别,(名字,名字....),个数)的形式输出

objectCombineByKey {

defmain(args:

Array[String]) {

valconf=newSparkConf().setMaster("local").setAppName("combinByKey")

valsc=newSparkContext(conf)

valpeople= List(("male","Mobin"), ("male","Kpop"), ("female","Lucy"), ("male","Lufei"), ("female","Amy"))

valrdd=sc.parallelize(people)

valcombinByKeyRDD=rdd.combineByKey(

(x: String) => (List(x),1),

(peo: (List[String], Int), x: String) => (x :: peo._1, peo._2+1),

(sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1::: sex2._1, sex1._2+ sex2._2))

combinByKeyRDD.foreach(println)

sc.stop()

}

}

输出:

(male,(List(Lufei, Kpop,

Mobin),3))

(female,(List(Amy,

Lucy),2))

计算过程:

Partition1:

K="male"  -->

("male","Mobin")  -->

createCombiner("Mobin") =>  peo1 = (

List("Mobin") , 1 )

K="male"  -->

("male","Kpop")  -->

mergeValue(peo1,"Kpop") =>  peo2 = (

"Kpop"  ::  peo1_1 , 1 + 1 )//Key相同调用mergeValue函数对值进行合并

K="female"  -->

("female","Lucy")  -->

createCombiner("Lucy") =>  peo3 = (

List("Lucy") , 1 )

Partition2:

K="male"  -->

("male","Lufei")  -->

createCombiner("Lufei") =>  peo4 = (  List("Lufei")

, 1 )

K="female"  -->

("female","Amy")  -->

createCombiner("Amy") =>  peo5 = (

List("Amy") , 1 )

Merger Partition:

K="male" --> mergeCombiners(peo2,peo4) =>

(List(Lufei,Kpop,Mobin))

K="female" --> mergeCombiners(peo3,peo5)

=> (List(Amy,Lucy))

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_39646706/article/details/111483055

智能推荐

解决win10/win8/8.1 64位操作系统MT65xx preloader线刷驱动无法安装_mt65驱动-程序员宅基地

文章浏览阅读1.3w次。转载自 http://www.miui.com/thread-2003672-1-1.html 当手机在刷错包或者误修改删除系统文件后会出现无法开机或者是移动定制(联通合约机)版想刷标准版,这时就会用到线刷,首先就是安装线刷驱动。 在XP和win7上线刷是比较方便的,用那个驱动自动安装版,直接就可以安装好,完成线刷。不过现在也有好多机友换成了win8/8.1系统,再使用这个_mt65驱动

SonarQube简介及客户端集成_sonar的客户端区别-程序员宅基地

文章浏览阅读1k次。SonarQube是一个代码质量管理平台,可以扫描监测代码并给出质量评价及修改建议,通过插件机制支持25+中开发语言,可以很容易与gradle\maven\jenkins等工具进行集成,是非常流行的代码质量管控平台。通CheckStyle、findbugs等工具定位不同,SonarQube定位于平台,有完善的管理机制及强大的管理页面,并通过插件支持checkstyle及findbugs等既有的流..._sonar的客户端区别

元学习系列(六):神经图灵机详细分析_神经图灵机方法改进-程序员宅基地

文章浏览阅读3.4k次,点赞2次,收藏27次。神经图灵机是LSTM、GRU的改进版本,本质上依然包含一个外部记忆结构、可对记忆进行读写操作,主要针对读写操作进行了改进,或者说提出了一种新的读写操作思路。神经图灵机之所以叫这个名字是因为它通过深度学习模型模拟了图灵机,但是我觉得如果先去介绍图灵机的概念,就会搞得很混乱,所以这里主要从神经图灵机改进了LSTM的哪些方面入手进行讲解,同时,由于模型的结构比较复杂,为了让思路更清晰,这次也会分开几..._神经图灵机方法改进

【机器学习】机器学习模型迭代方法(Python)-程序员宅基地

文章浏览阅读2.8k次。一、模型迭代方法机器学习模型在实际应用的场景,通常要根据新增的数据下进行模型的迭代,常见的模型迭代方法有以下几种:1、全量数据重新训练一个模型,直接合并历史训练数据与新增的数据,模型直接离线学习全量数据,学习得到一个全新的模型。优缺点:这也是实际最为常见的模型迭代方式,通常模型效果也是最好的,但这样模型迭代比较耗时,资源耗费比较多,实时性较差,特别是在大数据场景更为困难;2、模型融合的方法,将旧模..._模型迭代

base64图片打成Zip包上传,以及服务端解压的简单实现_base64可以装换zip吗-程序员宅基地

文章浏览阅读2.3k次。1、前言上传图片一般采用异步上传的方式,但是异步上传带来不好的地方,就如果图片有改变或者删除,图片服务器端就会造成浪费。所以有时候就会和参数同步提交。笔者喜欢base64图片一起上传,但是图片过多时就会出现数据丢失等异常。因为tomcat的post请求默认是2M的长度限制。2、解决办法有两种:① 修改tomcat的servel.xml的配置文件,设置 maxPostSize=..._base64可以装换zip吗

Opencv自然场景文本识别系统(源码&教程)_opencv自然场景实时识别文字-程序员宅基地

文章浏览阅读1k次,点赞17次,收藏22次。Opencv自然场景文本识别系统(源码&教程)_opencv自然场景实时识别文字

随便推点

ESXi 快速复制虚拟机脚本_exsi6.7快速克隆centos-程序员宅基地

文章浏览阅读1.3k次。拷贝虚拟机文件时间比较长,因为虚拟机 flat 文件很大,所以要等。脚本完成后,以复制虚拟机文件夹。将以下脚本内容写入文件。_exsi6.7快速克隆centos

好友推荐—基于关系的java和spark代码实现_本关任务:使用 spark core 知识完成 " 好友推荐 " 的程序。-程序员宅基地

文章浏览阅读2k次。本文主要实现基于二度好友的推荐。数学公式参考于:http://blog.csdn.net/qq_14950717/article/details/52197565测试数据为自己随手画的关系图把图片整理成文本信息如下:a b c d e f yb c a f gc a b dd c a e h q re f h d af e a b gg h f bh e g i di j m n ..._本关任务:使用 spark core 知识完成 " 好友推荐 " 的程序。

南京大学-高级程序设计复习总结_南京大学高级程序设计-程序员宅基地

文章浏览阅读367次。南京大学高级程序设计期末复习总结,c++面向对象编程_南京大学高级程序设计

4.朴素贝叶斯分类器实现-matlab_朴素贝叶斯 matlab训练和测试输出-程序员宅基地

文章浏览阅读3.1k次,点赞2次,收藏12次。实现朴素贝叶斯分类器,并且根据李航《统计机器学习》第四章提供的数据训练与测试,结果与书中一致分别实现了朴素贝叶斯以及带有laplace平滑的朴素贝叶斯%书中例题实现朴素贝叶斯%特征1的取值集合A1=[1;2;3];%特征2的取值集合A2=[4;5;6];%S M LAValues={A1;A2};%Y的取值集合YValue=[-1;1];%数据集和T=[ 1,4,-1;..._朴素贝叶斯 matlab训练和测试输出

Markdown 文本换行_markdowntext 换行-程序员宅基地

文章浏览阅读1.6k次。Markdown 文本换行_markdowntext 换行

错误:0xC0000022 在运行 Microsoft Windows 非核心版本的计算机上,运行”slui.exe 0x2a 0xC0000022″以显示错误文本_错误: 0xc0000022 在运行 microsoft windows 非核心版本的计算机上,运行-程序员宅基地

文章浏览阅读6.7w次,点赞2次,收藏37次。win10 2016长期服务版激活错误解决方法:打开“注册表编辑器”;(Windows + R然后输入Regedit)修改SkipRearm的值为1:(在HKEY_LOCAL_MACHINE–》SOFTWARE–》Microsoft–》Windows NT–》CurrentVersion–》SoftwareProtectionPlatform里面,将SkipRearm的值修改为1)重..._错误: 0xc0000022 在运行 microsoft windows 非核心版本的计算机上,运行“slui.ex