rdd 内生分组_04、常用RDD操作整理-程序员宅基地

技术标签: rdd 内生分组  

常用Transformation

注:某些函数只有PairRDD只有,而普通的RDD则没有,比如gropuByKey、reduceByKey、sortByKey、join、cogroup等函数要根据Key进行分组或直接操作

RDD基本转换:

RDD[U]

map(f: T => U)

T:原RDD中元素类型

U:新RDD中元素类型

函数将T元素转换为新的U元素

rdd.map(x

=> x + 1)

{1, 2, 3, 3}

=>{2,

3, 4, 4}

RDD[U]

flatMap(f: T => TraversableOnce[U])

TraversableOnce:集合与迭代器的父类

函数将T元素转换为含有新类型U元素的集合,并将这些集合展平(两层转换成一层)后的元素形成新的RDD

rdd.flatMap(x

=> x.to(3))

{1, 2, 3, 3}

=>{1,

2, 3, 2, 3, 3, 3}

RDD[T]

filter(f: T => Boolean)

函数对每个元素进行过滤,通过的元素形成新的RDD

rdd.filter(x

=> x != 1)

{1, 2, 3, 3}

=>{2,

3, 3}

RDD[T]

distinct()

去重

rdd.distinct()

{1, 2, 3, 3}

=>{1,

2, 3}

RDD[U]

mapPartitions(f: Iterator[T] =>

Iterator[U])

与map一样,只是转换时是以分区为单位,将一个分区所有元素包装成Iterator一次性传入函数进行处理,而不像map函数那样每个元素都会调用一个函数,即这里有几个分区则才调用几次函数

假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次

valarr= Array(1,2,3,4,5)

valrdd=sc.parallelize(arr,2)

rdd.mapPartitions((it:

Iterator[Int]) => {varl = List[Int]();

it.foreach((e: Int) => l = e *2:: l); l.iterator })

=>{2,4,6,8,10}

RDD[U]

mapPartitionsWithIndex(f: (Int, Iterator[T]) => Iterator[U])

与mapPartitions类似,不同的时函数多了个分区索引的参数

RDD[T]

union(other: RDD[T])

两个RDD并集,包括重复的元素

rdd.union(otherRdd)

{ 1, 2, 2, 3, 3}

{ 3, 4, 5}

=>{1,

2, 2, 3, 3, 3, 4, 5}

RDD[T]

intersection(other: RDD[T])

两个RDD交集

rdd.intersection(otherRdd)

{ 1, 2, 2, 3, 3}

{ 3, 4, 5}

=>{3}

RDD[T]

subtract(other: RDD[T])

两个RDD相减

rdd.subtract(otherRdd)

{ 1, 2, 2, 3, 3}

{ 3, 4, 5}

=>{1,

2, 2}

RDD[(T,

U)] cartesian(other: RDD[U])

两个RDD相减笛卡儿积

rdd.cartesian(otherRdd)

{ 1, 2 }

{ 3, 4}

=>{(1,3),(1,4),(2,3),(2,4)}

RDD[T]

sortBy( f: (T) => K, ascending:

Boolean,numPartitions: Int)

根据转换后的值进行排序,传入的是一个(T) => K转换函数

rdd.sortBy(_._2,

false, 1)

这里根据value进行降序排序

{("leo", 65), ("tom", 50), ("marry", 100),

("jack", 80)}

=>{("marry",

100),("jack", 80),("leo", 65), ("leo", 65)}

RDD[Array[T]]

glom()

将RDD的每个分区中的类型为T的元素转换换数组Array[T]

valarr= Array(1,2,3,4,5)

valrdd=sc.parallelize(arr,2)

