Flink 自定义source 写入 Kafka_flinksql org.apache.kafka.connect.source.sourcerec-程序员宅基地

技术标签: flink  java  kafka  Flink  

添加依赖

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-connector-kafka_2.12</artifactId>
	<version>1.13.2</version>
	<scope>provided</scope>
</dependency>

基于 Flink 服务提交任务并执行时需要的依赖包

基于 flink 服务器提交任务前,先上传依赖包到 flink 的 lib 目录下;然后重启 flink 服务,使 jar 进行加载;否则会出现 ClassNoFoundException 的异常。

  • flink-connector-kafka_2.12-1.13.2.jar
  • kafka-clients-2.4.1.jar

启动前注意

确保 topic 在 kafka 中是真实存在的,否则将会产生如下的执行异常:

  • 运行逻辑:先获取kafka中全部的topic list,再进行正则匹配,得到指定的topic list 调试发现,获取kafka全部topic list返回null。然后产生下述异常,此时创建对应的 topic,等待下次任务重启后将可正常运行。
java.lang.RuntimeException: Unable to retrieve any partitions with KafkaTopicsDescriptor: Topic Regex Pattern (WYSXT_47_(.+)_47_other_47_property_47_post)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:156)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:577)
	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)

构建KafkaSource参数实例

/**
 * kafka source 参数实例
 * @author yinlilan
 *
 */
public class KafkaSource implements Serializable {
    

	private static final long serialVersionUID = 6060562931782343343L;

	private String bootStrapServers;
	
	private String groupId;
	
	private String topic;
	
	public String getBootStrapServers() {
    
		return bootStrapServers;
	}

	public String getGroupId() {
    
		return groupId;
	}
	
	public String getTopic() {
    
		return topic;
	}

	public KafkaSource(Object obj) {
    
		final JSONObject json = JSONObject.parseObject(obj.toString());
		this.bootStrapServers = json.getString("bootStrapServers");
		this.groupId = json.getString("groupId");
		this.topic = json.getString("topic");
	}
	
}

构建自定义KafkaMQSource

基于FlinkKafkaConsumer< T > 类实现KafkaSource,其中KafkaDeserializationSchema< T >类型是用于数据反序列化的,可以将数据组装成你想要的方式然后传递出去。

import java.io.Serializable;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/**
 * kafka source初始化
 * @author yinlilan
 *
 */
public class KafkaMessageSource implements Serializable {
    
	
	private static final long serialVersionUID = -1128615689349479275L;
	
	private FlinkKafkaConsumer<Map<String, String>> consumer;
	
	public KafkaMessageSource(final String bootStrapServers, final String groupId, final String topic){
    
    	Properties properties = new Properties();
    	properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
    	// Flink Kafka Consumer 支持发现动态创建的 Kafka 分区,并使用精准一次的语义保证去消耗它们
    	properties.setProperty("flink.partition-discovery.interval-millis", "10000");//
    	properties.setProperty("group.id", groupId);
    	
    	
		//TODO: 自定义反序列化
		final KafkaDeserializationSchema<Map<String, String>> deserializer = new KafkaDeserializationSchema<Map<String, String>>(){
    
			
			private static final long serialVersionUID = 1574406844851249992L;
			
			private String encoding = "UTF-8";
    		
			@Override
			public TypeInformation<Map<String, String>> getProducedType() {
    
				return TypeInformation.of(new TypeHint<Map<String, String>>(){
    });
			}

			@Override
			public boolean isEndOfStream(Map<String, String> nextElement) {
    
				return false;
			}

			@Override
			public Map<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
    
				final Map<String, String> result = new ConcurrentHashMap<>();
				result.put("topic", record.topic());
				result.put("value", new String(record.value(), encoding));
				return result;
			}
    	};
    			
    	// 构建source
    	Pattern pattern = Pattern.compile(topic);
    	consumer = new FlinkKafkaConsumer<>(pattern, deserializer, properties);
	}

	public FlinkKafkaConsumer<Map<String, String>> getConsumer() {
    
		return consumer;
	}
}
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/zx711166/article/details/122231934

智能推荐

springmvc/springboot关于 PropertyEditor的一个坑-程序员宅基地

