CanalSharp是阿里巴巴开源项目mysql数据库binlog的增量订阅&消费组件 Canal 的.NET客户端,关于什么是 Canal?又能做什么?我会在后文为大家一一介绍。CanalSharp 这个项目,是由我和 WithLin (主要贡献) 完成,并将一直进行维护的Canal的.NET客户端项目。目前开源在github:https://github.com/CanalSharp/CanalSharp/ 希望大家多多支持,旨在为.NET开发者提供一个友好的对接Canal的选择,为.NET社区生态做贡献。
早期,阿里巴巴B2B公司因为存在杭州和美国双机房部署,存在跨机房同步的业务需求。不过早期的数据库同步业务,主要是基于trigger的方式获取增量变更,不过从2010年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务,从此开启了一段新纪元。
基于日志增量订阅&消费支持的业务:
从上层来看,复制分成三步:
原理相对比较简单:
以上内容摘自Canal项目官方资料 https://github.com/alibaba/canal
Canal的安装以及使用请查阅官方文档,本文不在赘述。 https://github.com/alibaba/canal/wiki
CanalSharp 是 Canal 的 .NET 客户端,它与 Canal 是采用的Socket来进行通信的,传输协议是TCP,交互协议采用的是 Google Protocol Buffer 3.0。
1.Canal连接到mysql数据库,模拟slave
2.CanalSharp与Canal建立连接
2.数据库发生变更写入到binlog
5.Canal向数据库发送dump请求,获取binlog并解析
4.CanalSharp向Canal请求数据库变更
4.Canal发送解析后的数据给CanalSharp
5.CanalSharp收到数据,消费成功,发送回执。(可选)
6.Canal记录消费位置。
以一张图来表示:
CanalSharp作为Canal的客户端,其应用场景就是Canal的应用场景。关于应用场景在Canal介绍一节已有概述。这里我举一些实际的使用例子:
1.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。
2.根据数据库的变更实时更新搜索引擎,比如电商场景下商品信息发生变更,实时同步到商品搜索引擎 Elasticsearch、solr等
3.根据数据库的变更实时更新缓存,比如电商场景下商品价格、库存发生变更实时同步到redis
4.数据库异地备份、数据同步
5.根据数据库变更触发某种业务,比如电商场景下,创建订单超过xx时间未支付被自动取消,我们获取到这条订单数据的状态变更即可向用户推送消息。
6.将数据库变更整理成自己的数据格式发送到kafka等消息队列,供消息队列的消费者进行消费。
使用 CanalSharp 之前,必然要先准备好mysql数据库以及Canal才行,这个步骤请直接查阅Canal官方文档 https://github.com/alibaba/canal/wiki 。但是为了让大家能快速跑通CanalSharp,CanalSharp 项目为大家提供了一个通过 docker-compose 同时运行 mysql和canal。
git clone https://github.com/CanalSharp/CanalSharp.git
cd docker
docker-compose up -d
出现下图表示运行成功:
ip:运行docker的服务器ip
mysql用户:root
mysql密码:000000
mysql端口:4406
默认提供了一个test数据库,然后有一张名为test的表。
Install-Package CanalSharp.Client
也可以直接下载源码运行 Sample 项目 https://github.com/CanalSharp/CanalSharp/tree/master/sample/CanalSharp.SimpleClient
(1)建立连接
//canal 配置的 destination,默认为 example
var destination = "example";
//创建一个简单CanalClient连接对象(此对象不支持集群)传入参数分别为 canal地址、端口、destination、用户名、密码
var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destination, "", "");
//连接 Canal
connector.Connect();
//订阅,同时传入Filter,如果不传则以Canal的Filter为准。Filter是一种过滤规则,通过该规则的表数据变更才会传递过来
connector.Subscribe(".*\\\\..*");
(2)获取数据
while (true)
{
//获取数据 1024表示数据大小 单位为字节
var message = connector.Get(1024);
//批次id 可用于回滚
var batchId = message.Id;
if (batchId == -1 || message.Entries.Count <= 0)
{
Thread.Sleep(300);
continue;
}
PrintEntry(message.Entries);
}
(3)输出数据
/// <summary>
/// 输出数据
/// </summary>
/// <param name="entrys">一个entry表示一个数据库变更</param>
private static void PrintEntry(List<Entry> entrys)
{
foreach (var entry in entrys)
{
if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
{
continue;
}
RowChange rowChange = null;
try
{
//获取行变更
rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
}
catch (Exception e)
{
}
if (rowChange != null)
{
//变更类型 insert/update/delete 等等
EventType eventType = rowChange.EventType;
//输出binlog信息 表名 数据库名 变更类型
Console.WriteLine(
$"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");
//输出 insert/update/delete 变更类型列数据
foreach (var rowData in rowChange.RowDatas)
{
if (eventType == EventType.Delete)
{
PrintColumn(rowData.BeforeColumns.ToList());
}
else if (eventType == EventType.Insert)
{
PrintColumn(rowData.AfterColumns.ToList());
}
else
{
Console.WriteLine("-------> before");
PrintColumn(rowData.BeforeColumns.ToList());
Console.WriteLine("-------> after");
PrintColumn(rowData.AfterColumns.ToList());
}
}
}
}
}
/// <summary>
/// 输出每个列的详细数据
/// </summary>
/// <param name="columns"></param>
private static void PrintColumn(List<Column> columns)
{
foreach (var column in columns)
{
//输出列明 列值 是否变更
Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}");
}
}
首次运行会输出一堆数据,那些都是初始化运行创建表的数据,忽略即可
运行项目,然后一次执行sql观察输出:
insert into test values(1000,'111');
update test set name='222' where id=1000;
delete from test where id=1000;
通过新标签页打开图片查看大图
可以看见我们分别执行 insert、update、delete 语句,我们的CanalSharp都获取到了数据库变更。
1.mysql数据库版本有要求:5.7.13, 5.6.10,、5.5.18和5.1.40/48,不一定非要满足小版本号的要求,比如 5.7.x、5.6.x、5.5.x都应该可以,但是实际需要自己做测试。前面的具体版本号是Canal官方提供的资料,但是博主公司用的mysql 的版本是5.5.60,是可以正常使用Canal的。
2.mysql数据binlog的格式强烈建议设置为row
3.Canal并非必须连接到master数据库,它同样可以连接到slave数据库,只是从库出了需要开启写入binlog以外还需要设置 log-slave-updates
开启。
4.如果生产环境已经存在mysql集群,且集群主库的binlog格式为mixed,mysql数据库集群的主库binlog格式可以不用改依然为 mixed,设置某一个从库binlog格式配置为 row,让Canal连接从库,这样可以避免对生产环境的mysql集群产生影响。
5.mysql支持Statement,MiXED,以及ROW三种格式的binlog为什么推荐使用row格式binlog,经过博主实际测试,使用row格式兼容性是最好的,实际可以自己测试。
CanalSharp的介绍到这里就结束了,如果觉得这个项目有用的欢迎大家来个 star 。后续将会写几篇文章介绍更详细的使用方法以及实战。
CanalSharp 开源地址:https://github.com/CanalSharp/CanalSharp
Canal 开源地址:https://github.com/alibaba/canal
文章浏览阅读1.6k次。安装配置gi、安装数据库软件、dbca建库见下:http://blog.csdn.net/kadwf123/article/details/784299611、检查集群节点及状态:[root@rac2 ~]# olsnodes -srac1 Activerac2 Activerac3 Activerac4 Active[root@rac2 ~]_12c查看crs状态
文章浏览阅读1.3w次,点赞45次,收藏99次。我个人用的是anaconda3的一个python集成环境,自带jupyter notebook,但在我打开jupyter notebook界面后,却找不到对应的虚拟环境,原来是jupyter notebook只是通用于下载anaconda时自带的环境,其他环境要想使用必须手动下载一些库:1.首先进入到自己创建的虚拟环境(pytorch是虚拟环境的名字)activate pytorch2.在该环境下下载这个库conda install ipykernelconda install nb__jupyter没有pytorch环境
文章浏览阅读5.2k次,点赞19次,收藏28次。选择scoop纯属意外,也是无奈,因为电脑用户被锁了管理员权限,所有exe安装程序都无法安装,只可以用绿色软件,最后被我发现scoop,省去了到处下载XXX绿色版的烦恼,当然scoop里需要管理员权限的软件也跟我无缘了(譬如everything)。推荐添加dorado这个bucket镜像,里面很多中文软件,但是部分国外的软件下载地址在github,可能无法下载。以上两个是官方bucket的国内镜像,所有软件建议优先从这里下载。上面可以看到很多bucket以及软件数。如果官网登陆不了可以试一下以下方式。_scoop-cn
文章浏览阅读4.5k次,点赞2次,收藏3次。首先要有一个color-picker组件 <el-color-picker v-model="headcolor"></el-color-picker>在data里面data() { return {headcolor: ’ #278add ’ //这里可以选择一个默认的颜色} }然后在你想要改变颜色的地方用v-bind绑定就好了,例如:这里的:sty..._vue el-color-picker
文章浏览阅读640次。基于芯片日益增长的问题,所以内核开发者们引入了新的方法,就是在内核中只保留函数,而数据则不包含,由用户(应用程序员)自己把数据按照规定的格式编写,并放在约定的地方,为了不占用过多的内存,还要求数据以根精简的方式编写。boot启动时,传参给内核,告诉内核设备树文件和kernel的位置,内核启动时根据地址去找到设备树文件,再利用专用的编译器去反编译dtb文件,将dtb还原成数据结构,以供驱动的函数去调用。firmware是三星的一个固件的设备信息,因为找不到固件,所以内核启动不成功。_exynos 4412 刷机
文章浏览阅读2w次,点赞24次,收藏42次。Linux系统配置jdkLinux学习教程,Linux入门教程(超详细)_linux配置jdk
文章浏览阅读3.3k次,点赞5次,收藏19次。xlabel('\delta');ylabel('AUC');具体符号的对照表参照下图:_matlab微米怎么输入
文章浏览阅读119次。顺序读写指的是按照文件中数据的顺序进行读取或写入。对于文本文件,可以使用fgets、fputs、fscanf、fprintf等函数进行顺序读写。在C语言中,对文件的操作通常涉及文件的打开、读写以及关闭。文件的打开使用fopen函数,而关闭则使用fclose函数。在C语言中,可以使用fread和fwrite函数进行二进制读写。 Biaoge 于2024-03-09 23:51发布 阅读量:7 ️文章类型:【 C语言程序设计 】在C语言中,用于打开文件的函数是____,用于关闭文件的函数是____。
文章浏览阅读3.4k次,点赞2次,收藏13次。跟随鼠标移动的粒子以grid(SOP)为partical(SOP)的资源模板,调整后连接【Geo组合+point spirit(MAT)】,在连接【feedback组合】适当调整。影响粒子动态的节点【metaball(SOP)+force(SOP)】添加mouse in(CHOP)鼠标位置到metaball的坐标,实现鼠标影响。..._touchdesigner怎么让一个模型跟着鼠标移动
文章浏览阅读178次。项目运行环境配置:Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。项目技术:Springboot + mybatis + Maven +mysql5.7或8.0+html+css+js等等组成,B/S模式 + Maven管理等等。环境需要1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。_基于java技术的停车场管理系统实现与设计
文章浏览阅读3.5k次。前言对于MediaPlayer播放器的源码分析内容相对来说比较多,会从Java-&amp;gt;Jni-&amp;gt;C/C++慢慢分析,后面会慢慢更新。另外,博客只作为自己学习记录的一种方式,对于其他的不过多的评论。MediaPlayerDemopublic class MainActivity extends AppCompatActivity implements SurfaceHolder.Cal..._android多媒体播放源码分析 时序图
文章浏览阅读2.4k次,点赞41次,收藏13次。java 数据结构与算法 ——快速排序法_快速排序法