使用环境

操作系统:CentOS7
Mysql版本:5.7
Python:2.7

安装配置Mysql

利用Yum Repository来安装Mysql

下载安装可用的mysql-server rpm包

1
2
wget -i -c http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm
yum localinstall -y mysql57-community-release-el7-10.noarch.rpm

最终显示下图为成功
avatar

安装Mysql Server

1
yum -y install mysql-community-server

最终显示下图为成功
avatar

进行Mysql相关配置

启动Mysql服务

1
systemctl start mysqld.service

查看Mysql状态

1
systemctl status mysqld.service

avatar

查看Mysql初始密码,获取得qOK%qwS7<jI/

1
grep "password" /var/log/mysqld.log

avatar

登录数据库进行口令修改,此处修改为12@sadAAdsa242

1
2
mysql -uroot -p
ALTER USER 'root'@'localhost' IDENTIFIED BY 'new password';

最终显示下图为修改成功
avatar

设置允许外部连接(如果没有外部需求此项可不操作)

1
2
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'your password' WITH GRANT OPTION;
flush privileges;

avatar

修改Mysql字符集

1
vim /etc/my.cnf

增加以下内容

1
2
3
4
5
[client]
default-character-set=utf8
[mysqld]
character-set-server=utf8
collation-server=utf8_general_ci

avatar

重启Mysql

1
systemctl restart mysqld.service

查看修改结果

1
2
mysql -uroot -p
status

avatar

开启Mysql Binlog

编辑以下mysql配置文件

1
vim /etc/my.cnf

增加以下内容

1
2
3
4
[mysqld]
server-id=1
log-bin=mysql-bin
binlog_format=ROW

avatar

重启Mysql

1
systemctl restart mysqld.service

使用mysql-replication监控MySQL的binlog变动

设置同步账号权限

1
2
3
mysql -uroot -p
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY 'your password';
flush privileges;

最终显示下图为设置成功
avatar

安装mysql-replication库

CentOS7默认没有pip,需要先进行安装

1
2
yum -y install epel-release
yum -y install python-pip

Python2.7已无法使用21及以上pip,因此升级时需要限定版本

1
pip install --upgrade "pip < 21.0"

安装mysql-replication库

1
pip install mysql-replication

avatar

Python脚本demo

将下述demo保存成脚本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# -*- coding: utf-8 -*-

import datetime
import json

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent
)


class DateEncoder(json.JSONEncoder):
"""
自定义类,解决报错:
TypeError: Object of type 'datetime' is not JSON serializable
"""

def default(self, obj):
if isinstance(obj, datetime.datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')

elif isinstance(obj, datetime.date):
return obj.strftime("%Y-%m-%d")

else:
return json.JSONEncoder.default(self, obj)


# 配置数据库信息
mysql_settings = {
'host': 'your ip',
'port': 3306,
'user': 'root',
'passwd': 'your password'
}


def main():
# 实例化binlog 流对象
stream = BinLogStreamReader(
connection_settings=mysql_settings,
server_id=100, # slave标识,唯一
blocking=True, # 阻塞等待后续事件
# 设定只监控写操作:增、删、改
only_events=[
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent
]
)

for binlogevent in stream:
# binlogevent.dump() # 打印所有信息

for row in binlogevent.rows:
# 打印 库名 和 表名
event = {"schema": binlogevent.schema, "table": binlogevent.table}

if isinstance(binlogevent, DeleteRowsEvent):
event["action"] = "delete"
event["data"] = row["values"]

elif isinstance(binlogevent, UpdateRowsEvent):
event["action"] = "update"
event["data"] = row["after_values"] # 注意这里不是values

elif isinstance(binlogevent, WriteRowsEvent):
event["action"] = "insert"
event["data"] = row["values"]

print(json.dumps(event, cls=DateEncoder))
# sys.stdout.flush()

# stream.close() # 如果使用阻塞模式,这行多余了


if __name__ == '__main__':
main()
"""
输出数据格式
{
"schema": "database_name", # 数据库名
"table": "table_name", # 表名
"action": "update", # 动作 insert、delete、update
"data": { # 数据,里边包含所有字段
"id": 1,
"data": "data",
"data2": "data2",
"update_time": "2019-06-06 16:59:06"
}
}
"""

启动进行监测binlog文件,使用以下命令进行测试

1
2
3
4
5
6
CREATE DATABASE test;
use test;
CREATE TABLE test1 (id int NOT NULL AUTO_INCREMENT, data VARCHAR(255), data2 VARCHAR(255), update_time TIMESTAMP, PRIMARY KEY(id));
INSERT INTO test1 (data,data2) VALUES ("param1", "param2");
UPDATE test1 SET data = "new_param1", data2="new_param2" WHERE id = 1;
DELETE FROM test1 WHERE id = 1;

avatar

查看实际监控效果
avatar