Spark—结构化流Structured Streaming编程指南-Streaming Query_spark structured streaming foreachbatch-程序员宅基地

技术标签: spark  big data  大数据  

定义了最终结果DataFrame/Dataset之后,剩下的就是开始流计算了,为此,必须使用Dataset.writeStream()方法返回的DataStreamWriter。而且必须在这个接口中指定一个或多个以下内容:

1.输出接收器的详细信息:数据格式、位置等。

2.输出模式:指定写入输出接收器的内容。

3.查询名称:可选,为标识指定查询的唯一名称。

4.触发间隔:可选,指定触发间隔。如果没有指定,系统将在之前的处理完成后立即检查新数据的可用性。如果由于之前的处理没有完成而错过了触发时间,那么系统将立即触发处理。

5.检查点位置:对于一些可以保证端到端容错的输出接收器,指定系统将写入所有检查点信息的位置。这应该是一个hdfs兼容的容错文件系统中的目录。

输出模式
输出模式有以下几种类型:

1.Append模式(默认)——这是默认模式,在这种模式下,只有在最后一个触发器之后添加到结果表中的新行才会输出到接收器。只适用于那些添加到结果表中的行永远不会更改的查询。因此,这种模式保证每行只输出一次。例如,只有select、where、map、flatMap、filter、join等的查询将支持Append模式。

2.Complete模式——每次触发器执行后都将整个结果表输出到接收器后。

3.Update模式——(从Spark 2.1.1开始可用)只有在最后一个触发器之后更新到结果表中的行才会输出到接收器。

不同类型的流查询支持不同的输出模式。下面是适配的矩阵表。

输出接收器

下面是几种类型的内置输出接收器:

1.文件接收器-----将输出存储到目录

df.writeStream
    .format("parquet")  // 可以是"orc", "json", "csv"等等格式
    .option("path", "path/to/destination/dir")
    .start()

2.Kafka 接收器——将输出发送到Kafka

writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "t_user_action")
    .start()

3.Foreach接收器——在输出中的记录上运行任意计算。有关更多细节,请参见后面。 

writeStream
    .foreach(...)
    .start()

4.控制台接收器(console)——每次有触发器时将输出输出到console/stdout。这两种模式都支持Append和Complete输出模式。这应该用于在低数据量上进行调试,因为在每个触发器之后,将收集整个输出并存储在Driver程序的内存中

writeStream
    .format("console")
    .start()

5.内存接收器(memory)——输出作为内存中的表存储在内存中。这两种模式都支持Append和Complete输出模式。整个输出被收集并存储在Driver程序的内存中,这应该用于在低数据量上进行调试。因此,请谨慎使用。 

writeStream
    .format("memory")
    .queryName("tableName")
    .start()

有些接收器不能容错,因为它们不能保证输出的持久性,并且只用于调试目的。请参阅前面博客关于容错语义的部分。以下是Spark中所有接收器的详细信息。

 需要注意的是,必须手动调用start()方法来开始查询的执行,start()会返回一个StreamingQuery对象,这个对象是连续运行执行的句柄。我们可以使用它来管理查询,下面会详细介绍。现在看几个例子。

// ========== 没有聚合操作的DF ==========
val noAggDF = deviceDataDf.select("device").where("signal > 10")   
 
// 打印新数据到console
noAggDF
  .writeStream
  .format("console")
  .start()
 
// 打印新数据到Parquet文件
noAggDF
  .writeStream
  .format("parquet")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .option("path", "path/to/destination/dir")
  .start()
 
// ========== 有聚合操作的DF ==========
val aggDF = df.groupBy("device").count()
 
// 将更新后的聚合打印到console
aggDF
  .writeStream
  .format("console")
  .outputMode("complete") 
  .start()
 
// 将所有聚合保存在内存的表中
aggDF
  .writeStream
  .format("memory")
  .outputMode("complete")
  .queryName("aggregates")    // queryName的值就是Table的名称
  .start()
 
spark.sql("select * from aggregates").show()   // 交互式地查询保存在内存中的表

使用Foreach和ForeachBatch

