DataX MysqlReader插件深度解析

一、概述

MysqlReaderDataX 框架中专门用于从 MySQL 数据库读取数据的核心插件。它通过 JDBC 连接器与远程 MySQL 数据库建立连接,执行 SQL 查询语句,并将结果数据转换为 DataX 统一的数据格式传递给下游 Writer 插件处理。

对于用户配置Table、Column、Where的信息,MysqlReader将其拼接为SQL语句发送到Mysql数据库;对于用户配置querySql信息,MysqlReader直接将其发送到Mysql数据库。

二、核心特性

  • JDBC连接支持:基于标准 JDBC 协议连接 MySQL 数据库
  • 灵活配置:支持表名、列名、筛选条件等多种配置方式
  • 并发读取:通过 splitPk 参数实现数据分片并发读取
  • 自定义SQL:支持通过 querySql 参数执行复杂查询语句
  • 类型转换:自动处理 MySQL 数据类型到 DataX 内部类型的映射

三、配置详解

3.1 基础配置示例

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "splitPk": "db_id",
                        "connection": [
                            {
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print":true
                    }
                }
            }
        ]
    }
}

核心参数说明

必需参数

  • jdbcUrl:MySQL 数据库的 JDBC 连接字符串,支持配置多个 IP 地址进行连接探测
  • username:数据库用户名
  • password:数据库密码
  • table:需要同步的表名,支持多表配置

可选参数

  • column:需要同步的列名集合,支持列裁剪、列换序和常量配置
  • splitPk:数据分片主键,用于并发读取,仅支持整型数据
  • where:数据筛选条件,用于增量同步
  • querySql:自定义 SQL 查询语句,优先级高于 tablecolumnwhere 配置

四、类型转换映射

MysqlReader 支持大部分 MySQL 数据类型,转换规则如下:

DataX 内部类型 MySQL 数据类型
Long int, tinyint, smallint, mediumint, int, bigint
Double float, double, decimal
String varchar, char, tinytext, text, mediumtext, longtext, year
Date date, datetime, timestamp, time
Boolean bit, bool
Bytes tinyblob, mediumblob, blob, longblob, varbinary

注意事项:

  • 除上述类型外,其他类型均不支持
  • tinyint(1) 被视为整型
  • year 被视为字符串类型
  • bit 类型属于未定义行为

五、性能优化

并发读取配置

通过配置 splitPk 参数启用并发读取:

{
    "reader": {
        "name": "mysqlreader",
        "parameter": {
            "splitPk": "id",
            "connection": [
                {
                    "table": ["user_table"],
                    "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]
                }
            ]
        }
    }
}

性能测试数据

单表性能测试

通道数 是否按主键切分 速度(Rec/s) 流量(MB/s)
1 183,185 18.11
4 329,733 32.60
8 549,556 54.33

分表性能测试(32张分表)

通道数 速度(Rec/s) 流量(MB/s)
1 202,241 20.06
4 726,358 72.04
8 1,074,405 106.56
16 1,227,892 121.79

六、约束与限制

数据一致性

由于并发读取机制,多个分片任务不属于同一事务,因此无法保证强一致性。解决方案:

  1. 单线程同步:关闭 splitPk,保证数据一致性但速度较慢
  2. 静态数据:锁表或关闭写入,确保数据在同步期间不变

增量同步

支持两种增量同步方式:

  1. 时间戳方式:通过 modify_time 字段追踪数据变更
  2. 自增ID方式:通过最大 id 值同步新增数据

编码处理

自动适配各类编码,但对编码设置混乱的数据库可能导致乱码问题。

七、最佳实践

增量数据同步配置

{
    "reader": {
        "name": "mysqlreader",
        "parameter": {
            "where": "gmt_create > '2023-01-01'",
            "connection": [
                {
                    "table": ["order_table"],
                    "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]
                }
            ]
        }
    }
}

自定义复杂查询

{
    "reader": {
        "name": "mysqlreader",
        "parameter": {
            "querySql": [
                "SELECT a.id, b.name FROM table_a a JOIN table_b b ON a.id = b.a_id WHERE a.status = 1"
            ],
            "connection": [
                {
                    "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/database"]
                }
            ]
        }
    }
}

八、常见问题

Q: 连接报错如何排查?

A: 使用命令行测试连接:

mysql -u<username> -p<password> -h<ip> -D<database> -e "select * from <表名>"

Q: 为什么并发配置不生效?

A: 检查 splitPk 是否配置了有效的整型主键字段。

Q: where条件不生效?

A: 检查 是否配置了querySql 字段,querySql 字段优先级高于 tablecolumnwhere 配置。