【云星数据---Apache Flink实战系列(精品版)】:Apache Flink批处理API详解与编程实战009--DateSet实用API详解009_def map[r: typeinformation: classtag]-程序员宅基地

技术标签: flink  cloudcomputing  数据  bigdata  apache  批处理  编程  api  

DateSet的API详解九

join

def join[O](other: DataSet[O], strategy: JoinHint): UnfinishedJoinOperation[T, O]
def join[O](other: DataSet[O]): UnfinishedJoinOperation[T, O]

Creates a new DataSet by joining this DataSet with the other DataSet.

将两个DataSet进行join操作

join示例一:

执行程序:

//1.创建一个 DataSet其元素为[(Int,String)]类型
val input1: DataSet[(Int, String)] =  benv.fromElements(
(2,"zhagnsan"),(3,"lisi"),(4,"wangwu"),(5,"zhaoliu"))

//2.创建一个 DataSet其元素为[(Double, Int)]类型
val input2: DataSet[(Double, Int)] =  benv.fromElements(
(1850.98,4),(1950.98,5),(2350.98,6),(3850.98,3))

//3.两个DataSet进行join操作,条件是input1(0)==input2(1)
val result = input1.join(input2).where(0).equalTo(1)

//4.显示结果
result.collect

执行结果:

res56: Seq[((Int, String), (Double, Int))] = Buffer(
((4,wangwu),(1850.98,4)), 
((5,zhaoliu),(1950.98,5)), 
((3,lisi),(3850.98,3)))

web ui中的执行效果:
这里写图片描述

join示例二:

A Join transformation can also call a user-defined join function to process joining tuples. 
A join function receives one element of the first input DataSet and one element of the second 
input DataSet and returns exactly one element.

The following code performs a join of DataSet with custom java objects and a Tuple DataSet using 
key-selector functions and shows how to use a user-defined join function:

执行程序:

//1.定义case class
case class Rating(name: String, category: String, points: Int)

//2.定义DataSet[Rating]
val ratings: DataSet[Rating] = benv.fromElements(
Rating("moon","youny1",3),Rating("sun","youny2",4),
Rating("cat","youny3",1),Rating("dog","youny4",5))

//3.创建DataSet[(String, Double)] 
val weights: DataSet[(String, Double)] = benv.fromElements(
("youny1",4.3),("youny2",7.2),
("youny3",9.0),("youny4",1.5))

//4.使用方法进行join
val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
  (rating, weight) => (rating.name, rating.points + weight._2)
}

//5.显示结果
weightedRatings.collect

程序解析:

res57: Seq[(String, Double)] = Buffer((moon,7.3), (sun,11.2), (cat,10.0), (dog,6.5))

web ui中的执行效果:
这里写图片描述

join示例三:???

A Join transformation can also call a user-defined join function to process joining tuples. 
A join function receives one element of the first input DataSet and one element of the second 
input DataSet and returns exactly one element.

The following code performs a join of DataSet with custom java objects and a Tuple DataSet using 
key-selector functions and shows how to use a user-defined join function:

执行程序:


case class Rating(name: String, category: String, points: Int)
val ratings: DataSet[Rating] = benv.fromElements(
Rating("moon","youny1",3),Rating("sun","youny2",4),
Rating("cat","youny3",1),Rating("dog","youny4",5))

val weights: DataSet[(String, Double)] = benv.fromElements(
("youny1",4.3),("youny2",7.2),
("youny3",9.0),("youny4",1.5))

val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
  (rating, weight, out: Collector[(String, Double)]) =>
    if (weight._2 > 0.1) out.collect(rating.name, rating.points * weight._2)
}

weightedRatings.collect

join示例四:执行join操作时暗示数据大小

在执行join操作时暗示数据大小,可以帮助flink优化它的执行策略,提高执行效率。

执行程序:

//1.定义DataSet[(Int, String)]
val input1: DataSet[(Int, String)] = 
benv.fromElements((3,"zhangsan"),(2,"lisi"),(4,"wangwu"),(6,"zhaoliu"))

//2.定义 DataSet[(Int, String)]
val input2: DataSet[(Int, String)] = 
benv.fromElements((4000,"zhangsan"),(70000,"lisi"),(4600,"wangwu"),(53000,"zhaoliu"))

// 3.暗示第二个输入很小
val result1 = input1.joinWithTiny(input2).where(1).equalTo(1)
result1.collect

// 4.暗示第二个输入很大
val result2 = input1.joinWithHuge(input2).where(1).equalTo(1)
result2.collect

执行结果:

Scala-Flink> result1.collect
res12: Seq[((Int, String), (Int, String))] = Buffer(
((3,zhangsan),(4000,zhangsan)), ((2,lisi),(70000,lisi)), 
((4,wangwu),(4600,wangwu)), ((6,zhaoliu),(53000,zhaoliu)))