定义一个名为Problem实体类,如下:public class Problem implements java.io.Serializable { private static final long serialVersionUID = 5454155825314635342L; /** * id */ private java.lang.String id; pr...

android复杂列表滑动卡顿,Android 列表滑动性能优化总结-程序员宅基地

列表滑动性能优化是一个老生常谈的问题,最近在做项目的时候又遇到了列表滑动卡顿的问题,我在经过多次思考和尝试后,终于找到了滑动卡顿的元凶,于是将经验总结下来。ViewHolder先说说最常规的ViewHolder。ViewHolder的出现是为了解决在绑定视图数据的时候使用findViewById遍历视图树(以深度优先的方式)查找视图引起耗时操作的问题,将第一次查找到的视图放入静态的ViewHold..._android recycleview调用notifydatasetchanged的时候,滑动列表会卡

SQL Server索引语法_sql server 索引语法-程序员宅基地

转载自: https://www.cnblogs.com/kissdodog/archive/2013/06/12/3133345.html从CREATE开始  通过显式的CREATE INDEX命令  在创建约束时作为隐含的对象  随约束创建的隐含索引  当向表中添加如下两种约束之一时,就会创建隐含索引。  主键约束(聚集索引)  唯一约束(唯一索引)一、CR_sql server 索引语法

HttpClient实现HTTP文件通用下载类_httpclient 下载文件类型-程序员宅基地

import java.io.File; import java.io.FileOutputStream; import java.io.InputStream; import org.apache.http.Header; import org.apache.http.HeaderElement; import org.apache.http.HttpEntity; ..._httpclient 下载文件类型

C++:纯虚函数和抽象类 | 虚函数和纯虚函数区别_抽象类与纯虚函数有什么特点?__索伦的博客-程序员宅基地

纯虚函数和抽象类虚函数和纯虚函数区别_抽象类与纯虚函数有什么特点?

webdriver已获取得到一个元素,怎么得到该元素下的所有子节点和父节点_根据.webdriver.findelements查找子元素-程序员宅基地

webdriver已获取得到一个元素,怎么得到该元素下的所有子节点和父节点parent = current.findElement(By.xpath("./.."));// 找到父元素children = current.findElements(By.xpath("./*"));// 找到所有子元素_根据.webdriver.findelements查找子元素

随便推点

喜报!爱博精电再次荣获建筑电气品牌竞争力十强企业!_Accuenergy的博客-程序员宅基地

2023年4月21日,“第11届建筑电气技术交流大会暨第二届双碳战略下的建筑电气技术发展论坛”在长沙西雅温德姆酒店成功举办。此次会议旨在共同探讨双碳目标下,如何推进建筑电气行业的绿色健康和可持续发展。当前,我国正处于应对气候变化、实现“双碳”目标的关键时期。建筑行业是能源消耗和碳排放的重要领域,其中建筑电气作为关键环节,近年来也在积极探索绿色健康和可持续发展的新路径。作为一家致力于智能配电和能源管理领域的企业,爱博精电积极推动能源管理的创新和应用,以提高能源利用效率和减少碳排放。我们的产品通

高精度IP地址定位接口的使用场景_高精准ip位置定位_tianyanshuju的博客-程序员宅基地

说到高精度IP地址定位,其实一直不太为大家所知,一般也是通过影视作品了解到,原来可以通过IP地址对犯罪分子进行一定程度的精确定位,日常使用最多的还是基于城市定位的基础IP定位功能,这也使得一些企业机构忽视了IP地址精确定位技术对改善业务模式的价值,错失了一些发展良机,下面介绍下ip地址精确定位技术的常见使用场景。1、基于位置的精准营销由于固定网络市场的保有量巨大,网络用户通过固定网络接入互联网或者使用移动终端在连接WIFI的情况下,高精度IP地址定位都可以定位到互联网用户所在位置,网络平台可以应用位_高精准ip位置定位

mysql command为sleep时项目可以连接_mysql数据库常连接造成大量sleep状态怎么办-程序员宅基地

设置max_execution_time 来阻止太长的读SQL。那可能存在的问题是会把所有长SQL都给KILL 掉。有些必须要执行很长时间的也会被误杀。自己写个脚本检测这类语句,比如order by rand(), 超过一定时间用Kill query thread_id 给杀掉。那能不能不要杀掉而让他正常运行,但是又不影响其他的请求呢?那mysql 8.0 引入的资源组(resource grou..._mysql command sleep

python画折线图代码-python画折线示意图实例代码-程序员宅基地

python画折线图方法前做PPT要用到折线图,嫌弃EXCEL自带的看上去不好看,就用python写了一个画折线图的程序。import matplotlib.pyplot as pltx=[1,2,3,4,5,6]y1=[35000,85000,120000]y2=[45000,85000,100000]y3=[25000,65000,90000]point1=180180point2=20000..._python画折线图的代码

BRCGS认证|消费品BRCGS审核七大内容与要求_CALL13392150338的博客-程序员宅基地

概述2016年发布的《消费品标准》第4期已全面修订为:认识到消费品中包含的产品的多样性。将标准与说明符和品牌所有者的实际购买预期保持一致,同时鼓励持续改进。简化需求和术语,使其更容易理解。扩大范围,包括批发销售的产品和零部件/原材料。确保需求基于风险。引入初级基础水平和更高水平,以鼓励持续改进。为了实现这些目标,我们将消费品分为两个独立的标准:消费品全球标准-百货消费品全球标准-个人护理和家庭每项标准都可以在基础水平或更高水平上获得认证。这使得灵活性能够根据产品部门和_brcgs

ICE常见编译和运行(异常)错误_ice.connectionlostexception-程序员宅基地

在编译和Ice应用相关的文件中,经常因为ice相关的文件包含关系而导致编译无法通过,此时的错误一般提示和handle.h相关。然而想要解决这样的错误,一般只需要把在无法编译成.o文件的.cpp文件中和ice文件相关的头文件放在该.cpp文件的起始行即可。也就是说,根据提示,把.c_ice.connectionlostexception

推荐文章

热门文章

相关标签