foreach和foreachBatch操作允许我们对流查询的输出应用任意操作和编写逻辑,他们的用法稍微有些不一样的地方——虽然foreach允许在每一行上定制写逻辑,但是foreachBatch允许在每个微批处理的输出上应用任意操作和定制逻辑

ForeachBatch

foreachBatch(…)允许指定在流查询的每个微批处理的输出数据上执行的函数。从Spark 2.4开始支持这个特性。它接受两个参数:具有微批处理的输出数据的DataFrame或Dataset,以及微批处理的唯一ID。

streamingDF.writeStream
    .foreachBatch((batchDF: DataFrame, batchId: Long) => {
     // Transform and write batchDF 
    }
).start()

使用foreach批处理,可以执行以下操作。

1.重复用现有批处理数据源——对于许多存储系统,可能还没有现成可用的流接收器,但可能已经存在用于批处理查询的数据写入器。使用foreachBatch,可以在每个微批处理的输出上使用批处理数据编写器。

2.写入多个位置——如果想要将流查询的输出写入多个位置,那么只需多次写入输出DataFrame/Dataset即可。但是,每次写入尝试都可能导致重新计算输出数据(包括可能重新读取输入数据)。为了避免重新计算,我们应该缓存输出DataFrame/Dataset,将其写入多个位置,然后取消缓存。

streamingDF.writeStream
    .foreachBatch ((batchDF: DataFrame, batchId: Long) => {
        batchDF.persist() 
        batchDF.write.format(…).save(…) // 位置1 
        batchDF.write.format(…).save(…) // 位置2 
        batchDF.unpersist()
    }
)

3.应用额外的DataFrame操作——流Dataframe中不支持许多DataFrame和Dataset操作,因为Spark不支持在这些情况下生成增量数据。使用foreachBatch,可以对每个微批处理输出应用非streaming的dataframe的一些算子操作。但是,必须自己考虑执行该操作的端到端语义。

注意事项

默认情况下,foreachBatch只提供至少一次的写保证。但是,我们可以使用提供给该函数的batchId来消除重复的输出,并获得一次准确的保证。

foreachBatch不支持连续处理模式,因为它基本上依赖于流查询的微批处理执行。如果要使用连续模式编写数据,就使用foreach。

Foreach

如果foreachBatch不是一个好的选择(例如,对应的批处理数据写入器不存在,或者是连续处理模式),那么还可以使用foreach来表示定制的写入器逻辑。具体地说,可以将数据写入逻辑分为三种方法来表示:open、process和close。从Spark 2.4开始,foreach可以使用。

在scala中必须继承类ForeachWriter :

streamingDatasetOfString.writeStream
    .foreach( new ForeachWriter[String] {
 
        def open(partitionId: Long, version: Long): Boolean = {
          // 建立连接
        }
 
        def process(record: String): Unit = {
          // 往连接中写入数据
        }
 
        def close(errorOrNull: Throwable): Unit = {
          // 关闭连接
        }
  }
).start()

在启动流查询时,Spark以以下方式调用函数或对象的方法:

1.对象的单一副本负责查询中单个任务生成的所有数据。换句话说,一个实例负责处理以分布式方式生成的数据的一个分区。

2.对象必须是可序列化的,因为每个任务将获得所提供对象的一个新的序列化反序列化副本。因此,强烈建议对写入数据进行初始化(例如打开连接或启动事务)是在调用open()方法之后完成的,这意味着任务已经准备好生成数据。

3.如果存在open()方法并且在调用后成功返回(不管返回值如何),那么一定要调用close()方法(如果它存在),除非JVM或Python进程中途崩溃。

4.对应的生命周期:

     a)方法open(partitionId, epochId)被调用。

     b)如果open(…)返回true,那么对于分区和批处理/epoch中的每一行,将调用方法process(row)。

     c)方法close(error)在处理行时被调用,如果出现错误。

注意:open()方法中的partitionId和epochId可用于在故障导致某些输入数据的重新处理时消除生成的数据的重复。这取决于查询的执行模式。如果流查询是以微批处理模式执行的,那么由唯一元组(partition_id, epoch_id)表示的每个分区都保证具有相同的数据。因此,(partition_id, epoch_id)可用于取消重复和/或事务提交数据,并实现一次准确的保证。但是,如果流查询是在连续模式下执行的,那么这一保证不成立,因此不应该用于重复数据删除。