Scala-Flink> result2.collect
res13: Seq[((Int, String), (Int, String))] = Buffer(
((3,zhangsan),(4000,zhangsan)), ((2,lisi),(70000,lisi)), 
((4,wangwu),(4600,wangwu)), ((6,zhaoliu),(53000,zhaoliu)))

web ui中的执行效果:
这里写图片描述

join示例五:执行join操作时暗示数据大小

flink有很多种执行join的策略,你可以指定一个执行策略,以便提高执行效率。

执行程序:

//1.定义两个 DataSet
val input1: DataSet[(Int, String)] = 
benv.fromElements((3,"zhangsan"),(2,"lisi"),(4,"wangwu"),(6,"zhaoliu"))
val input2: DataSet[(Int, String)] = 
benv.fromElements((4000,"zhangsan"),(70000,"lisi"),(4600,"wangwu"),(53000,"zhaoliu"))

//2.暗示input2很小
val result1 = input1.join(input2, JoinHint.BROADCAST_HASH_FIRST).where(1).equalTo(1)

//3.显示结果
result1.collect

执行结果:

res15: Seq[((Int, String), (Int, String))] = Buffer(
((3,zhangsan),(4000,zhangsan)),
((2,lisi),(70000,lisi)), 
((4,wangwu),(4600,wangwu)),
((6,zhaoliu),(53000,zhaoliu)))

暗示项说明:

暗示有如下选项:
1.JoinHint.OPTIMIZER_CHOOSES:
    没有明确暗示,让系统自行选择。
2.JoinHint.BROADCAST_HASH_FIRST
    把第一个输入转化成一个哈希表,并广播出去。适用于第一个输入数据较小的情况。
3.JoinHint.BROADCAST_HASH_SECOND:
    把第二个输入转化成一个哈希表,并广播出去。适用于第二个输入数据较小的情况。
4.JoinHint.REPARTITION_HASH_FIRST:(defalut)
    1.如果输入没有分区,系统将把输入重分区。
    2.系统将把第一个输入转化成一个哈希表广播出去。
    3.两个输入依然比较大。
    4.适用于第一个输入小于第二个输入的情况。
5.JoinHint.REPARTITION_HASH_SECOND:
    1.如果输入没有分区,系统将把输入重分区。
    2.系统将把第二个输入转化成一个哈希表广播出去。
    3.两个输入依然比较大。
    4.适用于第二个输入小于第一个输入的情况。
6.JoinHint.REPARTITION_SORT_MERGE:
    1.如果输入没有分区,系统将把输入重分区。
    2.如果输入没有排序,系统将吧输入重排序。
    3.系统将合并两个排序好的输入。
    4.适用于一个或两个分区已经排序好的情况。
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/liguohuaBigdata/article/details/78547601

智能推荐

【论文笔记 医疗影像分割—nnUNet】nnU-Net: Self-adapting Framework for U-Net-Based Medical Image Segmentation-程序员宅基地

文章浏览阅读6.4k次,点赞3次,收藏41次。文章目录1.Abstract说明:本文是对原版论文和一位大神解读的基础上,加以自己的理解而作,如有错误,欢迎指正。大神的文章链接1.Abstract'对于深度学习模型来说,当用在一个新的问题上,就需要对可变设置人为设定。对新问题的适应包括精确架构、预训练、训练、推理对个自由度,这些选择对整体性能有很大的影响。本文提出nnU-Net(no-new-net)????,是一种基于三个模型:2D U-Net, 3D U-Net 和U-Net Cascade(级联的3D U-net,之后会有介绍)上的自适应框_nnunet

Java 异常的处理_catch中不用system.exit(1)会怎样-程序员宅基地

文章浏览阅读242次。Java 异常的处理Java 应用程序中,对异常的处理有两种方式:处理异常和声明异常。处理异常:try、catch 和 finally若要捕获异常,则必须在代码中添加异常处理器块。这种 Java 结构可能包含 3 个部分,都有 Java 关键字。try 语句块:将一个或者多个语句放入 try 时,则表示这些语句可能抛出异常。编译器知道可能要发生异常,于是用一个特殊结构评估块内所_catch中不用system.exit(1)会怎样

OSPF 多区域配置实验_area0.0.1.0等于多少-程序员宅基地

文章浏览阅读263次。OSPF 多区域配置_area0.0.1.0等于多少

字典树_字典树建树-程序员宅基地

文章浏览阅读271次。原创字典树字典树,又称单词查找树,Trie树,是一种树形结构,哈希表的一个变种。用于统计,排序和保存大量的字符串(也可以保存其的)。优点就是利用公共的前缀来节约存储空间。在这举个简单的例子:比如说我们想储存3个单词,nyist、nyistacm、nyisttc。如果只是单纯的按照以前的字符数组存储的思路来存储的话,那么我们需要定义三个字符串数组。但是_字典树建树