valarrRDD=rdd.glom()arrRDD.foreach {

(arr: Array[Int]) => { println("[ "+ arr.mkString("

") +" ]"); } }

=>[ 1 2 ],[ 3 4 5 ]

键-值RDD转换:

RDD[(K,

U)] mapValues[U](f: V => U)

K:key类型

V:value类型

将value转换为新的U元素,Key不变

rdd.mapValues(_

+ 1)

{"class1", 80), ("class2", 70)}

=>{"class1",

81), ("class2", 71)}

RDD[(K,

U)] flatMapValues(f: V =>

TraversableOnce[U])

对[K,V]型数据中的V值flatmap操作

rdd.flatMapValues(_.toCharArray())

{ (1, "ab"), (2, "bc")}

=>{(1,

'a'), (1, 'b'), (2, 'b'), (2, 'c')}

RDD[(K,

Iterable[V])] groupByKey()

根据key进行分组,同一组的元素组成Iterable,并以(key, Iterable)元组类型为元素作为新的RDD返回

rdd.groupByKey()

{("class1", 80), ("class2", 75),

("class1", 90), ("class2", 60)}

=>{("class1",[80,90]),("class2",[75,60])}

RDD[(K,

Iterable[T])] groupBy(f: T => K)

T:原RDD元素类型

K:新RDD中元素Key的类型

根据函数将元素T映射成相应K后,以此K进行分组

rdd.groupBy({

case 1 => 1; case 2 => 2; case "二" => 2 })

{ 1, 2, "二"

}

=>{(1,[1]),(2,[2,

"二"])}

RDD[(K,

V)] reduceByKey(func: (V, V) => V)

先根据key进行分组,再对同一组中的的value进行reduce操作:第一次调用函数时传入的是两个Key所对应的value,从第二次往后,传入的两个参数中的第一个为上次函数计算的结果,第二个参数为其它Key的value

rdd.

reduceByKey(_ + _)

{("class1", 80), ("class2", 75),

("class1", 90), ("class2", 60)}

=>{("class1",

170),("class2", 135)}

RDD[(K,

V)] sortByKey()

根据key的大小进行排序(注:并不是先以Key进行分组,再对组类进行排序,而是直接根据Key的值进行排序)

rdd.sortByKey(false)

{(65, "leo"), (50, "tom"),(100,

"marry"), (85, "jack")}

=>{(100,

"marry"),(85, "jack"),(65, "eo"),(50,

"tom")}

RDD[(K,

V)] foldByKey(zeroValue: V)(func: (V,

V) => V):

zeroValue:每个分区相同Key累计时的初始值,以及不同分区相同Key合并时的初始值

e.g., Nilfor list concatenation, 0

for addition, or 1 for multiplication

对每个value先进行func操作,且funcfoldByKey函数是通过调用函数实现的。

zeroVale:对V进行初始化,实际上是通过CombineByKey的createCombiner实现的V =>

(zeroValue,V),再通过func函数映射成新的值,即func(zeroValue,V)

func: Value将通过func函数按Key值进行合并(实际上是通过CombineByKey的mergeValue,mergeCombiners函数实现的,只不过在这里,这两个函数是相同的)

valpeople= List(("Mobin",1), ("Lucy",2), ("Amy",3), ("Amy",4), ("Lucy",5))

valrdd=sc.parallelize(people,2)

valfoldByKeyRDD=rdd.foldByKey(10)((v1, v2)

=> { println(v1 +" + "+ v2 +" =

"+ (v1 + v2)); v1 + v2 })//先对每个V都加10,再对相同Key的value值相加

foldByKeyRDD.foreach(println)

//处理第一个分区数据

10+ 1 = 11 // ("Mobin",

1)

10+ 2 = 12 // ("Lucy",

2)

=====================

//处理第二个分区数据

10+ 3 = 13 // ("Amy", 3)

13 + 4

= 17 // ("Amy", 4)同分区同Key的Val先合并

10+ 5 = 15 // ("Lucy",

5)

=====================

//将不同分区相同Key的Value合并起来

12 +

15 = 27 // "Lucy"跨分区,所以需合并

(Amy,17)

(Mobin,11)

(Lucy,27)

RDD[(K,

(V, Option[W]))] leftOuterJoin[W](other:

RDD[(K, W)]):

左外连接,包含左RDD的所有数据,如果右边没有与之匹配的用None表示

valarr= List(("A",1), ("A",2), ("B",1))

valarr1= List(("A","A1"), ("A","A2"))

valrdd=sc.parallelize(arr,2)

valrdd1=sc.parallelize(arr1,2)

valleftOutJoinRDD=rdd.leftOuterJoin(rdd1)

leftOutJoinRDD.foreach(println)

=>

(B,(1,None))

(A,(1,Some(A1)))

(A,(1,Some(A2)))

(A,(2,Some(A1)))

(A,(2,Some(A2)))

RDD[(K,

(Option[V], W))] rightOuterJoin[W](other:

RDD[(K, W)])

右外连接,包含右RDD的所有数据,如果左边没有与之匹配的用None表示

valarr= List(("A",1), ("A",2))

valarr1= List(("A","A1"), ("A","A2"), ("B",1))

valrdd=sc.parallelize(arr,2)

valrdd1=sc.parallelize(arr1,2)

valleftOutJoinRDD=rdd.rightOuterJoin(rdd1)

leftOutJoinRDD.foreach(println)

(B,(None,1))

(A,(Some(1),A1))

(A,(Some(1),A2))

(A,(Some(2),A1))

(A,(Some(2),A2))

RDD[(K,

(V, W))] join(other: RDD[(K, W))

W:另一RDD元素的value的类型

对两个包含对的RDD根据key进行join操作,返回类型

rdd.join(otherRdd)

{(1, "leo"),(2, "jack"),(3, "tom")}

{(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)}

=>{(1,("leo",100)),(1,("leo",70)),(2,

("jack",90),(2, ("jack",80),(3, ("tom",60),(3,

("tom",50))}

RDD[(K,

(Iterable[V], Iterable[W]))] cogroup(other:

RDD[(K, W)])

同join,也是根据key进行join,只不过相同key的value分别存放到Iterable中

rdd.cogroup(otherRdd)

{(1, "leo"),(2, "jack"),(3, "tom")}

{(1, 100), (2, 90), (3, 60), (1, 70), (2, 80), (3, 50)}

=>{(1,(["leo"],[100,70])),(2,

(["jack"],[90,80])),(3,

(["tom","lily"],[60,50]))}

常用Action

T reduce(f: (T, T) => T)

对所有元素进行reduce操作

rdd.reduce(_

+ _)

{1, 2, 2, 3, 3, 3}

=>14

Array[T]

collect()

将RDD中所有元素返回到一个数组里

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.collect()

{1, 2, 3, 3}

=>[1,

2, 3, 3]

Map[K,

V] collectAsMap()

作用于K-V类型的RDD上,作用与collect不同的是collectAsMap函数不包含重复的key,对于重复的key,后面的元素覆盖前面的元素

rdd.collectAsMap()

{ ("leo", 65), ("tom", 50), ("tom",

100)}

=>{

("leo", 65), ("tom", 100)}

Long count()

统计RDD中的元素个数

rdd.count()

{1, 2, 3, 3}

=>4

Map[T,

Long] countByValue()

各元素在RDD中出现的次数

注意:This method should only

be used if the resulting map is expected to be small, as the whole thing is

loaded into the driver's memory.

To handle

very large results, consider usingrdd.map(x => (x, 1L)).reduceByKey(_ + _), which

returns anRDD[T, Long]instead of amap.

rdd.countByValue()

{1, 2, 3, 3}

=>Map(1

-> 1, 3 -> 2, 2 -> 1)

Map[K,

Long] countByKey()

先根据Key进行分组,再对每组里的value分别进行计数统计

注意:This method should only

be used if the resulting map is expected to be small, as the whole thing is

loaded into the driver's memory.

To handle

very large results, consider usingrdd.mapValues(_ => 1L).reduceByKey(_ + _), whichreturns

anRDD[T, Long]instead of amap.

{ ("leo", 65), ("tom", 50), ("tom", 100),

("tom", 100) }

=>Map(leo

-> 1, tom -> 3)

T first()

取第一个元素,实质上是调用take(1)实现的

rdd.first()

{3, 2,

1, 4}

=>3

Array[T]

take(num: Int)

从RDD中返回前num个元素

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.take(2)

{3, 2, 1, 4}

=>[3,

2]

Array[T]

top(num: Int ) (implicit ord:

Ordering[T])

如果没有传递ord参数,则使用隐式参数,且提供的默认隐式参数为升序排序,可以传递一个自定义的Ordering来覆盖默认提供。top实现是将Ordering反序后再调用takeOrdered的:takeOrdered(num)(ord.reverse)

默认从RDD中返回最最大的num个元素

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.top(2)

{3, 2, 1, 4}

=>[4,

3]

Array[T]

takeOrdered(num: Int)(implicit ord:

Ordering[T])

如果没有传递ord参数,则使用隐式参数,且提供的默认隐式参数为升序排序,可以传递一个自定义的Ordering来覆盖默认提供

与top相反,默认取的是前面最小的num个元素

注意:This method should only

be used if the resulting array is expected to be small, as all the data is

loaded into the driver's memory.

rdd.takeOrdered(2)(myOrdering)

{3, 2, 1, 4}

=>[1,

2]

T fold(zeroValue: T)(op: (T, T) => T)

zeroValue:为每个分区累计的初始值,以及不同分区累计的初始值

e.g., Nilfor list concatenation, 0

for addition, or 1 for multiplication

和reduce()一

样, 但 是 需 要

提供初始值。注意:每个分区应用op函数时,都会以zeroValue为初始值进行计算,然后将每个分区的结果合并时,还是会以zeroValue为初始值进行合并计算

valarr= Array(1,2,3,4,5);

valrdd=sc.parallelize(arr,2)//分成两分区[1,

2] [3, 4, 5]

println(rdd.fold(10)((v1, v2)

=> { println(v1 +" + "+ v2 +" =

"+ (v1 + v2)); v1 + v2 }))

//处理第一个分区数据

10+ 1 = 11

11 + 2

= 13 //从第二个元素起,每分区内先累加

=====================

//处理第一个分区数据

10+ 3 = 13

13 + 4

= 17 //从第二个元素起,每分区内先累加

17 + 5

= 22 //从第二个元素起,每分区内先累加

=====================

//将各分区汇总起来

10+ 13 = 23 //汇总时还会使用初始值来作起始

23 +

22 = 45

45

U aggregate (zeroValue: U)(seqOp: (U, T) => U,

combOp: (U, U) => U)

初始值类型与原始数据类型可以不同,但初始值类型决定了返回值类型

与fold一样,计算时需要提供初始值,不同的是,分区的计算函数(seqOp)与分区合并计算函数(combOp)是不同的,但fold分区计算函数与分区合并计算函数是同一函数

rdd.fold(5)(_

+ _, _ + _)

val

arr = Array(1, 2, 3, 4);

val

rdd = sc.parallelize(arr, 2)

println(rdd.aggregate(5)(

(v1,

v2) => { println("v1 = " + v1 + " ; v2 = " + v2); v1 +

v2 },

(v1,

v2) => { println("v1 = " + v1 + " ; v2 = " + v2); v1 +

v2 })

)

过程与结果与上面的fold函数一样

Unit saveAsTextFile(path: String)

将RDD元素保存到文件中,对每个元素调用toString方法

Unit foreach(f: T => Unit)

遍历RDD中的每个元素

rdd.foreach(println(_))

comineByKey

defcombineByKey[C](

createCombiner: V => C,

mergeValue: (C, V) => C,

mergeCombiners: (C, C) => C,

partitioner: Partitioner,

mapSideCombine: Boolean =true,

serializer: Serializer =null): RDD[(K, C)]

createCombiner:在第一次遇到Key时创建组合器函数,将RDD数据集中的V类型值转换C类型值(V => C),

mergeValue:合并值函数,再次遇到相同的Key时,将createCombiner道理的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=>C

mergeCombiners:合并组合器函数,将C类型值两两合并成一个C类型值

partitioner:使用已有的或自定义的分区函数,默认是HashPartitioner

mapSideCombine:是否在map端进行Combine操作,默认为true

例:统计男性和女生的个数,并以(性别,(名字,名字....),个数)的形式输出

objectCombineByKey {

defmain(args:

Array[String]) {

valconf=newSparkConf().setMaster("local").setAppName("combinByKey")

valsc=newSparkContext(conf)

valpeople= List(("male","Mobin"), ("male","Kpop"), ("female","Lucy"), ("male","Lufei"), ("female","Amy"))

valrdd=sc.parallelize(people)

valcombinByKeyRDD=rdd.combineByKey(

(x: String) => (List(x),1),

(peo: (List[String], Int), x: String) => (x :: peo._1, peo._2+1),

(sex1: (List[String], Int), sex2: (List[String], Int)) => (sex1._1::: sex2._1, sex1._2+ sex2._2))

combinByKeyRDD.foreach(println)

sc.stop()

}

}

输出:

(male,(List(Lufei, Kpop,

Mobin),3))

(female,(List(Amy,

Lucy),2))

计算过程:

Partition1:

K="male"  -->

("male","Mobin")  -->

createCombiner("Mobin") =>  peo1 = (

List("Mobin") , 1 )

K="male"  -->

("male","Kpop")  -->

mergeValue(peo1,"Kpop") =>  peo2 = (

"Kpop"  ::  peo1_1 , 1 + 1 )//Key相同调用mergeValue函数对值进行合并

K="female"  -->

("female","Lucy")  -->

createCombiner("Lucy") =>  peo3 = (

List("Lucy") , 1 )

Partition2:

K="male"  -->

("male","Lufei")  -->

createCombiner("Lufei") =>  peo4 = (  List("Lufei")

, 1 )

K="female"  -->

("female","Amy")  -->

createCombiner("Amy") =>  peo5 = (

List("Amy") , 1 )

Merger Partition:

K="male" --> mergeCombiners(peo2,peo4) =>

(List(Lufei,Kpop,Mobin))

K="female" --> mergeCombiners(peo3,peo5)

=> (List(Amy,Lucy))

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_39646706/article/details/111483055

智能推荐

240320俄罗斯方块java,JAVA游戏编程之三----j2me 手机游戏入门开发--俄罗斯方块_2-程序员宅基地

文章浏览阅读202次。packagecode;//importjava.awt.*;//importjava.awt.Canvas;//importjava.awt.event.*;//importjavax.swing.*;importjava.util.Random;importjavax.microedition.lcdui.*;//写界面所需要的包/***//***俄罗斯方块*高雷*2007年1..._240×320java游戏

在线电影院售票平台(源码+开题报告)-程序员宅基地

文章浏览阅读779次,点赞14次,收藏19次。然后,实现系统的数据管理和服务功能,包括用户的注册与登录、电影的分类与展示、电影信息的查询与推荐、座位的选择与预订、在线支付与电子票生成等。此外,随着在线视频平台的兴起,越来越多的人选择在线观看电影,这对传统电影院产生了巨大的冲击。研究意义: 开发在线电影院售票平台对于提升用户的观影体验、优化电影院的运营效率、促进电影产业的发展具有重要的意义。该系统旨在通过技术手段解决传统电影院售票中的问题,提供一个集成化的电影信息展示、座位选择、在线支付和用户评价平台,同时也为电影院和电影制作方提供有效的工具。

程序员熬夜写代码,用C/C++打造一个安全的即时聊天系统!_基于c++的即时聊天系统设计-程序员宅基地

文章浏览阅读509次。保护我们剩下的人的通话信息安全,使用TOX可以让你在和家人,朋友,爱人交流时保护你的隐私不受政府无孔不入的的偷窥.关于TOX:其他牛逼的软件因为一些细化服务问你要钱的时候, TOX分文不取 . 你用了TOX, 想干嘛就干嘛.网友评论:项目源码展示:源码测试效果:最后,如果你学C/C++编程有什么不懂的,可以来问问我哦,或许我能够..._基于c++的即时聊天系统设计

linux Java服务swap分区被占用内存泄露问题故障及解决方法_linux swap占用很高-程序员宅基地

文章浏览阅读584次。鱼弦:CSDN内容合伙人、CSDN新星导师、全栈领域创作新星创作者 、51CTO(Top红人+专家博主) 、github开源爱好者(go-zero源码二次开发、游戏后端架构 https://github.com/Peakchen)当Java服务在Linux系统中运行时,可能会出现swap分区被占用的内存泄露问题,导致系统性能下降或者崩溃。下面是该问题的故障及解决方法、底层结构、架构图、工作原理、使用场景详解和实际应用方式、原理详细描述、相关命令使用示例以及文献材料链接。_linux swap占用很高

word中利用宏替换标点标点全角与半角-程序员宅基地

文章浏览阅读662次。Alt+F11,然后插入-模块:复制下面代码到编辑窗口:Sub 半角标点符号转换为全角标点符号()'中英互译文档中将中文段落中的英文标点符号替换为中文标点符号 Dim i As Paragraph, ChineseInterpunction() As Variant, EnglishInterpunction() As Variant Dim MyRange..._替换半角宏

Android WebView使用总结_android webview真正加载完成-程序员宅基地

文章浏览阅读2.8k次。#.简介: WebView是Android提供的用来展示展示web页面的View,内部使用webkit浏览器引擎(一个轻量级的浏览器引擎),除了展示Web页面外,还可与Web页面内的JS脚本交互调用。WebView内部的WebSetting对象负责管理WebView的参数配置; WebViewClient负责处理WebView的各种请求和通知事件,在对应事件发生时会执行WebViewClient的对应回调; ChromeWebviewClient辅助Webview处理与JS一些交互......_android webview真正加载完成

随便推点

bitcoin 调试环境搭建-程序员宅基地

文章浏览阅读1.6k次。_bitcoin 调试环境搭建

曲线生成 | 图解B样条曲线生成原理(基本概念与节点生成算法)-程序员宅基地

文章浏览阅读4.3k次,点赞93次,收藏94次。为了解决贝塞尔曲线无法局部修正、控制性减弱、曲线次数过高、不易拼接的缺陷,引入B样条曲线(B-Spline)。本文介绍B样条曲线的基本概念:节点向量、支撑性、次数阶数、加权性质、节点生成算法等,为后续曲线计算打下基础。_样条曲线生成

CDH安装宝典之ClouderaManager_/opt/cloudera/cm-agent/service/mgmt/mgmt.sh: line -程序员宅基地

文章浏览阅读902次。配置本地repo库下载我的阿里云盘文件文件放置#创建目录mkdir -p /opt/cloudera/parcel-repo/mkdir -p /opt/cloudera/cm/yum install createrepoCDH 6.2.0 的三个文件放到/opt/cloudera/parcel-repo/中,并且注意把sha256后缀的文件名修改为sha#执行createrepo命令生成rpm元数据 最终/opt/cloudera/parcel-repo/会多一个repodata目录_/opt/cloudera/cm-agent/service/mgmt/mgmt.sh: line 76: /usr/java/jdk1.8.0_181

uni.canvasToTempFilePath在app正常,微信小程序报错: fail canvas is empty-程序员宅基地

文章浏览阅读943次,点赞2次,收藏2次。uni.canvasToTempFilePath_uni.canvastotempfilepath

SDRAM笔记_sdram 干扰-程序员宅基地

文章浏览阅读3.1k次。SRAM :静态RAM,不用刷新,速度可以非常快,像CPU内部的cache,都是静态RAM,缺点是一个内存单元需要的晶体管数量多,因而价格昂贵,容量不大。DRAM:动态RAM,需要刷新,容量大。SDRAM:同步动态RAM,需要刷新,速度较快,容量大。DDR SDRAM:双通道同步动态RAM,需要刷新,速度快,容量大。........................_sdram 干扰

Excel转SQL语句_excel数据怎么生成sql语句-程序员宅基地

文章浏览阅读7.3k次。假设表格有A、B、C、D四列数据,希望导入到你的数据库中表格table,对应的字段分别是col1、col2、col3、col4。_excel数据怎么生成sql语句

推荐文章

热门文章

相关标签