Skip to content

Commit f7be544

Browse files
author
gituser
committed
Merge branch 'hotfix_1.8_4.0_x_36431' into 1.8_release_4.0.x
2 parents c8b3d4f + 4b2e2af commit f7be544

File tree

4 files changed

+33
-15
lines changed

4 files changed

+33
-15
lines changed

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogMinerConnection.java

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public LogMinerConnection(LogMinerConfig logMinerConfig) {
124124
}
125125

126126
public void connect() {
127-
PreparedStatement preparedStatement = null;
127+
128128
try {
129129
ClassUtil.forName(logMinerConfig.getDriverName(), getClass().getClassLoader());
130130

@@ -133,19 +133,36 @@ public void connect() {
133133
oracleVersion = connection.getMetaData().getDatabaseMajorVersion();
134134
isOracle10 = oracleVersion == 10;
135135

136-
if(isOracle10){
137-
//oracle10开启logMiner之前 需要设置会话级别的日期格式 否则sql语句会含有todate函数 而不是todate函数计算后的值
138-
preparedStatement = connection.prepareStatement(SqlUtil.SQL_ALTER_DATE_FORMAT);
139-
preparedStatement.execute();
140-
preparedStatement = connection.prepareStatement(SqlUtil.NLS_TIMESTAMP_FORMAT);
141-
preparedStatement.execute();
136+
//修改session级别的 NLS_DATE_FORMAT 值为 "YYYY-MM-DD HH24:MI:SS",否则在解析日志时 redolog出现 TO_DATE('18-APR-21', 'DD-MON-RR')
137+
boolean isAlterDateFormat = false;
138+
try {
139+
try (PreparedStatement preparedStatement = connection.prepareStatement(SqlUtil.SQL_QUERY_NLS_DATE_FORMAT)) {
140+
try (ResultSet resultSet = preparedStatement.executeQuery(SqlUtil.SQL_QUERY_NLS_DATE_FORMAT)) {
141+
while (resultSet.next()) {
142+
String nlsDateFormat = resultSet.getString(1);
143+
isAlterDateFormat = !nlsDateFormat.equalsIgnoreCase("YYYY-MM-DD HH24:MI:SS");
144+
LOG.info("nlsDateFormat {}, isAlterDateFormat {}", nlsDateFormat, isAlterDateFormat);
145+
}
146+
}
147+
}
148+
} catch (Exception e) {
149+
LOG.info("query nlsDateFormat failed ,exception is {}", ExceptionUtil.getErrorMessage(e));
150+
}
151+
152+
if (isAlterDateFormat) {
153+
try (PreparedStatement preparedStatement = connection.prepareStatement(SqlUtil.SQL_ALTER_DATE_FORMAT)) {
154+
preparedStatement.execute();
155+
}
156+
try (PreparedStatement preparedStatement = connection.prepareStatement(SqlUtil.NLS_TIMESTAMP_FORMAT)) {
157+
preparedStatement.execute();
158+
}
142159
}
143160

144161
LOG.info("get connection successfully, url:{}, username:{}, Oracle version:{}", logMinerConfig.getJdbcUrl(), logMinerConfig.getUsername(), oracleVersion);
145162
} catch (Exception e){
146163
String message = String.format("get connection failed,url:[%s], username:[%s], e:%s", logMinerConfig.getJdbcUrl(), logMinerConfig.getUsername(), ExceptionUtil.getErrorMessage(e));
147164
LOG.error(message);
148-
closeResources(null, preparedStatement, connection);
165+
closeResources(null, null, connection);
149166
throw new RuntimeException(message, e);
150167
}
151168
}

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogMinerListener.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void run() {
117117
try {
118118
if (logMinerConnection.hasNext()) {
119119
log = logMinerConnection.next();
120-
queue.put(logParser.parse(log, logMinerConnection.isOracle10));
120+
queue.put(logParser.parse(log));
121121
} else {
122122
logMinerConnection.closeStmt();
123123
logMinerConnection.startOrUpdateLogMiner(positionManager.getPosition());

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogParser.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,18 +142,16 @@ public void visit(final EqualsTo expr){
142142
});
143143
}
144144

145-
public QueueData parse(QueueData pair, boolean isOracle10) throws JSQLParserException {
145+
public QueueData parse(QueueData pair) throws JSQLParserException {
146146
Map<String, Object> logData = pair.getData();
147147
String schema = MapUtils.getString(logData, "schema");
148148
String tableName = MapUtils.getString(logData, "tableName");
149149
String operation = MapUtils.getString(logData, "operation");
150150
String sqlLog = MapUtils.getString(logData, "sqlLog");
151+
LOG.debug("before alter sqlLog, sqlLog = {}",sqlLog);
151152
String sqlRedo = sqlLog.replace("IS NULL", "= NULL");
152-
//只有oracle10需要进行toDate toTimestamp转换
153-
LOG.debug("before parse toDate/toTimestamp sqlRedo = {}",sqlRedo);
154-
if (isOracle10) {
155-
sqlRedo = parseToTimeStamp(parseToDate(sqlRedo));
156-
}
153+
sqlRedo = parseToTimeStamp(parseToDate(sqlRedo));
154+
157155
Timestamp timestamp = (Timestamp)MapUtils.getObject(logData, "opTime");
158156

159157
Map<String,Object> message = new LinkedHashMap<>();

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/util/SqlUtil.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,9 @@ public class SqlUtil {
285285

286286
public final static String SQL_GET_LOG_FILE_START_POSITION_BY_TIME_10 = "select min(FIRST_CHANGE#) FIRST_CHANGE# from (select FIRST_CHANGE# from v$log where TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS') > FIRST_TIME union select FIRST_CHANGE# from v$archived_log where TO_DATE(?, 'YYYY-MM-DD HH24:MI:SS') between FIRST_TIME and NEXT_TIME and standby_dest='NO' and name is not null)";
287287

288+
//查询会话级别 NLS_DATE_FORMAT 参数值
289+
public final static String SQL_QUERY_NLS_DATE_FORMAT="select VALUE from nls_session_parameters where parameter = 'NLS_DATE_FORMAT'";
290+
288291
//修改当前会话的date日期格式
289292
public final static String SQL_ALTER_DATE_FORMAT ="ALTER SESSION SET NLS_DATE_FORMAT = 'YYYY-MM-DD HH24:MI:SS'";
290293

0 commit comments

Comments
 (0)