DataX MysqlReader插件深度解析

一、概述
MysqlReader 是 DataX 框架中专门用于从 MySQL 数据库读取数据的核心插件。它通过 JDBC 连接器与远程 MySQL 数据库建立连接,执行 SQL 查询语句,并将结果数据转换为 DataX 统一的数据格式传递给下游 Writer 插件处理。
对于用户配置Table、Column、Where的信息,MysqlReader将其拼接为SQL语句发送到Mysql数据库;对于用户配置querySql信息,MysqlReader直接将其发送到Mysql数据库。
二、核心特性
- JDBC连接支持:基于标准 JDBC 协议连接 MySQL 数据库
- 灵活配置:支持表名、列名、筛选条件等多种配置方式
- 并发读取:通过
splitPk参数实现数据分片并发读取 - 自定义SQL:支持通过
querySql参数执行复杂查询语句 - 类型转换:自动处理 MySQL 数据类型到 DataX 内部类型的映射
三、配置详解
3.1 基础配置示例
{
"job": {
"setting": {
"speed": {
"channel": 3
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"name"
],
"splitPk": "db_id",
"connection": [
{
"table": [
"table"
],
"jdbcUrl": [
"jdbc:mysql://127.0.0.1:3306/database"
]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print":true
}
}
}
]
}
}
核心参数说明
必需参数
jdbcUrl:MySQL 数据库的 JDBC 连接字符串,支持配置多个 IP 地址进行连接探测username:数据库用户名password:数据库密码table:需要同步的表名,支持多表配置
可选参数
column:需要同步的列名集合,支持列裁剪、列换序和常量配置splitPk:数据分片主键,用于并发读取,仅支持整型数据where:数据筛选条件,用于增量同步querySql:自定义 SQL 查询语句,优先级高于table、column、where配置
四、类型转换映射
MysqlReader 支持大部分 MySQL 数据类型,转换规则如下:
| DataX 内部类型 | MySQL 数据类型 |
|---|---|
Long |
int, tinyint, smallint, mediumint, int, bigint |
Double |
float, double, decimal |
String |
varchar, char, tinytext, text, mediumtext, longtext, year |
Date |
date, datetime, timestamp, time |
Boolean |
bit, bool |
Bytes |
tinyblob, mediumblob, blob, longblob, varbinary |
注意事项:
- 除上述类型外,其他类型均不支持
tinyint(1)被视为整型year被视为字符串类型bit类型属于未定义行为
五、性能优化
并发读取配置
通过配置 splitPk 参数启用并发读取:
{
"reader": {
"name": "mysqlreader",
"parameter": {
"splitPk": "id",
"connection": [
{
"table": ["user_table"],
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]
}
]
}
}
}
性能测试数据
单表性能测试
| 通道数 | 是否按主键切分 | 速度(Rec/s) | 流量(MB/s) |
|---|---|---|---|
| 1 | 否 | 183,185 | 18.11 |
| 4 | 是 | 329,733 | 32.60 |
| 8 | 是 | 549,556 | 54.33 |
分表性能测试(32张分表)
| 通道数 | 速度(Rec/s) | 流量(MB/s) |
|---|---|---|
| 1 | 202,241 | 20.06 |
| 4 | 726,358 | 72.04 |
| 8 | 1,074,405 | 106.56 |
| 16 | 1,227,892 | 121.79 |
六、约束与限制
数据一致性
由于并发读取机制,多个分片任务不属于同一事务,因此无法保证强一致性。解决方案:
- 单线程同步:关闭
splitPk,保证数据一致性但速度较慢 - 静态数据:锁表或关闭写入,确保数据在同步期间不变
增量同步
支持两种增量同步方式:
- 时间戳方式:通过
modify_time字段追踪数据变更 - 自增ID方式:通过最大
id值同步新增数据
编码处理
自动适配各类编码,但对编码设置混乱的数据库可能导致乱码问题。
七、最佳实践
增量数据同步配置
{
"reader": {
"name": "mysqlreader",
"parameter": {
"where": "gmt_create > '2023-01-01'",
"connection": [
{
"table": ["order_table"],
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]
}
]
}
}
}
自定义复杂查询
{
"reader": {
"name": "mysqlreader",
"parameter": {
"querySql": [
"SELECT a.id, b.name FROM table_a a JOIN table_b b ON a.id = b.a_id WHERE a.status = 1"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]
}
]
}
}
}
八、常见问题
Q: 连接报错如何排查?
A: 使用命令行测试连接:
mysql -u<username> -p<password> -h<ip> -D<database> -e "select * from <表名>"
Q: 为什么并发配置不生效?
A: 检查 splitPk 是否配置了有效的整型主键字段。
Q: where条件不生效?
A: 检查 是否配置了querySql 字段,querySql 字段优先级高于 table、column、where 配置。
评论