为什么要建设学校网站,网站建设没有签定合同,安阳住房与城乡建设局官方网站,招聘网站排名文章目录Maxwell 概述1.1 Maxwell 定义1.2 Maxwell工作原理1.2.1 MySQL主从复制过程1.2.2 Maxwell的工作原理1.2.3 **MySQL** **的** binlog1.3 Maxwell和Canal的对比Maxwell使用2.1 Maxwell安装2.1.1 安装地址2.1.2 安装部署2.1.3 MySQL环境准备2.1.4 初始化Maxwell元数据库2.…
文章目录Maxwell 概述1.1 Maxwell 定义1.2 Maxwell工作原理1.2.1 MySQL主从复制过程1.2.2 Maxwell的工作原理1.2.3 **MySQL** **的** binlog1.3 Maxwell和Canal的对比Maxwell使用2.1 Maxwell安装2.1.1 安装地址2.1.2 安装部署2.1.3 MySQL环境准备2.1.4 初始化Maxwell元数据库2.1.5 Maxwell进程启动使用命令行参数启动 Maxwell 进程修改配置文件定制化启动 Maxwell 进程2.2 Maxwell入门案例2.2.1 监控 **Mysql** 数据并在控制台打印2.2.2 监控 Mysql 数据输出到 kafka实现步骤kafka 主题数据的分区控制2.2.3 监控 Mysql 指定表数据输出控制台2.2.4 监控 Mysql 指定表全量数据输出控制台数据初始化Maxwell 概述
1.1 Maxwell 定义
Maxwell 是由美国 Zendesk 开源用 Java 编写的 MySQL 实时抓取软件。 实时读取
MySQL 二进制日志 Binlog并生成 JSON 格式的消息作为生产者发送给 KafkaKinesis、
RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。
官网地址http://maxwells-daemon.io/
1.2 Maxwell工作原理
1.2.1 MySQL主从复制过程 Master 主库将改变记录写到二进制日志(binary log)中 Slave 从库向 mysql master 发送 dump 协议将 master 主库的 binary log events 拷贝
到它的中继日志(relay log)
Slave 从库读取并重做中继日志中的事件将改变的数据同步到自己的数据库。 1.2.2 Maxwell的工作原理
Maxwell 的工作原理很简单就是把自己伪装成 MySQL 的一个 slave然后以 slave
的身份假装从 MySQL(master)复制数据。
1.2.3 MySQL 的 binlog 什么是 binlog MySQL 的二进制日志可以说 MySQL 最重要的日志了它记录了所有的 DDL 和 DML(除 了数据查询语句)语句以事件形式记录还包含语句所执行的消耗的时间MySQL 的二进 制日志是事务安全型的。 一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景: 其一MySQL Replication 在 Master 端开启 binlogMaster 把它的二进制日志传递 给 slaves 来达到 master-slave 数据一致的目的。 其二自然就是数据恢复了通过使用 mysqlbinlog 工具来使恢复数据。 二进制日志包括两类文件二进制日志索引文件文件名后缀为.index用于记录所有 的二进制文件二进制日志文件文件名后缀为.00000*记录数据库所有的 DDL 和 DML(除 了数据查询语句)语句事件。 binlog 的开启 找到 MySQL 配置文件的位置 Linux: /etc/my.cnf 如果/etc 目录下没有可以通过 locate my.cnf 查找位置 Windows: \my.ini 在 mysql 的配置文件下,修改配置 在[mysqld] 区块设置/添加 log-binmysql-bin 这个表示 binlog 日志的前缀是 mysql-bin以后生成的日志文件就是 mysql-bin.000001 的文件后面的数字按顺序生成每次 mysql 重启或者到达单个文件大小的阈值时新生一个 文件按顺序编号。 binlog 的分类设置 mysql binlog 的格式有三种分别是 STATEMENT,MIXED,ROW。 在配置文件中可以选择配置 binlog_format statement|mixed|row 三种格式的区别 statement 语句级binlog 会记录每次一执行写操作的语句。 相对 row 模式节省空间但是可能产生不一致性比如 update test set create_datenow(); 如果用 binlog 日志进行恢复由于执行时间不同可能产生的数据就不同。 优点 节省空间 缺点 有可能造成数据不一致。 row 行级 binlog 会记录每次操作后每行记录的变化。 优点保持数据的绝对一致性。因为不管 sql 是什么引用了什么函数他只记录 执行后的效果。 缺点占用较大空间。 mixed 混合级别statement 的升级版一定程度上解决了 statement 模式因为一些情况 而造成的数据不一致问题。 默认还是 statement在某些情况下譬如 当函数中包含 UUID() 时 包含 AUTO_INCREMENT 字段的表被更新时 执行 INSERT DELAYED 语句时 用 UDF 时 会按照 ROW 的方式进行处理 优点节省空间同时兼顾了一定的一致性。 缺点还有些极个别情况依旧会造成不一致另外 statement 和 mixed 对于需要对 binlog 监控的情况都不方便。 综合上面对比Maxwell 想做监控分析选择 row 格式比较合适
1.3 Maxwell和Canal的对比
对比CanalMaxwell语言javaJava数据格式格式自由json数据采集模式增量增量/全量数据落地定制支持Kafka等多种平台HA支持支持
Maxwell使用
2.1 Maxwell安装
2.1.1 安装地址
Maxwell 官网地址http://maxwells-daemon.io/
文档地址https://maxwells-daemon.io/quickstart/
2.1.2 安装部署 这里我们选择1.29.2版本进行介绍该版本使用jdk1.8版本之后的都是11 上传 maxwell-1.29.2.tar.gz 到/opt/software 下 解压 maxwell-1.29.2.tar.gz 的安装包到/opt/module 下 [atguiguhadoop102 software]$ tar -zxvf maxwell-1.29.2.tar.gz -
C /opt/module/2.1.3 MySQL环境准备 修改 mysql 的配置文件开启 MySQL Binlog 设置 atguiguhadoop102 software]$ sudo vim /etc/my.cnf
在[mysqld]模块下添加一下内容
[mysqld]
server_id1
log-binmysql-bin
binlog_formatrow
#设置多个数据库监控不能使用逗号分隔复制多行即可
#binlog-do-dbtest_maxwell并重启 Mysql 服务
[atguiguhadoop102 software]$ sudo systemctl restart mysqld
登录 mysql 并查看是否修改完成
[atguiguhadoop102 ~]$ mysql -uroot -p123456
mysql show variables like %binlog%;
查看下列属性
binlog_format | ROW进入/var/lib/mysql 目录查看 MySQL 生成的 binlog 文件 [atguiguhadoop102 ~]$ cd /var/lib/mysql
[atguiguhadoop102 mysql]$ sudo ls -l
总用量 188500
-rw-r-----. 1 mysql mysql 154 11 月 17 16:30 mysqlbin.000001
-rw-r-----. 1 mysql mysql 19 11 月 17 16:30 mysqlbin.index注MySQL 生成的 binlog 文件初始大小一定是 154 字节然后前缀是 log-bin 参数配 置的后缀是默认从.000001然后依次递增。除了 binlog 文件文件以外MySQL 还会额外 生产一个.index 索引文件用来记录当前使用的 binlog 文件。
2.1.4 初始化Maxwell元数据库 在 MySQL 中建立一个 maxwell 库用于存储 Maxwell 的元数据 [atguiguhadoop102 module]$ mysql -uroot -p123456
mysql CREATE DATABASE maxwell;设置 mysql 用户密码安全级别 mysql set global validate_password_length4;
mysql set global validate_password_policy0;分配一个账号可以操作该数据库 mysql GRANT ALL ON maxwell.* TO maxwell% IDENTIFIED BY 123456;分配这个账号可以监控其他数据库的权限 mysql GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON
*.* TO maxwell%;刷新 mysql 表权限 mysql flush privileges;MySQL 8 #创建账户
create user root172.16.10.203 identified by password#赋予权限with grant option这个选项表示该用户可以将自己拥有的权限授权给别人
grant all privileges on *.* to root172.16.10.203 with grant option#改密码授权超用户flush privileges 命令本质上的作用是将当前user和privilige表中的用户信息/权限设置从mysql库(MySQL数据库的内置库)中提取到内存里
flush privileges;-------CREATE DATABASE maxwell;
create user maxwell% identified by 123456
grant all privileges on *.* to maxwell% with grant option
flush privileges; 2.1.5 Maxwell进程启动
Maxwell 进程启动方式有如下两种
使用命令行参数启动 Maxwell 进程
[atguiguhadoop102 maxwell-1.29.2]$ bin/maxwell --
usermaxwell --password123456 --hosthadoop102 --
producerstdout–user 连接 mysql 的用户
–password 连接 mysql 的用户的密码
–host mysql 安装的主机名/IP
–producer 生产者模式(stdout控制台 kafkakafka 集群)
修改配置文件定制化启动 Maxwell 进程
[atguiguhadoop102 maxwell-1.29.2]$ cp
config.properties.example config.properties
[atguiguhadoop102 maxwell-1.29.2]$ vim config.properties
[atguiguhadoop102 maxwell-1.29.2]$ bin/maxwell --
config ./config.properties2.2 Maxwell入门案例
2.2.1 监控 Mysql 数据并在控制台打印 运行 maxwell 来监控 mysql 数据更新 [atguiguhadoop102 maxwell-1.29.2]$ bin/maxwell --
usermaxwell --password123456 --hosthadoop102 --
producerstdout向 mysql 的 test_maxwell 库的 test 表插入一条数据查看 maxwell 的控制台输出 mysql insert into test values(1,aaa);
{database: test_maxwell, --库名table: test, --表名type: insert, --数据更新类型ts: 1637244821, --操作时间xid: 8714, --操作 idcommit: true, --提交成功data: { --数据id: 1,name: aaa}
}向 mysql 的 test_maxwell 库的 test 表同时插入 3 条数据控制台出现了 3 条 json 日志说明 maxwell 是以数据行为单位进行日志的采集的。 mysql INSERT INTO test VALUES(2,bbb),(3,ccc),(4,ddd);
{database:test_maxwell,table:test,type:insert,ts
:1637245127,xid:9129,xoffset:0,data:{id:2,name:bbb
}}
{database:test_maxwell,table:test,type:insert,ts
:1637245127,xid:9129,xoffset:1,data:{id:3,name:ccc
}}
{database:test_maxwell,table:test,type:insert,ts
:1637245127,xid:9129,commit:true,data:{id:4,name:dd
d}}
mysql update test set namezaijian where id 1;
{database:test_maxwell,table:test,type:update,ts
:1631618614,xid:535,commit:true,data:{id:1,name:zai
jian},old:{name:nihao}}修改 test_maxwell 库的 test 表的一条数据查看 maxwell 的控制台输出 mysql update test set nameabc where id 1;
{database: test_maxwell,table: test,type: update,ts: 1637245338,xid: 9418,commit: true,data: { --修改后的数据id: 1,name: abc},old: { --修改前的数据name: aaa}
}删除 test_maxwell 库的 test 表的一条数据查看 maxwell 的控制台输出 mysql DELETE FROM test WHERE id 1;
{database: test_maxwell,table: test,type: delete,ts: 1637245630,xid: 9816,commit: true,data: {id: 1,name: abc}
}2.2.2 监控 Mysql 数据输出到 kafka
实现步骤 启动 zookeeper 和 kafka [atguiguhadoop102 bin]$ jpsallhadoop102
3511 QuorumPeerMain
4127 Kafkahadoop103
1885 Kafka
1342 QuorumPeerMainhadoop104
1345 QuorumPeerMain
1886 Kafka启动 Maxwell 监控 binlog atguiguhadoop102 maxwell-1.29.2]$ bin/maxwell --usermaxwell
--password123456 --hosthadoop102 --producerkafka --
kafka.bootstrap.servershadoop102:9092 --kafka_topicmaxwell打开 kafka 的控制台的消费者消费 maxwell 主题 [atguiguhadoop102 ~]$ kafka-console-consumer.sh --bootstrapserver hadoop102:9092 --topic maxwell
4向 test_maxwell 库的 test 表再次插入一条数据向 test_maxwell 库的 test 表再次插入一条数据 mysql insert into test values (5,eee);通过 kafka 消费者来查看到了数据说明数据成功传入 kafka {database:test_maxwell,table:test,type:insert,ts
:1637245889,xid:10155,commit:true,data:{id:5,name:e
ee}}kafka 主题数据的分区控制
在公司生产环境中我们一般都会用 maxwell 监控多个 mysql 库的数据然后将这
些数据发往 kafka 的一个主题 Topic并且这个主题也肯定是多分区的为了提高并发度。
那么如何控制这些数据的分区问题就变得至关重要实现步骤如下 修改 maxwell 的配置文件定制化启动 maxwell 进程 [atguiguhadoop102 maxwell-1.29.2]$ vim config.properties
# tl;dr config
log_levelinfo
producerkafka
kafka.bootstrap.servershadoop102:9092
# mysql login info
hosthadoop102
usermaxwell
password123456
# *** kafka ***
# list of kafka brokers
#kafka.bootstrap.servershosta:9092,hostb:9092
# kafka topic to write to
# this can be static, e.g. maxwell, or dynamic, e.g.
namespace_%{database}_%{table}
# in the latter case database and table will be replaced
with the values for the row being processed
kafka_topicmaxwell3
# *** partitioning ***
# What part of the data do we partition by?
#producer_partition_bydatabase # [database, table,
primary_key, transaction_id, column]
producer_partition_bydatabase
控制数据分区模式可选模式有 库名表名主键列名
# specify what fields to partition by when using
producer_partition_bycolumn
# column separated list.
#producer_partition_columnsname
# when using producer_partition_bycolumn, partition by this
when
# the specified column(s) dont exist.
#producer_partition_by_fallbackdatabase手动创建一个 3 个分区的 topic名字就叫做 maxwell3 [atguiguhadoop102 maxwell-1.29.2]$ kafka-topics.sh --zookeeper
hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka --create --
replication-factor 2 --partitions 3 --topic maxwell3利用配置文件启动 Maxwell 进程 [atguiguhadoop102 maxwell-1.29.2]$ bin/maxwell --
config ./config.properties向 test_maxwell 库的 test 表再次插入一条数据 mysql insert into test_maxwell.test values (6,fff);通过 kafka tool 工具查看此条数据进入了 maxwell3 主题的 1 号分区 向 test 库的 aaa 表插入一条数据 mysql insert into test_maxwell2.test values (23,dd);通过 kafka tool 工具查看此条数据进入了 maxwell3 主题的 0 号分区,说明库名 会对数据进入的分区造成影响。 2.2.3 监控 Mysql 指定表数据输出控制台 运行 maxwell 来监控 mysql 指定表数据更新 [atguiguhadoop102 maxwell-1.29.2]$ bin/maxwell --
usermaxwell --password123456 --hosthadoop102 --filter
exclude: *.*, include:test_maxwell.test --producerstdout向 test_maxwell.test 表插入一条数据查看 maxwell 的监控 mysql insert into test_maxwell.test values(7,ggg);
{database:test_maxwell,table:test,type:insert,ts
:1637247760,xid:11818,commit:true,data:{id:7,name:g
gg}}向 test_maxwell.test2 表插入一条数据查看 maxwell 的监控 mysql insert into test1 values(1,nihao);
本次没有收到任何信息
说明 include 参数生效只能监控指定的 mysql 表的信息注还可以设置 include:test_maxwell.*通过此种方式来监控 mysql 某个库的所有 表也就是说过滤整个库。读者可以自行测试。
2.2.4 监控 Mysql 指定表全量数据输出控制台数据初始化
Maxwell 进程默认只能监控 mysql 的 binlog 日志的新增及变化的数据但是Maxwell 是支持数据初始化的可以通过修改 Maxwell 的元数据来对 MySQL 的某张表进行数据初始化也就是我们常说的全量同步。具体操作步骤如下
需求将 test_maxwell 库下的 test2 表的四条数据全量导入到 maxwell 控制台进行打印。 修改 Maxwell 的元数据触发数据初始化机制在 mysql 的 maxwell 库中 bootstrap表中插入一条数据写明需要全量数据的库名和表名。 mysql insert into maxwell.bootstrap(database_name,table_name)
values(test_maxwell,test2);启动 maxwell 进程此时初始化程序会直接打印 test2 表的所有数据 [atguiguhadoop102 maxwell-1.29.2]$ bin/maxwell --
usermaxwell --password123456 --hosthadoop102 --
producerstdout
Using kafka version: 1.0.0
23:15:38,841 WARN MaxwellMetrics - Metrics will not be
exposed: metricsReportingType not configured.
23:15:39,110 INFO Maxwell - Maxwell v1.22.0 is booting
(StdoutProducer), starting at Position[BinlogPosition[mysqlbin.000004:611096], lastHeartbeat1637248429242]
23:15:39,194 INFO MysqlSavedSchema - Restoring schema id 6
(last modified at Position[BinlogPosition[mysqlbin.000004:517625], lastHeartbeat1637246435111])
23:15:39,299 INFO MysqlSavedSchema - Restoring schema id 1
(last modified at Position[BinlogPosition[mysqlbin.000004:158612], lastHeartbeat0])
23:15:39,342 INFO MysqlSavedSchema - beginning to play
deltas...
23:15:39,343 INFO MysqlSavedSchema - played 5 deltas in 1ms
{database:test_maxwell,table:test2,type:bootstrapstart,ts:1637248539,data:{}}
23:15:39,367 INFO SynchronousBootstrapper - bootstrapping
started for test_maxwell.test2
23:15:39,369 INFO BinlogConnectorReplicator - Setting initial
binlog pos to: mysql-bin.000004:611096
{database:test_maxwell,table:test2,type:bootstrapinsert,ts:1637248539,data:{id:1,name:aa}}
{database:test_maxwell,table:test2,type:bootstrapinsert,ts:1637248539,data:{id:2,name:bb}}
{database:test_maxwell,table:test2,type:bootstrapinsert,ts:1637248539,data:{id:3,name:cc}}
{database:test_maxwell,table:test2,type:bootstrapinsert,ts:1637248539,data:{id:4,name:dd}}
{database:test_maxwell,table:test2,type:bootstrapcomplete,ts:1637248539,data:{}}
23:15:39,387 INFO SynchronousBootstrapper - bootstrapping
ended for #8 test_maxwell.test2
23:15:39,465 INFO BinaryLogClient - Connected to
hadoop102:3306 at mysql-bin.000004/611096 (sid:6379, cid:108)
23:15:39,465 INFO BinlogConnectorLifecycleListener - Binlog
connected.当数据全部初始化完成以后Maxwell 的元数据会变化 is_complete 字段从 0 变为 1 start_at 字段从 null 变为具体时间(数据同步开始时间) complete_at 字段从 null 变为具体时间(数据同步结束时间)