DataX 数据转换(Transformer)详解

概述

DataX TransformerDataX 框架中实现 ETL(Extract, Transform, Load)流程中
T(Transform)环节的核心组件。在数据同步和传输过程中,用户常常需要对数据进行特殊定制化处理,如裁剪列、转换列格式、数据过滤等操作,这些都可以通过
Transformer 来实现。

运行模型

TransformerDataX 的数据处理流程中处于中间位置:

Reader → Transformer → Writer

数据从 Reader 读取后,在传输到 Writer 之前,会经过 Transformer 进行一系列转换处理。

内置 UDF 函数手册

1. dx_substr - 字符串截取

  • 参数:字段编号、开始位置、截取长度
  • 功能:从指定位置截取指定长度的字符串
  • 示例
    dx_substr(1,"2","5")  // "dataxTest" => "taxTe"
    

2. dx_pad - 字符串填充

  • 参数:字段编号、填充方向(“l”/“r”)、目标长度、填充字符
  • 功能:在字符串头部或尾部填充指定字符
  • 示例
    dx_pad(1,"l","4","A")  // "xyz" => "Axyz"
    

3. dx_replace - 字符串替换

  • 参数:字段编号、开始位置、替换长度、替换字符串
  • 功能:从指定位置替换指定长度的字符串
  • 示例
    dx_replace(1,"2","4","****")  // "dataxTest" => "da****est"
    

4. dx_filter - 数据过滤

  • 参数:字段编号、运算符、比较值
  • 功能:根据条件过滤数据行
  • 支持运算符like, not like, >, =, <, >=, !=, <=
  • 示例
    dx_filter(1,"like","dataTest")
    dx_filter(1,">=","10")
    

5. dx_digest - 数据摘要

  • 参数:字段编号、哈希算法(md5/sha1)、大小写(toUpperCase/toLowerCase)
  • 功能:生成指定类型的哈希值
  • 示例
    dx_digest(1,"md5","toUpperCase")  // "xyzzzzz" => "9CDFFC4FA4E45A99DB8BBCD762ACFFA2"
    

6. dx_groovy - Groovy 脚本执行

  • 参数:Groovy 代码、额外包导入
  • 功能:执行自定义 Groovy 脚本进行复杂转换
  • 特点
    • 只能调用一次
    • 支持 java.langjava.util
    • 可访问 record 和各种 Column 对象
  • 示例
    dx_groovy("Column column = record.getColumn(1); String newValue = column.asString().substring(0, 3); record.setColumn(1, new StringColumn(newValue)); return record;")
    

Job 配置示例

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "column": [
              ...
            ],
            "sliceRecordCount": 100
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "print": false,
            "encoding": "UTF-8"
          }
        },
        "transformer": [
          {
            "name": "dx_substr",
            "parameter": {
              "columnIndex": 5,
              "paras": [
                "1",
                "3"
              ]
            }
          },
          {
            "name": "dx_replace",
            "parameter": {
              "columnIndex": 4,
              "paras": [
                "3",
                "4",
                "****"
              ]
            }
          }
        ]
      }
    ]
  }
}

计量和脏数据处理

运行时计量展示

Total 1000000 records, 22000000 bytes | Transform 100000 records(in), 10000 records(out) | Speed 2.10MB/s, 100000 records/s | Error 0 records, 0 bytes | Percentage 100.00%

作业完成计量展示

任务启动时刻                    : 2015-03-10 17:34:21
任务结束时刻                    : 2015-03-10 17:34:31
转换输入总数                    :             1000000
转换输出总数                    :             1000000
同步失败总数                    :                   0

Transformer 的不足点

1. 功能局限性

  • 缺乏复杂数据处理能力:内置 UDF 函数相对简单,难以处理复杂的业务逻辑
  • 不支持多字段联合判断:如 dx_filter 无法进行多个字段的组合条件判断
  • 缺少聚合计算功能:不支持 sum、count、group by 等聚合操作

2. 性能瓶颈

  • 逐行处理开销:每个 transformer 都需要对每条记录进行处理,增加了数据传输的延迟
  • 内存占用较高:在处理大量数据时,转换过程会占用较多内存资源
  • 无法并行优化:转换操作通常是串行执行,难以充分利用多核 CPU 资源

3. 可维护性问题

  • 脚本调试困难:特别是 dx_groovy 脚本,缺乏有效的调试工具
  • 错误定位复杂:当转换链路较长时,很难快速定位问题所在
  • 版本兼容风险:自定义 Groovy 脚本可能在 DataX 版本升级时出现兼容性问题

4. 扩展性限制

  • 第三方库支持有限dx_groovy 不支持第三方 jar 包,限制了功能扩展
  • 包导入受限:只能使用有限的 Java 标准库,无法引入业务相关的工具类
  • API 接口封闭recordcolumn 的操作接口相对固定,不够灵活

5. 配置复杂性

  • JSON 配置冗长:复杂的转换逻辑会导致配置文件过于庞大
  • 参数传递不便:缺乏参数化配置能力,相似的转换逻辑需要重复配置
  • 依赖管理缺失:无法很好地管理转换逻辑之间的依赖关系

最佳实践建议

  1. 合理规划转换链路:尽量减少不必要的转换步骤
  2. 优先使用内置函数:除非必要,避免使用 dx_groovy 降低维护成本
  3. 关注性能监控:密切监控转换过程的性能指标
  4. 做好异常处理:合理配置错误容忍度,避免因少量脏数据导致整个任务失败