flink-cdc解析:BaronND/connectors
flink-cdc解析
要想深入学习,先去哥的GitHub上去下载源码
起源背景
(相关资料图)
数据库的更改对于客户端来说是没有感知的,你需要开启线程去查询,才知道数据有没有更新,但是就算是查询,如果是直接select * from ....,这样获取的结果还要和上次获取的结果对比,才知道数据有没有发生变化,耗时大。要想实时监控mysql数据,要用到mysql binlog日志处理流程,binlog里保存了mysql的DDL和DML,而且是追加模式的,很适合流失数据的处理。我们知道每个taskmanager都有两个网关:输入和输出。他们之间通过netty进行通讯,有了一个buffer数据就可以下发。Source机制其实就是类似,是append模式,是一条一条的追加,不是批处理那样一次全部加载后交给下游,所以他能支持的数据源就不多,而mysql的binglog刚好合适这样的一个模式。
例如canal监听binlog把日志写入到kafka中。Apache Flink实时消费Kakfa的数据实现mysql数据的同步或其他内容等。整体的处理链路较长,需要用到的组件也比较多。虽然kafka是能够用来解耦了,但是也会造成磁盘资源和时间的消耗。Apache Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析。简单来说链路会变成这样。也就是说数据不再通过canal与kafka进行同步,而flink直接进行处理mysql的数据。节省了canal与kafka的过程。
Flink 1.11中实现了mysql-cdc与postgre-CDC,也就是说在Flink 1.11中我们可以直接通过Flink来直接消费mysql,postgresql的数据进行业务的处理。替代了之前的canal+kafka节点.直接通过sql的方式来实现对mysql数据的同步。
CDC介绍CDC简介
CDC(Change Data Capture)变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。它是一个比较广义的概念,只要能捕获变更的数据,我们都可以称为 CDC 。业界主要有基于查询的 CDC 和基于日志的 CDC ,目前flink支持两种内置的connector,PostgreSQL和mysql。应用场景有如下。
1.使用flink sql进行数据同步,可以将数据从一个数据同步到其他的地方,比如mysql、elasticsearch等。
2.可以在源数据库上实时的物化一个聚合视图
3.因为只是增量同步,所以可以实时的低延迟的同步数据
4.维表join
解决方案
业务系统经常会遇到需要更新数据到多个存储的需求。例如:一个订单系统刚刚开始只需要写入数据库即可完成业务使用。某天 BI 团队期望对数据库做全文索引,于是我们同时要写多一份数据到 ES 中,改造后一段时间,又有需求需要写入到 Redis 缓存中。
很明显这种模式是不可持续发展的,这种双写到各个数据存储系统中可能导致不可维护和扩展,数据一致性问题等,需要引入分布式事务,成本和复杂度也随之增加。我们可以通过 CDC(Change Data Capture)工具进行解除耦合,同步到下游需要同步的存储系统。通过这种方式提高系统的稳健性,也方便后续的维护。
Mysql案例分析
为了设置MySQL CDC连接器,下表提供了使用构建自动化工具(例如Maven或SBT)和带有SQL JAR捆绑包的SQL Client的两个项目的依赖项信息。
1、Maven依赖
com.alibaba.ververica
flink-connector-mysql-cdc
1.1.0
2、SQL客户端JAR
下载flink-sql-connector-mysql-cdc-1.1.0.jar并将其放在下/lib/。
创建MySQL用户
必须定义一个对Debezium MySQL连接器监视的所有数据库具有适当权限的MySQL用户。
mysql> CREATE USER "user"@"localhost" IDENTIFIED BY "password";
2、向用户授予所需的权限
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "user" IDENTIFIED BY "password";
3、最终确定用户的权限
mysql> FLUSH PRIVILEGES;
源码解析
其实最主要的就是用MySQLSource类build一个DebeziumSourceFunction出来,也就是MysqlSource就是一个Builder,包括table模块中的MysqlTableSource也会调用MysqlSource来build一个DebeziumSourceFunction,所以今天的主角就是DebeziumSourceFunction。因为上一篇博客flink connector源码分析分析过sourcefunction,所以我们就直接看open和run的逻辑,就不细节讲open和run是如何以及何时调用的,总结上一篇博客来说就是先回调用open初始化sourcefunction一些属性,run就是把数据拉取过来然后emit出去。
1、步骤分析
DebeziumSourceFunction中的open:可以看到open这里就是创建一个线程池,非常简单。接下来看看run:这些属性当然是为了给debenium使用。在他的run方法里创建了一个DebeziumChangeConsumer,以及用properties和DebeziumChangeConsumer创建了DebeziumEngine,最后用线程池来执行DebeziumEngine,但是看看DebeziumChangeConsumer和DebeziumEngine两个类,就知道这个sourcefunction很简单。DebeziumChangeConsumer类的实现接口只有一个方法handleBatch,可以看到这个逻辑非常简单,就是先把debenium获取到的cdc数据先反序列化一波,直接emit到下游了,那问题来了,handleBatch中的参数数据是如何获取的呢,既然是debenium获取的,而且只有一个DebeziumEngine(这个是个runnable)。那咱们就先看看DebeziumEngine,因为DebeziumEngine是debezium的组件跟flink没关系,刚才咱们知道DebeziumEngine是个Runnable(其实也是个接口,默认实现为EmbeddedEngine)。既然是runnable那就主要看看他的run方法。DebeziumEngine的run方法里首先会创建一个task,然后启动他,明显是启动task去获取任务。接下启动任务之后就会在循环里面poll数据,说明task里面肯定有一个组赛队列,接着handler会处理获取的数据,还记得刚才咱们说的DebeziumChangeConsumer吗,他就是咱们的handler呀,正好刚才咱们还愁着handleBatch中的参数从哪里来,现在看到了吧,就是从这里来。现在咱们知道原来数据是从task的阻塞队列里面的,那么,task启动之后肯定是把数据方法阻塞队列中了,基于这样的猜想咱们来看看task。这里咱们主要看看task的start做了啥task.start里第一个start不用看,第二个start是MySqlConnectorTask实现的,看类名明显知道这是处理mysql的,其实在start里面会创建好多Reader(BinlogReader用于增量获取,SnapshotReader用于第一次全量拉取),然后放到ChainedReader中。
2、核心逻辑总结
MySqlConnectorTask的ChainedReader包含多个Reader,这些reader就是用来获取全量数据和增量数据,这些数据会放进抽象类AbstractReader中的BlockingQueue中在Debezium的run方法中会从task中poll数据task会从Reader中的blockingQueue拿数据数据拿到之后会交给DebezinumConsumer,DebezinumConsumer会先反序列化数据,然后emit给下游MySQL CDC表的创建
1、Sql的方式:
-- register a MySQL table "orders" in Flink SQLCREATE TABLE orders ( order_id INT, order_date TIMESTAMP(0), customer_name STRING, price DECIMAL(10, 5), product_id INT, order_status BOOLEAN) WITH ( "connector" = "mysql-cdc", "hostname" = "localhost", "port" = "3306", "username" = "root", "password" = "123456", "database-name" = "mydb", "table-name" = "orders");-- read snapshot and binlogs from orders tableSELECT * FROM orders;
2、Stream API:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;public class MySqlBinlogSourceExample { public static void main(String[] args) throws Exception { SourceFunctionsourceFunction = MySQLSource.builder() .hostname("localhost") .port(3306) .databaseList("inventory") // monitor all tables under inventory database .username("flinkuser") .password("flinkpw") .deserializer(new StringDebeziumDeserializationSchema()) // SourceRecord to String .build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.addSource(sourceFunction) .print().setParallelism(1); env.execute(); }}
特征和常见问题
1.特征:
1、Exactly-Once Processing 一次处理 MySQL CDC连接器是Flink Source连接器,它将首先读取数据库快照,然后即使发生故障,也将以完全一次的处理继续读取二进制日志。请阅读连接器如何执行数据库快照。
2、Single Thread Reading 单线程阅读 MySQL CDC源无法并行读取,因为只有一个任务可以接收Binlog事件。
2.常见问题:
1、如何跳过快照并仅从binlog中读取?可以通过选项进行控制debezium.snapshot.mode,您可以将其设置为:
never:指定连接永远不要使用快照,并且在第一次使用逻辑服务器名称启动时,连接器应该从binlog的开头读取;请谨慎使用,因为只有在binlog保证包含数据库的整个历史记录时才有效。
schema_only:如果自连接器启动以来不需要数据的连续快照,而只需要它们进行更改,则可以使用该schema_only选项,其中连接器仅对模式(而不是数据)进行快照。
2、如何读取包含多个表(例如user_00,user_01,...,user99)的共享数据库?该table-name选项支持正则表达式以监视多个与正则表达式匹配的表。因此,您可以设置table-name为user.*监视所有user_前缀表。database-name选项相同。请注意,共享表应该在相同的架构中。
3、ConnectException:收到用于处理的DML"...",binlog可能包含使用语句或基于混合的复制格式生成的事件 如果有上述异常,请检查是否binlog_format为ROW,您可以通过show variables like "%binlog_format%"在MySQL客户端中运行来进行检查。请注意,即使binlog_format您的数据库配置为ROW,也可以通过其他会话更改此配置,例如SET SESSION binlog_format="MIXED"; SET SESSION tx_isolation="REPEATABLE-READ"; COMMIT;。还请确保没有其他会话正在更改此配置
代码demo
package com.flink.java.cdc.mysql;import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;public class MySqlBinlogDemo { public static void main(String[] args) { SourceFunctionsourceFunction = MySQLSource.builder() .hostname("192.168.100.1") .port(3306) // 监视库存数据库下的所有表// .databaseList("test") .tableList("test.user_test") .username("root") .password("123456") // 将SourceRecord转换为String .deserializer(new StringDebeziumDeserializationSchema()) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()); // 对接收器使用并行性1以保持消息顺序 env.addSource(sourceFunction).print().setParallelism(1); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } }}
2.mysql
package com.flink.java.cdc.mysql;import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;import org.apache.flink.connector.jdbc.JdbcOutputFormat;import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.sink.SinkFunction;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.types.Row;import java.sql.Types;public class Mysql2Mysql { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SourceFunctionsourceFunction = MySQLSource.builder() .hostname("192.168.100.1") .port(3306) .tableList("test.user_test2") .username("root") .password("123456") // 将SourceRecord转换为Row .deserializer(new DebeziumDeserialization()) .build(); DataStreamSourcestreamSource = env.addSource(sourceFunction);// String query = "INSERT INTO test.user_test(id,name) VALUES (?,?) on duplicate key update id=VALUES(id);"; String query = "INSERT INTO test.user_test(id,name) VALUES (?,?) on duplicate key update id=VALUES(id);"; JdbcOutputFormat jdbcOutputFormat = JdbcOutputFormat.buildJdbcOutputFormat() .setDrivername("com.mysql.cj.jdbc.Driver") .setDBUrl("jdbc:mysql://192.168.100.1:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false") .setUsername("root") .setPassword("123456") .setQuery(query) .setSqlTypes(new int[]{Types.VARCHAR, Types.VARCHAR}) .setBatchSize(1) .finish(); streamSource.print(); SinkFunctionjdbcSinkFunction = new GenericJdbcSinkFunction<>(jdbcOutputFormat); streamSource.addSink(jdbcSinkFunction); try { env.execute(); } catch (Exception e) { e.printStackTrace(); } }}
package com.flink.java.cdc.mysql;import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.types.Row;import org.apache.flink.util.Collector;import org.apache.kafka.connect.data.Schema;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.header.Headers;import org.apache.kafka.connect.source.SourceRecord;import java.util.Map;public class DebeziumDeserialization implements DebeziumDeserializationSchema{ private static final long serialVersionUID = -3168848963265670603L; @Override public void deserialize(SourceRecord record, Collectorout) throws Exception { //SourceRecord MapsourcePartition = record.sourcePartition(); MapsourceOffset = record.sourceOffset(); //ConnectRecord Object value = record.value(); Row row = new Row(2); Struct struct = (Struct) value; struct = struct.getStruct("after"); //id,name String id = struct.getString("id"); String name = struct.getString("name"); row.setField(0, id); row.setField(1, name); out.collect(row); } @Override public TypeInformationgetProducedType() { return BasicTypeInfo.of(Row.class); }}
需要额外加入的依赖
mysqlmysql-connector-java8.0.22 org.apache.flinkflink-avro-confluent-registry1.11.1com.alibaba.ververicaflink-connector-mysql-cdc1.1.0
标签:
相关推荐:
最新新闻:
- 如何将图像数据编码为比特流?转换方法步骤
- PF的关键字顺序有多灵活?PF防火墙最详细教程
- 天天速讯:页面自动跳转怎么操作?javascript实现网页自动跳转的5种方法
- 国外开发API,api地址、Authentication认证
- hcie培训价格多少钱?线上培训和线下面授的区别在哪里?|重点聚焦
- 华为t2010怎么刷机?华为t2010刷机教程及评测_全球今日报
- windows7桌面图标怎么改大小?修改方法步骤
- iphone4s怎么设置彩信?iphone4s联通卡彩信设置方法
- 什么软件修图较好?图片处理有哪些技巧?
- 国内常用的ntp服务器 国内常用NTP服务器地址及IP_环球新视野
- rocketdock怎么操作?rocketdock教程之程序设置-环球热消息
- ssh/authorized_keys软件安装教程 天天视讯
- 当前速读:QQ火炬手图标怎么关闭?关闭图标方法介绍
- 【世界速看料】戴森v7v8v10什么区别?V6和V8的区别介绍
- linux主机的详细介绍 linux主机安装的八个步骤_全球讯息
- 环球速读:富可视m310怎么刷机?富可视m310刷机教程
- 天天观焦点:Docker基于镜像 可以秒级启动各种容器
- 手机wifi密码破解器哪个好用?手机wifi密码破解器介绍
- 天天即时:淘宝拥有注册会员1.7亿 注册用户不断增长
- 手摇甘蔗榨汁机怎么样?品牌有哪些? 天天视讯
- androidstudio发现环境坏了 jdk出问题了?
- 天天信息:linux内核编译ccflag,linux-内核编译 centisecs文件控制内核参数
- 天天视点!SVN系列教程-第四章-TortoiseSVN使用大全
- flink-cdc解析:BaronND/connectors
- 全球今头条!连连支付怎么样?连连靠谱吗?
- 每日关注!identity字段怎么使用?identity字段的基本用法及使用方法
- 惠普笔记本电脑 设备管理器中双击未知设备的解决方案|每日播报
- 【盗梦空间】潜行凶间(港)全面启动
- AngularJS中的refresher该如何使用?使用技巧|动态
- 联想s720i配置怎么样?联想s720i配置总结|热文
- 【证书】PFX证书、CER证书申请流程详解
- dockerexec-itoracle11g创建容器实例分享
- PocketTrail实现拖尾效果的脚本及说明|焦点短讯
- 当前报道:功放如何连接?功放机接线图详解
- 手机无限重启或无法开机怎么办?索爱st25i强刷教程及注意事项 快报
- 头条:绿色出行新方式:共享汽车APP的交互体验
- 环球观焦点:B站视频播放源地址获取及B站视频下载
- 复旦壁纸:手绘正校门1024、7681280
- 佳能IP1180怎么样?佳能IP1180详情介绍
- 天天热门:象棋里的卧槽马是什么?典型案例分享
- 无法访问文件夹怎么办?无法访问文件夹的原因和解决方法-焦点速看
- 设计带构造函数的Dog类 对数据成员进行初始化
- 焦点报道:薛定谔的猫和EPR佯谬——量子力学史上的经典术语
- 网银无法登陆怎么办?网上银行登录安全控件
- 联想昭阳E46G能玩魔兽世界吗?显卡集显特效开起来根本没办法玩
- 最新:在百度里下载格式工厂——KuGo格式
- speedtest-cli|网速测试工具命令行方式
- 【数据更新】全国上网卡专属号段数据汇总
- 全球新动态:Calendar类:日历字段的转换方法
- 数据字典中的同义词:user_synonyms
- 音游《最终幻想:节奏剧场》试玩版2月1日上线
- 【环球速看料】《死亡空间:重制版》开发者讨论该系列未来发展
- 索尼承诺在PSVR2发行前“增加PS5主机供应”:全球要闻
- 财报预警!三大航空公司合计预亏超千亿,昔日养猪巨头亏损超百亿
- 郭明錤:可折叠iPad或明年问世,今年苹果可能不会发新品,出货至少下降10%
- 热点在线丨《死亡空间》重制版不支持Steam deck
- 三星新款翻转笔记本曝光:高配13代i9-13900H
- NBC季度亏损10亿!好于华尔街预期
- 无人驾驶出租?旧金山表示先缓缓_世界新资讯
- 联想拯救者新品将用上超频内存 支持DDR5-6000_环球快看点
- 突发!伦敦街头惊现《战神》利维坦之斧:外观霸气 地面开裂
- 全球观热点:青海春天药用资源科技股份有限公司 2022年度业绩预亏暨风险提示公告
- Xbox金会员2月会免游戏公布 《为了吾王》等
- 兔年首个交易日,两大股东公告“抢筹”兴业银行
- 【世界热闻】曝百度将推人工智能聊天工具:类似ChatGPT
- 环球微动态丨腾讯游戏春节7天吸金破4.5亿:《王者荣耀》收入超3千万!
- 国产《时灵:星辰愚者》2月14日发售 登陆PS5/PC/NS
- 暴雪游戏国服“头七” 你找到合适的平替游戏了吗?
- 《流浪地球2》MOSS和末日铁拳是一个配音演员:给破球来个上勾拳!:世界快消息
- 【环球速看料】PS5广告:奎爷利维坦巨斧现身英国伦敦
- 效仿印尼,全球第二大镍生产国菲律宾考虑对镍出口征税
- 摩根士丹利“大空头”警告:不要参与反弹行情,美股熊市还没结束
- 消费者遭遇特步反向抹零 特步回应:系统设置四舍五入
- 超4000万!Capcom称当前财年销量有望是史上最高一年:全球即时看
- 最新:我爱我家2022年业绩预告:聚焦品质服务 推进精细运营