Streaming Queries触发器
流查询的触发器设置定义了流数据处理的时间,该查询是作为具有固定批处理间隔的微批处理查询执行,还是作为连续处理查询执行。下面是支持的不同类型的触发器:

请看代码示例:

import org.apache.spark.sql.streaming.Trigger
 
// 默认触发器 (一次微批处理完成后立即执行下一次)
df.writeStream
  .format("console")
  .start()
 
// 具有2秒微批处理间隔的ProcessingTime触发器
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()
 
// 只执行一次
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()
 
// 具有一秒检查点间隔的连续触发器
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

管理流查询(Managing Streaming Queries)

启动查询时创建的StreamingQuery对象可用于监视和管理查询。下面列出常用的一些管理方法:

val query = df.writeStream.format("console").start()   // 获得StreamingQuery对象
 
query.id          // 获取正在运行的查询的唯一标识符,该标识符在从检查点获取数据重新启动时保持不变
 
query.runId       // 获取查询此次运行的唯一id,该id将在每次启动/重新启动时变更
 
query.name        // 获取自动生成的名称或用户指定的名称
 
query.explain()   // 打印查询的详细说明
 
query.stop()      // 停止查询
 
query.awaitTermination()   // 阻塞查询,直到使用stop()或错误来终止查询
 
query.exception       // 如果查询已被错误终止,则获取异常信息
 
query.recentProgress  // 流查询的最近更新的数组
 
query.lastProgress    // 流查询的最近一次更新

我们可以在一个SparkSession中启动任意数量的查询。它们将同时运行,共享集群资源。还可以使用sparkSession.streams()来获得StreamingQueryManager,它可以用来管理当前运行中的查询。

val spark: SparkSession = ...
 
spark.streams.active    // 取当前运行中的流查询的列表
 
spark.streams.get(id)   // 通过流查询惟一id获取流查询对象
 
spark.streams.awaitAnyTermination()   // 阻塞,直到其中任何一个终止

监控流查询(Monitoring Streaming Queries)
有多种方法可以监视运行中的流查询。我们可以使用Spark的Dropwizard指标将指标推送到外部系统,也可以通过编程访问它们。

1.系统指标的直接获取
可以使用streamingQuery.lastProgress()和streamingQuery.status()直接获取运行中查询的当前状态和指标。lastProgress()返回一个StreamingQueryProgress对象,它包含关于流的最后一个触发器所进行的更新的所有信息——处理了哪些数据、处理速率、延迟等等。还有streamingQuery.recentProgress,它返回最近几次更新的数组。

此外,streamingQuery.status()返回一个StreamingQueryStatus对象,它提供了关于查询正在执行的操作的信息——是活动的触发器还是正在处理的数据,等等。
 

val query: StreamingQuery = ...
 
println(query.lastProgress)
 
/* 打印出的数据格式如下
 
{
  "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
  "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
  "name" : "MyQuery",
  "timestamp" : "2016-12-14T18:45:24.873Z",
  "numInputRows" : 10,
  "inputRowsPerSecond" : 120.0,
  "processedRowsPerSecond" : 200.0,
  "durationMs" : {
    "triggerExecution" : 3,
    "getOffset" : 2
  },
  "eventTime" : {
    "watermark" : "2016-12-14T18:45:24.873Z"
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-0]]",
    "startOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 1,
        "1" : 1,
        "3" : 1,
        "0" : 1
      }
    },
    "endOffset" : {
      "topic-0" : {
        "2" : 0,
        "4" : 115,
        "1" : 134,
        "3" : 21,
        "0" : 534
      }
    },
    "numInputRows" : 10,
    "inputRowsPerSecond" : 120.0,
    "processedRowsPerSecond" : 200.0
  } ],
  "sink" : {
    "description" : "MemorySink"
  }
}
*/
 
 
println(query.status)
 
/*  打印出的数据格式如下
{
  "message" : "Waiting for data to arrive",
  "isDataAvailable" : false,
  "isTriggerActive" : false
}
*/

