家用电脑做网站能备案,上海网站推广联盟,武安网站制作,分享几个x站好用的关键词1、What is Canal#xff1f;canal [kənl]#xff0c;中文翻译为 水道/管道/沟渠/运河#xff0c;主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析#xff0c;是阿里巴巴开发并开源的#xff0c;采用Java语言开发#xff1b;历史背景是早期阿里巴巴因为杭州…1、What is Canalcanal [kənæl]中文翻译为 水道/管道/沟渠/运河主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析是阿里巴巴开发并开源的采用Java语言开发历史背景是早期阿里巴巴因为杭州和美国双机房部署存在跨机房数据同步的业务需求实现方式主要是基于业务 trigger(触发器) 获取增量变更。从2010年开始阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步由此衍生出了canal项目Githubhttps://github.com/alibaba/canal2、工作原理传统MySQL主从复制工作原理从上层来看复制分成三步MySQL的主从复制将经过如下步骤1、当 master 主服务器上的数据发生改变时则将其改变写入二进制事件日志文件中2、salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测探测其是否发生过改变如果探测到 master 主服务器的二进制事件日志发生了改变则开始一个 I/O Thread 请求 master 二进制事件日志3、同时 master 主服务器为每个 I/O Thread 启动一个dump Thread用于向其发送二进制事件日志4、slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中5、salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志在本地重放使得其数据和主服务器保持一致6、最后 I/O Thread 和 SQL Thread 将进入睡眠状态等待下一次被唤醒canal 工作原理1、canal 模拟 MySQL slave 的交互协议把自己伪装为 MySQL slave向 MySQL master 发送dump 协议2、MySQL master 收到 dump 请求开始推送 binary log 给 slave (即canal )3、canal 解析 binary log 对象 (原始数据为byte流)3、Canal使用场景Canal是基于MySQL变更日志增量订阅和消费的组件可以使用在如下一些一些应用场景数据库实时备份业务cache刷新search build价格变化等重要业务消息带业务逻辑的增量数据处理跨数据库的数据备份(异构数据同步)例如mysql oraclemysqlmongomysql redismysql elasticsearch等当前canal 主要是支持源端 MySQL(也支持mariaDB)版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.xCanal搭建环境1、准备好MySQL运行环境2、开启 MySQL的binlog写入功能配置 binlog-format 为 ROW 模式my.cnf中配置如下:[mysqld]log-binmysql-bin #开启 binlogbinlog-formatROW #选择 ROW 模式server_id1 #配置MySQL replaction需要定义不要和canal的 slaveId重复3、授权canal连接MySQL账号具有作为MySQL slave的权限, 如果已有账户可直接 grant授权启动MySQL服务器登录mysql./mysql -uroot -p -h127.0.0.1 -P3306CREATE USER canal IDENTIFIED BY canal;GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal%;FLUSH PRIVILEGES;4、下载 canal部署程序Wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gztar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal.deployer-1.1.45、配置修改vim conf/example/instance.properties主要是修改配置文件中与自己的数据库配置相关的信息6、启动Canal./startup.sh7、查看进程ps -ef | grep canal8、查看 server 日志cat logs/canal/canal.log9、查看 instance 的日志vi logs/example/example.log10、关闭Canal./stop.shcanal server的默认端口号为11111如果需要调整的话可以去到\conf目录底下的canal.properties文件中进行修改相关命令#是否启用了日志show variables like log_bin;#怎样知道当前的日志show master status;#查看mysql binlog模式show variables like binlog_format;#获取binlog文件列表show binary logs;#查看当前正在写入的binlog文件show master status\G#查看指定binlog文件的内容show binlog events in mysql-bin.000002;注意binlog日志格式要求为row格式Binlog的三种基本类型分别为ROW模式 除了记录sql语句之外还会记录每个字段的变化情况能够清楚的记录每行数据的变化历史但是会占用较多的空间需要使用mysqlbinlog工具进行查看STATEMENT模式只记录了sql语句但是没有记录上下文信息在进行数据恢复的时候可能会导致数据的丢失情况MIX模式比较灵活的记录例如说当遇到了表结构变更的时候就会记录为statement模式。当遇到了数据更新或者删除情况下就会变为row模式启动了canal的server之后,便是基于java的客户端搭建了代码集成方式com.alibaba.ottercanal.client1.1.4package com.unwulian.search.engine.suggestion.service;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry.*;import com.alibaba.otter.canal.protocol.Message;import java.net.InetSocketAddress;import java.util.List;/*** canal测试** author shiye* create 2020-11-30 14:22*/public class CanalTest {public static void main(String[] args) {String ip 192.168.2.165;int port 11111;String destination example;String username ;String password ;CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(ip, port), destination, username, password);try {connector.connect();connector.subscribe(.*\\..*);//跳转到上次进行读取日志的地方connector.rollback();while (true) {//获取指定数量的数据Message message connector.getWithoutAck(1);long id message.getId();int size message.getEntries().size();if (id -1 || size 0) {//如果没有获取到数据就睡眠疫苗Thread.sleep(1000);} else {System.out.println(messge id: id);printEntry(message.getEntries());}//提交确认connector.ack(id);// connector.rollback(batchId); // 处理失败, 回滚数据}} catch (Exception e) {e.printStackTrace();} finally {connector.disconnect();}}private static void printEntry(List entrys) {for (Entry entry : entrys) {if (entry.getEntryType() EntryType.TRANSACTIONBEGIN || entry.getEntryType() EntryType.TRANSACTIONEND) {continue;}RowChange rowChage null;try {rowChage RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException(ERROR ## parser of eromanga-event has an error , data: entry.toString(), e);}EventType eventType rowChage.getEventType();System.out.println(String.format( binlog日志偏移量[%s:%s] , 库名,表名[%s,%s] , 操作类型 : %s,entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType));for (RowData rowData : rowChage.getRowDatasList()) {if (eventType EventType.DELETE) {printColumn(rowData.getBeforeColumnsList());} else if (eventType EventType.INSERT) {printColumn(rowData.getAfterColumnsList());} else {System.out.println(------- before);printColumn(rowData.getBeforeColumnsList());System.out.println(------- after);printColumn(rowData.getAfterColumnsList());}}}}private static void printColumn(List columns) {for (Column column : columns) {System.out.println(column.getName() : column.getValue() update column.getUpdated());}}}springboo集成canal# 阿里binlog canal配置canal:ip: 192.168.2.13 #192.168.2.165subscribe: undev.t_bas_xxx1,undev.t_bas_xxx2#配置你要监听的表port: 11111destination: devusername:password:package com.unwulian.search.engine.suggestion.config;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.context.annotation.Configuration;import java.io.Serializable;/*** binlog canal的配置** author shiye* create 2020-07-17 19:30*/ConfigurationConfigurationProperties(prefix canal)public class CanalConfig implements Serializable {/*** ip*/private String ip;/*** mq监听表*/private String subscribe;/*** 端口*/private int port;/*** 目的地*/private String destination;/*** 用户名*/private String username ;/*** 密码*/private String password;public String getSubscribe() {return subscribe;}public void setSubscribe(String subscribe) {this.subscribe subscribe;}public String getIp() {return ip;}public void setIp(String ip) {this.ip ip;}public int getPort() {return port;}public void setPort(int port) {this.port port;}public String getDestination() {return destination;}public void setDestination(String destination) {this.destination destination;}public String getUsername() {return username;}public void setUsername(String username) {this.username username;}public String getPassword() {return password;}public void setPassword(String password) {this.password password;}}package com.unwulian.search.engine.suggestion.schedule;import com.alibaba.otter.canal.client.CanalConnector;import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry;import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;import com.alibaba.otter.canal.protocol.Message;import com.github.structlog4j.ILogger;import com.github.structlog4j.SLoggerFactory;import com.unwulian.search.engine.suggestion.config.CanalConfig;import com.unwulian.search.engine.suggestion.service.CardService;import com.unwulian.search.engine.suggestion.service.CommunityStructService;import com.unwulian.search.engine.suggestion.service.HouseService;import com.unwulian.search.engine.suggestion.service.RoomService;import org.springframework.beans.factory.InitializingBean;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import java.net.InetSocketAddress;import java.util.List;/*** 项目启动的时候就初始化canal启动一个线程去监听canal server** author shiye* create 2020-11-30 15:11*/Componentpublic class CanalTask implements InitializingBean {private static final ILogger logger SLoggerFactory.getLogger(CanalTask.class);Autowiredprivate CanalConfig canalConfig;Overridepublic void afterPropertiesSet() throws Exception {/*** 启动一下线程一直监听canal server*/new Thread(() - {logger.info(start Thread to listent canal success....);CanalConnector connector CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getIp(), canalConfig.getPort()),canalConfig.getDestination(),canalConfig.getUsername(),canalConfig.getPassword());connector.connect();connector.subscribe(canalConfig.getSubscribe());//跳转到上次进行读取日志的地方connector.rollback();try {while (true) {//获取指定数量的数据Message message connector.getWithoutAck(1);long id message.getId();int size message.getEntries().size();if (id -1 || size 0) {//如果没有获取到数据就睡眠1stry {Thread.sleep(1000);} catch (InterruptedException e) {logger.error(sleep 1000ms error.... e.getMessage());}} else {//处理消息//logger.info(messge id: id);handlerEntry(message.getEntries());}//提交确认connector.ack(id);// connector.rollback(batchId); // 处理失败, 回滚数据}} finally {//关闭connector.disconnect();}}).start();logger.info(start Thread to listent canal ....);}/*** 处理消息** param entrys*/private void handlerEntry(List entrys) {for (CanalEntry.Entry entry : entrys) {if (entry.getEntryType() EntryType.TRANSACTIONBEGIN || entry.getEntryType() EntryType.TRANSACTIONEND) {//类型是事务开始事务结束不做处理continue;}//库名//String databaseName entry.getHeader().getSchemaName();//表名String tableName entry.getHeader().getTableName();RowChange rowChage null;try {rowChage RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {logger.error(ERROR 数据转换异常, data: entry.toString(), e);}switch (tableName) {case t_bas_xxx1://进行你的业务处理break;case t_bas_xxx2://进行你的业务处理break;default:return;}}}private static void printColumn(List columns) {for (CanalEntry.Column column : columns) {System.out.println(column.getName() : column.getValue() 不做处理 column.getUpdated());}}}