解决Mysql同步到ES时date和time字段类型转换问题

寻技术 Mysql 2023年11月16日 178

这个问题中发生问题的es字段映射类型为date类型!!!
字符串类型的会好很多,关键我这边是通过logstash生成索引,并且全量同步的,不好改索引类型!!!

问题描述:

这个报错我该怎么办呢2023-07-10 03:01:39.104 [pool-2-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"ORDER_ID":"4444","SETTLE_DATE_LOC":"20200329","TRANS_DATE":"20200329","TRANS_TIME":"165440","USER_ID":null,"SPACE_NO":null,"SPACE_NAME":null,"PARK_NO":"JC01331","PARK_NAME":"DELETED","CARNO":"沪A666666","CAR_IN_SEQ":"20200329165438071000000000785843","CAR_OUT_SEQ":null,"START_TIME":"20181126103000","END_TIME":null,"LENGTH":null,"OVER_VALID_TIME":null,"LEAVE_TIME":null,"STATUS":"03","PAY_STYLE":null,"TRANS_AMT":"0","PRE_AMT":"0","BALANCE":"0","FAV_AMT":"0","RET_AMT":"0","RET_STATUS":null,"SETTLE_STATUS":null,"SETTLE_DATE":null,"SETTLE_SEQ":null,"SETTLE_AMT":null,"REC_CRT_TIME":"20200329165440","REC_UPD_TIME":"20200329165440","OPR_FLAG":"0","RESVD1":null,"RESVD2":"11111111111111111111111111111111","IF_HOLD":"0","LAST_UPD_TIME":null,"LAST_UPD_RESV":null,"RESVD3":null,"RESVD4":null,"RESVD5":null,"RESVD6":null,"IN_TYPE":"00","IF_RENT":null,"ORG_ID":"0","MCHNT_NO":"1000058","MCHNT_NAME":"捷顺总部大厦停车场","IN_OUT_DATE":"2018-11-26","LEAVE_DATE":null,"IF_SCORE":"0","EXCEP_TYPE":null,"EXCEP_DATE":null,"EXCEP_TIME":null,"EXCEP_STATUS":null,"IF_PAYBACK":null,"DEVICE_TYPE":"112","SUB_DEVICE_TYPE":"r"}],"database":"cstc245_test","destination":"example","es":1688929250000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["ORDER_ID","IN_OUT_DATE"],"sql":"","table":"tbl_ps_order_in","ts":1688929299104,"type":"INSERT"}
2023-07-10 03:01:39.107 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"ORDER_ID":"4444","SETTLE_DATE_LOC":"20200329","TRANS_DATE":"20200329","TRANS_TIME":"165440","USER_ID":null,"SPACE_NO":null,"SPACE_NAME":null,"PARK_NO":"JC01331","PARK_NAME":"DELETED","CARNO":"沪A666666","CAR_IN_SEQ":"20200329165438071000000000785843","CAR_OUT_SEQ":null,"START_TIME":"20181126103000","END_TIME":null,"LENGTH":null,"OVER_VALID_TIME":null,"LEAVE_TIME":null,"STATUS":"03","PAY_STYLE":null,"TRANS_AMT":"0","PRE_AMT":"0","BALANCE":"0","FAV_AMT":"0","RET_AMT":"0","RET_STATUS":null,"SETTLE_STATUS":null,"SETTLE_DATE":null,"SETTLE_SEQ":null,"SETTLE_AMT":null,"REC_CRT_TIME":"20200329165440","REC_UPD_TIME":"20200329165440","OPR_FLAG":"0","RESVD1":null,"RESVD2":"11111111111111111111111111111111","IF_HOLD":"0","LAST_UPD_TIME":null,"LAST_UPD_RESV":null,"RESVD3":null,"RESVD4":null,"RESVD5":null,"RESVD6":null,"IN_TYPE":"00","IF_RENT":null,"ORG_ID":"0","MCHNT_NO":"1000058","MCHNT_NAME":"捷顺总部大厦停车场","IN_OUT_DATE":"2018-11-26","LEAVE_DATE":null,"IF_SCORE":"0","EXCEP_TYPE":null,"EXCEP_DATE":null,"EXCEP_TIME":null,"EXCEP_STATUS":null,"IF_PAYBACK":null,"DEVICE_TYPE":"112","SUB_DEVICE_TYPE":"r"}],"database":"cstc245_test","destination":"example","es":1688929250000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["ORDER_ID","IN_OUT_DATE"],"sql":"","table":"tbl_ps_order_in","ts":1688929299104,"type":"INSERT"} 
Affected indexes: tbl_ps_order_in 
2023-07-10 03:01:39.116 [pool-2-thread-1] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - ES sync commit error ElasticsearchException[Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [START_TIME] of type [date] in document with id '4444'. Preview of field's value: '2018-11-26 10:30:00']]; nested: ElasticsearchException[Elasticsearch exception [type=illegal_argument_exception, reason=failed to parse date field [2018-11-26 10:30:00] with format [strict_date_optional_time||epoch_millis]]]; nested: ElasticsearchException[Elasticsearch exception [type=date_time_parse_exception, reason=Failed to parse with all enclosed parsers]];
java.lang.RuntimeException: ES sync commit error ElasticsearchException[Elasticsearch exception [type=mapper_parsing_exception, reason=failed to parse field [START_TIME] of type [date] in document with id '4444'. Preview of field's value: '2018-11-26 10:30:00']]; nested: ElasticsearchException[Elasticsearch exception [type=illegal_argument_exception, reason=failed to parse date field [2018-11-26 10:30:00] with format [strict_date_optional_time||epoch_millis]]]; nested: ElasticsearchException[Elasticsearch exception [type=date_time_parse_exception, reason=Failed to parse with all enclosed parsers]];
    at com.alibaba.otter.canal.client.adapter.es7x.support.ESConnection$ES7xBulkResponse.processFailBulkResponse(ESConnection.java:470) ~[na:na]
    at com.alibaba.otter.canal.client.adapter.es7x.support.ES7xTemplate.commit(ES7xTemplate.java:175) ~[na:na]
    at com.alibaba.otter.canal.client.adapter.es.core.service.ESSyncService.commit(ESSyncService.java:873) ~[na:na]
    at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.sync(ESAdapter.java:86) ~[na:na]
    at com.alibaba.otter.canal.client.adapter.ProxyOuterAdapter.sync(ProxyOuterAdapter.java:42) ~[client-adapter.common-1.1.7-SNAPSHOT.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.batchSync(AdapterProcessor.java:139) ~[client-adapter.launcher-1.1.7-SNAPSHOT.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$1(AdapterProcessor.java:97) ~[client-adapter.launcher-1.1.7-SNAPSHOT.jar:na]
    at java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:895) ~[na:1.8.0_262]
    at com.alibaba.otter.canal.adapter.launcher.loader.AdapterProcessor.lambda$null$2(AdapterProcessor.java:94) ~[client-adapter.launcher-1.1.7-SNAPSHOT.jar:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_262]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_262]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_262]
2023-07-10 03:01:39.116 [Thread-4] ERROR c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - Outer adapter sync failed!  Error sync and rollback, execute times: 17
^C

根据错误日志,似乎是Elasticsearch无法解析日期字段START_TIME。这个字段的格式是YYYY-MM-DD HH:MM:SS,而Elasticsearch期望的格式是strict_date_optional_time||epoch_millis

strict_date_optional_time格式的例子是2015-01-01T12:10:30Z,而epoch_millis则是自1970-01-01 00:00:00 UTC以来的毫秒数。错误提示的意思是Elasticsearch试图使用这两种格式解析START_TIME字段,但是都失败了。

从这个错误信息来看,你可能需要将MySQL中的日期时间数据转换为Elasticsearch可以接受的格式。你可以尝试将日期时间数据转换为ISO 8601格式,或者转换为epoch_millis格式。

如果你有权限更改Elasticsearch索引的映射,你也可以更改START_TIME字段的类型为text,这样Elasticsearch就会将其当作普通字符串处理,而不会尝试将其解析为日期。

这种情况下,你可能需要更改日期字段的处理方式,你可以在SQL查询中使用UNIX_TIMESTAMP()函数将日期时间转换为epoch格式,或者在数据同步到Elasticsearch之前进行格式转换。具体解决方案需要根据你的实际情况来确定。

提问: 试了没用!!!

将包含 'date' 的字段使用 DATE_FORMAT 转换,只需要年月日就行,包含time的字段需要年月日时分秒并且尝试将日期时间数据转换为ISO 8601格式
SELECT 
    a.ORDER_ID as _id,
    a.ORDER_ID,
    a.SETTLE_DATE_LOC,
    a.TRANS_DATE,
    a.TRANS_TIME,
    a.USER_ID,
    a.SPACE_NO,
    a.SPACE_NAME,
    a.PARK_NO,
    a.PARK_NAME,
    a.CARNO,
    a.CAR_IN_SEQ,
    a.CAR_OUT_SEQ,
    a.START_TIME,
    a.END_TIME,
    a.LENGTH,
    a.OVER_VALID_TIME,
    a.LEAVE_TIME,
    a.STATUS,
    a.PAY_STYLE,
    a.TRANS_AMT,
    a.PRE_AMT,
    a.BALANCE,
    a.FAV_AMT,
    a.RET_AMT,
    a.RET_STATUS,
    a.SETTLE_STATUS,
    a.SETTLE_DATE,
    a.SETTLE_SEQ,
    a.SETTLE_AMT,
    a.REC_CRT_TIME,
    a.REC_UPD_TIME,
    a.OPR_FLAG,
    a.RESVD1,
    a.RESVD2,
    a.IF_HOLD,
    a.LAST_UPD_TIME,
    a.LAST_UPD_RESV,
    a.RESVD3,
    a.RESVD4,
    a.RESVD5,
    a.RESVD6,
    a.IN_TYPE,
    a.IF_RENT,
    a.ORG_ID,
    a.MCHNT_NO,
    a.MCHNT_NAME,
    a.IN_OUT_DATE,
    a.LEAVE_DATE,
    a.IF_SCORE,
    a.EXCEP_TYPE,
    a.EXCEP_DATE,
    a.EXCEP_TIME,
    a.EXCEP_STATUS,
    a.IF_PAYBACK,
    a.DEVICE_TYPE,
    a.SUB_DEVICE_TYPE
FROM tbl_ps_order_in a

回答

你想将查询中包含 ‘date’ 的字段用 DATE_FORMAT 转换为仅包含年月日的格式,同时,包含 ‘time’ 的字段需要年月日时分秒,而且要将日期时间数据转换为ISO 8601格式。以下是你可以使用的SQL语句:

SELECT 
    a.ORDER_ID as _id,
    a.ORDER_ID,
    DATE_FORMAT(a.SETTLE_DATE_LOC, '%Y-%m-%d') as SETTLE_DATE_LOC,
    DATE_FORMAT(a.TRANS_DATE, '%Y-%m-%d') as TRANS_DATE,
    DATE_FORMAT(a.TRANS_TIME, '%Y-%m-%dT%H:%i:%s') as TRANS_TIME,
    a.USER_ID,
    a.SPACE_NO,
    a.SPACE_NAME,
    a.PARK_NO,
    a.PARK_NAME,
    a.CARNO,
    a.CAR_IN_SEQ,
    a.CAR_OUT_SEQ,
    DATE_FORMAT(a.START_TIME, '%Y-%m-%dT%H:%i:%s') as START_TIME,
    DATE_FORMAT(a.END_TIME, '%Y-%m-%dT%H:%i:%s') as END_TIME,
    a.LENGTH,
    a.OVER_VALID_TIME,
    DATE_FORMAT(a.LEAVE_TIME, '%Y-%m-%dT%H:%i:%s') as LEAVE_TIME,
    a.STATUS,
    a.PAY_STYLE,
    a.TRANS_AMT,
    a.PRE_AMT,
    a.BALANCE,
    a.FAV_AMT,
    a.RET_AMT,
    a.RET_STATUS,
    a.SETTLE_STATUS,
    DATE_FORMAT(a.SETTLE_DATE, '%Y-%m-%d') as SETTLE_DATE,
    a.SETTLE_SEQ,
    a.SETTLE_AMT,
    DATE_FORMAT(a.REC_CRT_TIME, '%Y-%m-%dT%H:%i:%s') as REC_CRT_TIME,
    DATE_FORMAT(a.REC_UPD_TIME, '%Y-%m-%dT%H:%i:%s') as REC_UPD_TIME,
    a.OPR_FLAG,
    a.RESVD1,
    a.RESVD2,
    a.IF_HOLD,
    DATE_FORMAT(a.LAST_UPD_TIME, '%Y-%m-%dT%H:%i:%s') as LAST_UPD_TIME,
    a.LAST_UPD_RESV,
    a.RESVD3,
    a.RESVD4,
    a.RESVD5,
    a.RESVD6,
    a.IN_TYPE,
    a.IF_RENT,
    a.ORG_ID,
    a.MCHNT_NO,
    a.MCHNT_NAME,
    DATE_FORMAT(a.IN_OUT_DATE, '%Y-%m-%d') as IN_OUT_DATE,
    DATE_FORMAT(a.LEAVE_DATE, '%Y-%m-%d') as LEAVE_DATE,
    a.IF_SCORE,
    a.EXCEP_TYPE,
    DATE_FORMAT(a.EXCEP_DATE, '%Y-%m-%d') as EXCEP_DATE,
    DATE_FORMAT(a.EXCEP_TIME, '%Y-%m-%dT%H:%i:%s') as EXCEP_TIME,
    a.EXCEP_STATUS,
    a.IF_PAYBACK,
    a.DEVICE_TYPE,
    a.SUB_DEVICE_TYPE
FROM tbl_ps_order_in a

注意,这个 SQL 查询是针对 MySQL 数据库的,假设所有的时间字段都是以字符串形式存储,如果字段的存储形式不同,这个查询可能需要做相应的调整。此外,‘%Y-%m-%dT%H:%i:%s’ 是 ISO 8601 时间格式的 MySQL 版本,如果你的数据库不支持这种格式,你可能需要在应用层面进行处理。

这个貌似解决不了!!!!!

下面是重头戏

你可以使用UNIX_TIMESTAMP()函数将日期和时间字段转换为epoch格式。这个函数将日期和时间的值转换为自1970-01-01 00:00:00 UTC以来的秒数。以下是你的SQL查询的一部分,展示了如何进行这种转换:

SELECT 
    a.ORDER_ID as _id,
    a.ORDER_ID,
    UNIX_TIMESTAMP(a.SETTLE_DATE_LOC) as SETTLE_DATE_LOC,
    UNIX_TIMESTAMP(a.TRANS_DATE) as TRANS_DATE,
    UNIX_TIMESTAMP(STR_TO_DATE(a.TRANS_TIME, '%Y-%m-%d %H:%i:%s')) as TRANS_TIME,
    a.USER_ID,
    ...
    UNIX_TIMESTAMP(STR_TO_DATE(a.START_TIME, '%Y-%m-%d %H:%i:%s')) as START_TIME,
    UNIX_TIMESTAMP(STR_TO_DATE(a.END_TIME, '%Y-%m-%d %H:%i:%s')) as END_TIME,
    ...
    UNIX_TIMESTAMP(a.SETTLE_DATE) as SETTLE_DATE,
    ...
    UNIX_TIMESTAMP(STR_TO_DATE(a.REC_CRT_TIME, '%Y-%m-%d %H:%i:%s')) as REC_CRT_TIME,
    UNIX_TIMESTAMP(STR_TO_DATE(a.REC_UPD_TIME, '%Y-%m-%d %H:%i:%s')) as REC_UPD_TIME,
    ...
    UNIX_TIMESTAMP(STR_TO_DATE(a.LAST_UPD_TIME, '%Y-%m-%d %H:%i:%s')) as LAST_UPD_TIME,
    ...
    UNIX_TIMESTAMP(a.IN_OUT_DATE) as IN_OUT_DATE,
    UNIX_TIMESTAMP(a.LEAVE_DATE) as LEAVE_DATE,
    ...
    UNIX_TIMESTAMP(a.EXCEP_DATE) as EXCEP_DATE,
    UNIX_TIMESTAMP(STR_TO_DATE(a.EXCEP_TIME, '%Y-%m-%d %H:%i:%s')) as EXCEP_TIME,
    ...
FROM tbl_ps_order_in a

这里,我将所有的日期时间字段都用UNIX_TIMESTAMP()函数进行了处理。对于时间字段(如TRANS_TIMESTART_TIME等),我还使用了STR_TO_DATE()函数,将其从字符串格式转换为日期时间格式,然后再调用UNIX_TIMESTAMP()

注意,UNIX_TIMESTAMP()函数返回的是秒数,而Elasticsearch的epoch_millis格式是毫秒数。如果你需要毫秒数,你可以将UNIX_TIMESTAMP()函数的结果乘以1000。

以上的方案适用于MySQL。如果你的数据库是其他类型,可能需要用不同的方法来进行转换。

**提问:**试了有用!!!!

SELECT 
    a.ORDER_ID as _id,
    a.ORDER_ID,
    a.SETTLE_DATE_LOC,
    a.TRANS_DATE,
    a.TRANS_TIME,
    a.USER_ID,
    a.SPACE_NO,
    a.SPACE_NAME,
    a.PARK_NO,
    a.PARK_NAME,
    a.CARNO,
    a.CAR_IN_SEQ,
    a.CAR_OUT_SEQ,
    a.START_TIME,
    a.END_TIME,
    a.LENGTH,
    a.OVER_VALID_TIME,
    a.LEAVE_TIME,
    a.STATUS,
    a.PAY_STYLE,
    a.TRANS_AMT,
    a.PRE_AMT,
    a.BALANCE,
    a.FAV_AMT,
    a.RET_AMT,
    a.RET_STATUS,
    a.SETTLE_STATUS,
    a.SETTLE_DATE,
    a.SETTLE_SEQ,
    a.SETTLE_AMT,
    a.REC_CRT_TIME,
    a.REC_UPD_TIME,
    a.OPR_FLAG,
    a.RESVD1,
    a.RESVD2,
    a.IF_HOLD,
    a.LAST_UPD_TIME,
    a.LAST_UPD_RESV,
    a.RESVD3,
    a.RESVD4,
    a.RESVD5,
    a.RESVD6,
    a.IN_TYPE,
    a.IF_RENT,
    a.ORG_ID,
    a.MCHNT_NO,
    a.MCHNT_NAME,
    a.IN_OUT_DATE,
    a.LEAVE_DATE,
    a.IF_SCORE,
    a.EXCEP_TYPE,
    a.EXCEP_DATE,
    a.EXCEP_TIME,
    a.EXCEP_STATUS,
    a.IF_PAYBACK,
    a.DEVICE_TYPE,
    a.SUB_DEVICE_TYPE
FROM tbl_ps_order_in a将包含 'date' 的字段使用 DATE_FORMAT 转换,只需要年月日就行,包含time的字段需要格式化成年月日时分秒,并且用UNIX_TIMESTAMP()函数进行处理,然后还要将UNIX_TIMESTAMP()函数的结果乘以1000

回答

根据您的需求,我将对包含 ‘date’ 的字段使用 DATE_FORMAT 进行年月日的格式化,对包含 ‘time’ 的字段,先将其格式化为年月日时分秒,然后使用 UNIX_TIMESTAMP() 函数处理,最后将结果乘以1000。以下是更新后的 SQL 查询:

SELECT 
    a.ORDER_ID as _id,
    a.ORDER_ID,
    DATE_FORMAT(a.SETTLE_DATE_LOC, '%Y-%m-%d') as SETTLE_DATE_LOC,
    DATE_FORMAT(a.TRANS_DATE, '%Y-%m-%d') as TRANS_DATE,
    UNIX_TIMESTAMP(DATE_FORMAT(a.TRANS_TIME, '%Y-%m-%d %H:%i:%s')) * 1000 as TRANS_TIME,
    a.USER_ID,
    a.SPACE_NO,
    a.SPACE_NAME,
    a.PARK_NO,
    a.PARK_NAME,
    a.CARNO,
    a.CAR_IN_SEQ,
    a.CAR_OUT_SEQ,
    UNIX_TIMESTAMP(DATE_FORMAT(a.START_TIME, '%Y-%m-%d %H:%i:%s')) * 1000 as START_TIME,
    UNIX_TIMESTAMP(DATE_FORMAT(a.END_TIME, '%Y-%m-%d %H:%i:%s')) * 1000 as END_TIME,
    a.LENGTH,
    a.OVER_VALID_TIME,
    UNIX_TIMESTAMP(DATE_FORMAT(a.LEAVE_TIME, '%Y-%m-%d %H:%i:%s')) * 1000 as LEAVE_TIME,
    a.STATUS,
    a.PAY_STYLE,
    a.TRANS_AMT,
    a.PRE_AMT,
    a.BALANCE,
    a.FAV_AMT,
    a.RET_AMT,
    a.RET_STATUS,
    a.SETTLE_STATUS,
    DATE_FORMAT(a.SETTLE_DATE, '%Y-%m-%d') as SETTLE_DATE,
    a.SETTLE_SEQ,
    a.SETTLE_AMT,
    UNIX_TIMESTAMP(DATE_FORMAT(a.REC_CRT_TIME, '%Y-%m-%d %H:%i:%s')) * 1000 as REC_CRT_TIME,
    UNIX_TIMESTAMP(DATE_FORMAT(a.REC_UPD_TIME, '%Y-%m-%d %H:%i:%s')) * 1000 as REC_UPD_TIME,
    a.OPR_FLAG,
    a.RESVD1,
    a.RESVD2,
    a.IF_HOLD,
    UNIX_TIMESTAMP(DATE_FORMAT(a.LAST_UPD_TIME, '%Y-%m-%d %H:%i:%s')) * 1000 as LAST_UPD_TIME,
    a.LAST_UPD_RESV,
    a.RESVD3,
    a.RESVD4,
    a.RESVD5,
    a.RESVD6,
    a.IN_TYPE,
    a.IF_RENT,
    a.ORG_ID,
    a.MCHNT_NO,
    a.MCHNT_NAME,
    DATE_FORMAT(a.IN_OUT_DATE, '%Y-%m-%d') as IN_OUT_DATE,
    DATE_FORMAT(a.LEAVE_DATE, '%Y-%m-%d') as LEAVE_DATE,
    a.IF_SCORE,
    a.EXCEP_TYPE,
    DATE_FORMAT(a.EXCEP_DATE, '%Y-%m-%d') as EXCEP_DATE,
    UNIX_TIMESTAMP(DATE_FORMAT(a.EXCEP_TIME, '%Y-%m-%d %H:%i:%s')) * 1000 as EXCEP_TIME,
    a.EXCEP_STATUS,
    a.IF_PAYBACK,
    a.DEVICE_TYPE,
    a.SUB_DEVICE_TYPE
FROM tbl_ps_order_in a

注意:上述查询是在假设所有包含 ‘time’ 的字段都是 TIMESTAMP 类型,可以被转换为 ‘%Y-%m-%d %H:%i:%s’ 这种格式。如果不是这种类型,可能需要进行一些调整。

关闭

用微信“扫一扫”