2.使用异步api以编程方式报告系统指标
    我们还可以通过给SparkSession设置一个StreamingQueryListener监听器来异步的监听所有的查询,这样在启动和停止查询以及运行中的查询中进行更新时,都会回调监听器的方法,使用方法是自定义一个StreamingQueryListener监听器并通过sparkSession.streams.attachListenter()方法来注册监听器。代码示例:
 

val spark: SparkSession = ...
 
spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
    }
    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        println("Query terminated: " + queryTerminated.id)
    }
    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        println("Query made progress: " + queryProgress.progress)
    }
})

3.使用Dropwizard报告系统指标

Spark支持使用Dropwizard库报告系统指标。要同时报告结构化流查询的指标,必须启用配置spark.sql.streaming.metricsEnabled

spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
// 或者
spark.sql("SET spark.sql.streaming.metricsEnabled=true")

在启用此配置之后,在SparkSession中启动的所有查询都将通过Dropwizard向已配置的任何接收器(例如Ganglia, Graphite, JMX等)报告指标。 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/lehsyh/article/details/124661411

智能推荐

什么是内部类?成员内部类、静态内部类、局部内部类和匿名内部类的区别及作用?_成员内部类和局部内部类的区别-程序员宅基地

文章浏览阅读3.4k次,点赞8次,收藏42次。一、什么是内部类?or 内部类的概念内部类是定义在另一个类中的类;下面类TestB是类TestA的内部类。即内部类对象引用了实例化该内部对象的外围类对象。public class TestA{ class TestB {}}二、 为什么需要内部类?or 内部类有什么作用?1、 内部类方法可以访问该类定义所在的作用域中的数据,包括私有数据。2、内部类可以对同一个包中的其他类隐藏起来。3、 当想要定义一个回调函数且不想编写大量代码时,使用匿名内部类比较便捷。三、 内部类的分类成员内部_成员内部类和局部内部类的区别

分布式系统_分布式系统运维工具-程序员宅基地

文章浏览阅读118次。分布式系统要求拆分分布式思想的实质搭配要求分布式系统要求按照某些特定的规则将项目进行拆分。如果将一个项目的所有模板功能都写到一起,当某个模块出现问题时将直接导致整个服务器出现问题。拆分按照业务拆分为不同的服务器,有效的降低系统架构的耦合性在业务拆分的基础上可按照代码层级进行拆分(view、controller、service、pojo)分布式思想的实质分布式思想的实质是为了系统的..._分布式系统运维工具

用Exce分析l数据极简入门_exce l趋势分析数据量-程序员宅基地

文章浏览阅读174次。1.数据源准备2.数据处理step1:数据表处理应用函数:①VLOOKUP函数; ② CONCATENATE函数终表:step2:数据透视表统计分析(1) 透视表汇总不同渠道用户数, 金额(2)透视表汇总不同日期购买用户数,金额(3)透视表汇总不同用户购买订单数,金额step3:讲第二步结果可视化, 比如, 柱形图(1)不同渠道用户数, 金额(2)不同日期..._exce l趋势分析数据量

宁盾堡垒机双因素认证方案_horizon宁盾双因素配置-程序员宅基地

文章浏览阅读3.3k次。堡垒机可以为企业实现服务器、网络设备、数据库、安全设备等的集中管控和安全可靠运行,帮助IT运维人员提高工作效率。通俗来说,就是用来控制哪些人可以登录哪些资产(事先防范和事中控制),以及录像记录登录资产后做了什么事情(事后溯源)。由于堡垒机内部保存着企业所有的设备资产和权限关系,是企业内部信息安全的重要一环。但目前出现的以下问题产生了很大安全隐患:密码设置过于简单,容易被暴力破解;为方便记忆,设置统一的密码,一旦单点被破,极易引发全面危机。在单一的静态密码验证机制下,登录密码是堡垒机安全的唯一_horizon宁盾双因素配置

谷歌浏览器安装(Win、Linux、离线安装)_chrome linux debian离线安装依赖-程序员宅基地

文章浏览阅读7.7k次,点赞4次,收藏16次。Chrome作为一款挺不错的浏览器,其有着诸多的优良特性,并且支持跨平台。其支持(Windows、Linux、Mac OS X、BSD、Android),在绝大多数情况下,其的安装都很简单,但有时会由于网络原因,无法安装,所以在这里总结下Chrome的安装。Windows下的安装:在线安装:离线安装:Linux下的安装:在线安装:离线安装:..._chrome linux debian离线安装依赖