Android framework--谈谈AMS.updateOomAdjLocked-程序员宅基地

文章浏览阅读3.1k次。关于Android系统的内存回收机制,相信大家都不陌生,Android基于各个应用进程承载四大组件的状态对应用进程进行重要性评估,并在系统内存紧张时根据重要性由低到高来选择杀死应用进程,以达到释放内存的目的。重要性评估由AMS执行,具体来说就是AMS.updateOomAdjLocked函数,反过来说,AMS.updateOomAdjLocked的作用就是更新应用进程的重要性。应用进程(Pro..._updateoomadjlocked

计算机基础——操作系统-程序员宅基地

文章浏览阅读8.5k次,点赞28次,收藏38次。本章将会讲解计算机的操作系统。操作系统(Operating System,OS)就好比一个计算机内部的管理者,是管理和控制计算机硬件与软件资源的计算机程序,直接运行在“裸机”上的最基本的系统软件,任何其他应用软件都必须在操作系统的支持下才能运行,操作系统是用户和计算机的接口,同时也是计算机硬件和其他软件的接口。操作系统的功能包括管理计算机系统的硬件,软件及数据资源,控制程序运行,为其他应用软件提供支持等。_操作系统

随便推点

漫威所有电影的 按时间线的观影顺序-程序员宅基地

文章浏览阅读3.1k次。美国队长1 - 2011年惊奇队长 - 2019年钢铁侠1 - 2008年无敌浩克 - 2008年钢铁侠2 - 2010年雷神 - 2011年复仇者联盟 - 2012年雷神2 - 2013年钢铁侠3 - 2013年美国队长2 - 2014年复仇者联盟2 - 2015年银河护卫队 - 2017年蚁人 - 2015年美国队长3 - 2016年奇异博士 - 2016年银河护卫队2 - 2017..._漫威电影观看顺序时间线

PhotoZoom Classic 7中的新功能-程序员宅基地

文章浏览阅读142次。众所周知PhotoZoom Classic是家庭使用理想的放大图像软件。目前很多用户还在使用PhotoZoom Classic 6,对于PhotoZoom Classic 7还是有点陌生。其实在6代衍生下出了7代,7代比6代多了很多适用的功能。下面我们就介绍一下PhotoZoom Classic 7中的新功能。PhotoZoom Classic 6的功能我们就不过多介绍,主要介绍7代中特有的功..._photozoon的作用

tensorflow中tf.keras.models.Sequential()用法-程序员宅基地

文章浏览阅读4.6w次,点赞75次,收藏349次。tensorflow中tf.keras.models.Sequential()用法Sequential()方法是一个容器,描述了神经网络的网络结构,在Sequential()的输入参数中描述从输入层到输出层的网络结构model = tf.keras.models.Sequential([网络结构]) #描述各层网络网络结构举例:拉直层:tf.keras.layers.Flatten() #拉直层可以变换张量的尺寸,把输入特征拉直为一维数组,是不含计算参数的层全连接层:tf.ker._tf.keras.models.sequential

Java递归实现Fibonacci数列计算_用递归方法编程计算fibonacci数列:(n=10),fac.jpg-程序员宅基地

文章浏览阅读2.8k次。实现代码如下:public static int factorial(int n){ if (n <= 1){ return 1; } return factorial(n-1) + factorial(n-2); }测试代码如下:System.out.println(factorial(40));测..._用递归方法编程计算fibonacci数列:(n=10),fac.jpg

scratch班级名称 电子学会图形化编程scratch等级考试四级真题和答案解析B卷2020-9-程序员宅基地

文章浏览阅读1.3k次。scratch班级名称一、题目要求1、准备工作 保留小猫角色,白色背景 2、功能实现 点击绿旗后,询问请输入年级数,等待输入年级数 询问请输入班级数,等待输入班级数 定义列表“全校班级”,假设每个班级的班级数相同,所有班级名称自动生成并保存到全校班级中。 例如,输入年级数为5,输入班级数为8,可以看到舞台上列表全校班级的内容为:1(1)班、1(2)班、...5(7)班、5(8)班 二、案例分析1、角色分析角色:小猫2、背景_scratch班级名称

郁金香2021年游戏辅助技术中级班(七)_squad辅助科技-程序员宅基地

文章浏览阅读379次。郁金香2021年游戏辅助技术中级班(七)058-C,C++写代码HOOK分析封包数据格式A059-C,C++写代码HOOK分析封包数据格式B-detours劫持060-C,C++写代码HOOK分析封包数据格式C-过滤和格式化061-C,C++写代码HOOK分析封包数据格式D-写入配置文件062-C,C++写代码HOOK分析封包数据格式D-读取配置文件058-C,C++写代码HOOK分析封包数据格式A_squad辅助科技

推荐文章

热门文章

相关标签