Skip to content

Commit 03fc6c9

Browse files
committed
Merge remote-tracking branch 'origin/1.8_release_4.0.x' into temp_1.10_4.0_merge_2
# Conflicts: # flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogMinerConnection.java
2 parents 7ce1e77 + f7be544 commit 03fc6c9

File tree

4 files changed

+25
-9
lines changed

4 files changed

+25
-9
lines changed

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogMinerConnection.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,23 @@ public void connect() {
143143
oracleVersion = connection.getMetaData().getDatabaseMajorVersion();
144144
isOracle10 = oracleVersion == 10;
145145

146-
if (isOracle10) {
147-
//oracle10开启logMiner之前 需要设置会话级别的日期格式 否则sql语句会含有todate函数 而不是todate函数计算后的值
146+
//修改session级别的 NLS_DATE_FORMAT 值为 "YYYY-MM-DD HH24:MI:SS",否则在解析日志时 redolog出现 TO_DATE('18-APR-21', 'DD-MON-RR')
147+
boolean isAlterDateFormat = false;
148+
try {
149+
try (PreparedStatement preparedStatement = connection.prepareStatement(SqlUtil.SQL_QUERY_NLS_DATE_FORMAT)) {
150+
try (ResultSet resultSet = preparedStatement.executeQuery(SqlUtil.SQL_QUERY_NLS_DATE_FORMAT)) {
151+
while (resultSet.next()) {
152+
String nlsDateFormat = resultSet.getString(1);
153+
isAlterDateFormat = !nlsDateFormat.equalsIgnoreCase("YYYY-MM-DD HH24:MI:SS");
154+
LOG.info("nlsDateFormat {}, isAlterDateFormat {}", nlsDateFormat, isAlterDateFormat);
155+
}
156+
}
157+
}
158+
} catch (Exception e) {
159+
LOG.info("query nlsDateFormat failed ,exception is {}", ExceptionUtil.getErrorMessage(e));
160+
}
161+
162+
if (isAlterDateFormat) {
148163
try (PreparedStatement preparedStatement = connection.prepareStatement(SqlUtil.SQL_ALTER_DATE_FORMAT)) {
149164
preparedStatement.execute();
150165
}

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogMinerListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void run() {
117117
try {
118118
if (logMinerConnection.hasNext()) {
119119
log = logMinerConnection.next();
120-
queue.put(logParser.parse(log, logMinerConnection.isOracle10));
120+
queue.put(logParser.parse(log));
121121
} else {
122122
logMinerConnection.closeStmt();
123123
logMinerConnection.startOrUpdateLogMiner(positionManager.getPosition());

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogParser.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,16 @@ public void visit(final EqualsTo expr){
142142
});
143143
}
144144

145-
public QueueData parse(QueueData pair, boolean isOracle10) throws JSQLParserException {
145+
public QueueData parse(QueueData pair) throws JSQLParserException {
146146
Map<String, Object> logData = pair.getData();
147147
String schema = MapUtils.getString(logData, "schema");
148148
String tableName = MapUtils.getString(logData, "tableName");
149149
String operation = MapUtils.getString(logData, "operation");
150150
String sqlLog = MapUtils.getString(logData, "sqlLog");
151+
LOG.debug("before alter sqlLog, sqlLog = {}",sqlLog);
151152
String sqlRedo = sqlLog.replace("IS NULL", "= NULL");
152-
//只有oracle10需要进行toDate toTimestamp转换
153-
LOG.debug("before parse toDate/toTimestamp sqlRedo = {}",sqlRedo);
154-
if (isOracle10) {
155-
sqlRedo = parseToTimeStamp(parseToDate(sqlRedo));
156-
}
153+
sqlRedo = parseToTimeStamp(parseToDate(sqlRedo));
154+
157155
Timestamp timestamp = (Timestamp)MapUtils.getObject(logData, "opTime");
158156

159157
Map<String,Object> message = new LinkedHashMap<>();

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/util/SqlUtil.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,9 @@ public class SqlUtil {
266266

267267
public final static String SQL_GET_LOG_FILE_START_POSITION_BY_TIME_10 = "select min(FIRST_CHANGE#) FIRST_CHANGE# from (select FIRST_CHANGE# from v$log where TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS') > FIRST_TIME union select FIRST_CHANGE# from v$archived_log where TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS') between FIRST_TIME and NEXT_TIME and standby_dest='NO' and name is not null)";
268268

269+
//查询会话级别 NLS_DATE_FORMAT 参数值
270+
public final static String SQL_QUERY_NLS_DATE_FORMAT="select VALUE from nls_session_parameters where parameter = 'NLS_DATE_FORMAT'";
271+
269272
//修改当前会话的date日期格式
270273
public final static String SQL_ALTER_DATE_FORMAT ="ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'";
271274

0 commit comments

Comments
 (0)