最近在使用 DataX 进行 PostgreSQL 和 PostgreSQL 之间的数据同步,在数据同步过程中, 遇到了一个问题,在本文简单记录下问题和相应的解决方案。

问题

在一次数据同步中,DataX执行失败,错误信息如下:

具体错误信息为:com.alibaba.datax.common.exception.DataXException: Code:[DBUtilErrorCode-12], Description:[不支持的数据库类型. 请注意查看 DataX 已经支持的数据库类型以及数据库版本.].  - 您的配
置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[country], 字段名称:[1111], 字段Java类型:[java.lang.String]. 请尝试使用数据库函数将其转换datax支持的类型 

我的配置如下:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "postgresqlreader",
          "parameter": {
            "username": "xasdas",
            "password": "xxx",
            "column": [
              "id",
              "country"
            ],
            "connection": [
              {
                "table": [
                  "xx"
                ],
                "jdbcUrl": [
                  "jdbc:postgresql://xxx:5432/xxxx"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "postgresqlwriter",
          "parameter": {
            "username": "xxxx",
            "password": "x",
            "column": [
              "id",
              "country"
            ],
            "preSql": [
            ],
            "postSql": [
             ],
            "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://xxx:5432/xxxx",
                "table": [
                  "xx"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

通过检查数据库字段,发现 country 字段是 jsonb 类型,DataX不支持此类型,DataX 的支持列表:

DataX 内部类型PostgreSQL 数据类型
Longbigint, bigserial, integer, smallint, serial
Doubledouble precision, money, numeric, real
Stringvarchar, char, text, bit, inet
Datedate, time, timestamp
Booleanbool
Bytesbytea

那该如何解决呢,我查看了相关的 issues 和 pr , 发现了一些通过修改源代码的解决方案和一些不修改代码的方案,由于对 java 不熟悉, 就不采用修改代码的方式来解决问题。

解决方案

接下来我来介绍尝试的几种方案和最终采用的方案。

修改列类型

最方便的可能是直接将列的类型改为支持的类型,但由于数据库已经大规模使用了,修改数据库类型会影响正在使用的程序, 所以不采用此方法。

reader 修改字段类型

第二种方法是在 DataX reader 时将列类型转换为 varchar 类型, writer 列不动,看看 postgresql 能否自动将数据类型转换为 jsonb 类型。

修改后的配置如下:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "postgresqlreader",
          "parameter": {
            "username": "xxxx",
            "password": "xxxx",
            "column": [
              "id",
              "country::varchar"
            ],
            "connection": [
              {
                "table": [
                  "xxxx"
                ],
                "jdbcUrl": [
                  "jdbc:postgresql://xxxx3:5432/xxxx"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "postgresqlwriter",
          "parameter": {
            "username": "xxxx",
            "password": "xxx",
            "column": [
              "id",
              "country"
            ],
            "preSql": [
            ],
            "postSql": [
             ],
            "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://xxx:5432/xxx",
                "table": [
                  "xx"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

运行后还是没有成功同步,出现报错:

[不支持的数据库类型. 请注意查看 DataX 已经支持的数据库类型以及数据库版本.].  - 您的配置文件中的列配置信息有误.  
因为DataX 不支持数据库写入这种字段类型. 字段名:[country], 字段类型:[1111], 字段Java类型:[jsonb]. 请修改表中该字段的类型或者不同步该字段

临时列写入

第三种方法是第二种的扩展,除了在 reader 时进行数据转换,writer 时将 reader 转换的数据写入临时列中,写入后通过 postSql 将临时列的值更新至正确的列上。

具体的配置如下:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "postgresqlreader",
          "parameter": {
            "username": "postgres",
            "password": "123456",
            "column": [
              "id",
              "country::varchar"
            ],
            "connection": [
              {
                "table": [
                  "xx"
                ],
                "jdbcUrl": [
                  "jdbc:postgresql://xxx:5432/xxx"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "postgresqlwriter",
          "parameter": {
            "username": "postgres",
            "password": "xx",
            "column": [
              "id",
              "country_tmp"
            ],
            "preSql": [
            ],
            "postSql": [
              "UPDATE matches SET country = country_tmp::JSONB;"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:postgresql://xxxx:5432/xxx",
                "table": [
                  "xxx"
                ]
              }
            ]
          }
        }
      }
    ]
  }
}

运行后顺利同步成功了。此方案就是我采用的最终解决方案。

小结

本文主要介绍了在 postgresql 和 postgresql 之间遇到的一个 jsonb 数据类型不支持的问题,本文简单介绍了这个问题,并给出了不修改代码的解决方案,

参考