烤仔TVの尚书房 | 逃离北上广?不如押宝越南“北上广”-程序员宅基地

文章浏览阅读153次。中国发达城市榜单每天都在刷新,但无非是北上广轮流坐庄。北京拥有最顶尖的文化资源,上海是“摩登”的国际化大都市,广州是活力四射的千年商都。GDP和发展潜力是衡量城市的数字指...

随便推点

java spark的使用和配置_使用java调用spark注册进去的程序-程序员宅基地

文章浏览阅读3.3k次。前言spark在java使用比较少,多是scala的用法,我这里介绍一下我在项目中使用的代码配置详细算法的使用请点击我主页列表查看版本jar版本说明spark3.0.1scala2.12这个版本注意和spark版本对应,只是为了引jar包springboot版本2.3.2.RELEASEmaven<!-- spark --> <dependency> <gro_使用java调用spark注册进去的程序

汽车零部件开发工具巨头V公司全套bootloader中UDS协议栈源代码,自己完成底层外设驱动开发后,集成即可使用_uds协议栈 源代码-程序员宅基地

文章浏览阅读4.8k次。汽车零部件开发工具巨头V公司全套bootloader中UDS协议栈源代码,自己完成底层外设驱动开发后,集成即可使用,代码精简高效,大厂出品有量产保证。:139800617636213023darcy169_uds协议栈 源代码

AUTOSAR基础篇之OS(下)_autosar 定义了 5 种多核支持类型-程序员宅基地

文章浏览阅读4.6k次,点赞20次,收藏148次。AUTOSAR基础篇之OS(下)前言首先,请问大家几个小小的问题,你清楚:你知道多核OS在什么场景下使用吗?多核系统OS又是如何协同启动或者关闭的呢?AUTOSAR OS存在哪些功能安全等方面的要求呢?多核OS之间的启动关闭与单核相比又存在哪些异同呢?。。。。。。今天,我们来一起探索并回答这些问题。为了便于大家理解,以下是本文的主题大纲:[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JCXrdI0k-1636287756923)(https://gite_autosar 定义了 5 种多核支持类型

VS报错无法打开自己写的头文件_vs2013打不开自己定义的头文件-程序员宅基地

文章浏览阅读2.2k次,点赞6次,收藏14次。原因:自己写的头文件没有被加入到方案的包含目录中去,无法被检索到,也就无法打开。将自己写的头文件都放入header files。然后在VS界面上,右键方案名,点击属性。将自己头文件夹的目录添加进去。_vs2013打不开自己定义的头文件

【Redis】Redis基础命令集详解_redis命令-程序员宅基地

文章浏览阅读3.3w次,点赞80次,收藏342次。此时,可以将系统中所有用户的 Session 数据全部保存到 Redis 中,用户在提交新的请求后,系统先从Redis 中查找相应的Session 数据,如果存在,则再进行相关操作,否则跳转到登录页面。此时,可以将系统中所有用户的 Session 数据全部保存到 Redis 中,用户在提交新的请求后,系统先从Redis 中查找相应的Session 数据,如果存在,则再进行相关操作,否则跳转到登录页面。当数据量很大时,count 的数量的指定可能会不起作用,Redis 会自动调整每次的遍历数目。_redis命令

URP渲染管线简介-程序员宅基地

文章浏览阅读449次,点赞3次,收藏3次。URP的设计目标是在保持高性能的同时,提供更多的渲染功能和自定义选项。与普通项目相比,会多出Presets文件夹,里面包含着一些设置,包括本色,声音,法线,贴图等设置。全局只有主光源和附加光源,主光源只支持平行光,附加光源数量有限制,主光源和附加光源在一次Pass中可以一起着色。URP:全局只有主光源和附加光源,主光源只支持平行光,附加光源数量有限制,一次Pass可以计算多个光源。可编程渲染管线:渲染策略是可以供程序员定制的,可以定制的有:光照计算和光源,深度测试,摄像机光照烘焙,后期处理策略等等。_urp渲染管线

推荐文章

热门文章

相关标签