Skip to content

Commit 1182415

Browse files
committed
Merge remote-tracking branch 'origin/1.10_release_4.0.x' into temp_1.10_4.1_merge
# Conflicts: # flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/util/SqlUtil.java
2 parents fa856d5 + 53f2204 commit 1182415

File tree

3 files changed

+26
-96
lines changed

3 files changed

+26
-96
lines changed

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogFile.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class LogFile {
3737
/** 日志文件状态 https://docs.oracle.com/cd/B12037_01/server.101/b10755/dynviews_1132.htm V$LOGMNR_LOGS里的status */
3838
private int status;
3939

40+
//是归档日志 还是online日志
4041
private String type;
4142

4243
/** 文件大小 **/

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogMinerConnection.java

Lines changed: 21 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,6 @@ public class LogMinerConnection {
8181
public boolean isGBK = false;
8282
boolean isOracle10;
8383

84-
public static final long MAX_SCN = 281474976710655L;
85-
8684
public static final List<String> PRIVILEGES_NEEDED = Arrays.asList(
8785
"CREATE SESSION",
8886
"LOGMINING",
@@ -249,37 +247,26 @@ public void startOrUpdateLogMiner(BigDecimal startScn) {
249247
if (logMinerConfig.getSupportAutoAddLog()) {
250248
startSql = isOracle10 ? SqlUtil.SQL_START_LOG_MINER_AUTO_ADD_LOG_10 : SqlUtil.SQL_START_LOG_MINER_AUTO_ADD_LOG;
251249
} else {
252-
//第一次加载或者已经没有归档日志加载 只有online日志加载 此时maxScn为空
250+
//第一次加载,此时maxScn为空
253251
if (null == maxScn) {
254-
if (null != minScn && startScn.compareTo(minScn) < 0) {
255-
maxScn = minScn;
256-
} else {
257-
maxScn = startScn;
258-
}
252+
maxScn = startScn;
259253
}
260254

261255
List<LogFile> newLogFiles = queryLogFiles(maxScn);
262256
if (addedLogFiles.equals(newLogFiles)) {
263257
return;
264258
} else {
265-
// LOG.info("Log group changed, new log group = {}", GsonUtil.GSON.toJson(newLogFiles));
266259
addedLogFiles = newLogFiles;
267-
// startSql = isOracle10 ? SqlUtil.SQL_START_LOG_MINER_10 : SqlUtil.SQL_START_LOG_MINER;
268-
startSql = null == maxScn ? SqlUtil.SQL_START_LOGMINER_NO_MAX_LIMIT : SqlUtil.SQL_START_LOGMINER_HAS_MAX_LIMIT;
260+
startSql = SqlUtil.SQL_START_LOGMINER;
269261
}
270262
}
271263

272-
closeStmt(logMinerStartStmt);
273-
274-
logMinerStartStmt = connection.prepareCall(startSql);
275-
configStatement(logMinerStartStmt);
264+
resetLogminerStmt(startSql);
276265
if (logMinerConfig.getSupportAutoAddLog()) {
277266
logMinerStartStmt.setBigDecimal(1, startScn);
278267
} else {
279268
logMinerStartStmt.setBigDecimal(1, minScn);
280-
if (null != maxScn) {
281-
logMinerStartStmt.setBigDecimal(2, maxScn);
282-
}
269+
logMinerStartStmt.setBigDecimal(2, maxScn);
283270
}
284271

285272
logMinerStartStmt.execute();
@@ -294,12 +281,8 @@ public void startOrUpdateLogMiner(BigDecimal startScn) {
294281
}
295282
}
296283

297-
298284
public void startOrUpdateLogminer(BigDecimal startScn, BigDecimal endScn) throws SQLException {
299-
closeStmt(logMinerStartStmt);
300-
301-
logMinerStartStmt = connection.prepareCall(SqlUtil.SQL_START_LOGMINER_HAS_MAX_LIMIT);
302-
configStatement(logMinerStartStmt);
285+
resetLogminerStmt(SqlUtil.SQL_START_LOGMINER);
303286

304287
logMinerStartStmt.setBigDecimal(1, startScn);
305288
logMinerStartStmt.setBigDecimal(2, endScn);
@@ -316,17 +299,12 @@ public void startOrUpdateLogminer(BigDecimal startScn, BigDecimal endScn) throws
316299
*/
317300
public void queryData(BigDecimal startScn, String logMinerSelectSql) {
318301
try {
319-
if (null != maxScn) {
320-
logMinerSelectSql += "AND scn < ?";
321-
}
322302
logMinerSelectStmt = connection.prepareStatement(logMinerSelectSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
323303
configStatement(logMinerSelectStmt);
324304

325305
logMinerSelectStmt.setFetchSize(logMinerConfig.getFetchSize());
326306
logMinerSelectStmt.setBigDecimal(1, startScn);
327-
if (null != maxScn) {
328-
logMinerSelectStmt.setBigDecimal(2, maxScn);
329-
}
307+
logMinerSelectStmt.setBigDecimal(2, maxScn);
330308
logMinerData = logMinerSelectStmt.executeQuery();
331309

332310
LOG.debug("query Log miner data, offset:{}", startScn);
@@ -606,7 +584,7 @@ private List<LogFile> queryLogFiles(BigDecimal scn) throws SQLException {
606584
if (CollectionUtils.isEmpty(tempList)) {
607585
break;
608586
}
609-
//找到最小的firstSCN和最小的nextSCN 需要排除掉线上日志
587+
//找到最小的firstSCN和最小的nextSCN
610588
BigDecimal minFirstScn = tempList.stream().sorted(Comparator.comparing(LogFile::getFirstChange)).collect(Collectors.toList()).get(0).getFirstChange();
611589
BigDecimal minNextScn = tempList.stream().sorted(Comparator.comparing(LogFile::getNextChange)).collect(Collectors.toList()).get(0).getNextChange();
612590

@@ -624,7 +602,8 @@ private List<LogFile> queryLogFiles(BigDecimal scn) throws SQLException {
624602
}
625603
//如果最小的nextScn都是onlineNextChange,就代表加载所有的日志文件
626604
if (tempMinNextScn.equals(onlineNextChange)) {
627-
tempMinNextScn = null;
605+
//解决logminer偶尔丢失数据问题,读取online日志的时候,需要将endScn置为当前SCN
606+
tempMinNextScn = getCurrentScn();
628607
logFiles = logFileLists;
629608
}
630609
maxScn = tempMinNextScn;
@@ -647,8 +626,8 @@ private List<LogFile> queryAddedLogFiles() throws SQLException {
647626
logFile.setNextChange(rs.getBigDecimal("next_scn"));
648627
logFile.setThread(rs.getLong("thread_id"));
649628
logFile.setBytes(rs.getLong("filesize"));
650-
logFile.setStatus(rs.getInt("STATUS"));
651-
logFile.setType(rs.getString("TYPE"));
629+
logFile.setStatus(rs.getInt("status"));
630+
logFile.setType(rs.getString("type"));
652631
logFileLists.add(logFile);
653632
}
654633
}
@@ -995,19 +974,6 @@ public RecordLog recursionQueryDataForRollback(RecordLog rollbackRecord) throws
995974
queryDataForRollbackConnection.connect();
996975
}
997976

998-
//不需要每次拉起一个新的queryDataForRollbackConnection 只需要移除加载的日志文件即可 (status为4的 其实是logminer提示缺少日志文件) https://docs.oracle.com/cd/B12037_01/server.101/b10755/dynviews_1132.htm
999-
List<LogFile> files = queryDataForRollbackConnection.queryAddedLogFiles().stream().filter(i -> i.getStatus() != 4).collect(Collectors.toList());
1000-
if(CollectionUtils.isNotEmpty(files)){
1001-
for (LogFile file : files) {
1002-
try{
1003-
removeLogFile(file.getFileName());
1004-
}catch (SQLException e){
1005-
queryDataForRollbackConnection.disConnect();
1006-
queryDataForRollbackConnection.connect();
1007-
}
1008-
}
1009-
}
1010-
1011977
//查找出当前加载归档日志文件里的最小scn 递归查找此scn之前的文件
1012978
List<LogFile> logFiles = queryAddedLogFiles().stream().filter(i->i.getStatus() != 4 && i.getType().equalsIgnoreCase(LOG_TYPE_ARCHIVED)).collect(Collectors.toList());
1013979

@@ -1037,11 +1003,15 @@ public RecordLog recursionQueryDataForRollback(RecordLog rollbackRecord) throws
10371003
return null;
10381004
}
10391005

1040-
public void removeLogFile(String fileName) throws SQLException {
1041-
try(PreparedStatement statement = connection.prepareStatement(SqlUtil.SQL_REMOVE_ADDED_LOG)) {
1042-
statement.setString(1,fileName);
1043-
statement.execute();
1044-
}
1006+
/**
1007+
* 重置 启动logminer的statement
1008+
* @param startSql
1009+
* @throws SQLException
1010+
*/
1011+
public void resetLogminerStmt(String startSql) throws SQLException {
1012+
closeStmt(logMinerStartStmt);
1013+
logMinerStartStmt = connection.prepareCall(startSql);
1014+
configStatement(logMinerStartStmt);
10451015
}
10461016

10471017
public enum ReadPosition {

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/util/SqlUtil.java

Lines changed: 4 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -168,13 +168,14 @@ public class SqlUtil {
168168
"FROM\n" +
169169
" v$logmnr_contents\n" +
170170
"WHERE\n" +
171-
" scn > ? \n";
171+
" scn > ? \n" +
172+
" AND scn < ? \n";
172173

173174

174175
/**
175176
* 加载包含startSCN和endSCN之间日志的日志文件
176177
*/
177-
public final static String SQL_START_LOGMINER_HAS_MAX_LIMIT = "DECLARE \n" +
178+
public final static String SQL_START_LOGMINER = "DECLARE \n" +
178179
" st BOOLEAN := true;\n" +
179180
" start_scn NUMBER := ?;\n" +
180181
" endScn NUMBER := ?;\n" +
@@ -192,7 +193,7 @@ public class SqlUtil {
192193
" v$log l \n"+
193194
" INNER JOIN v$logfile f ON l.group# = f.group# \n"+
194195
" WHERE (l.STATUS = 'CURRENT' OR l.STATUS = 'ACTIVE' )\n"+
195-
" AND first_change# <= start_scn \n"+
196+
" AND first_change# < endScn \n"+
196197
" UNION \n"+
197198
" SELECT \n"+
198199
" name, \n"+
@@ -214,48 +215,6 @@ public class SqlUtil {
214215
" end;";
215216

216217

217-
/**
218-
* 加载比startSCN大的日志,即nextChange比startSCN大的日志文件都需要加载
219-
*/
220-
public final static String SQL_START_LOGMINER_NO_MAX_LIMIT = "DECLARE \n" +
221-
" st BOOLEAN := true;\n" +
222-
" start_scn NUMBER := ?;\n" +
223-
"BEGIN\n" +
224-
" FOR l_log_rec IN (\n" +
225-
" SELECT\n" +
226-
" MIN(name) name,\n" +
227-
" first_change#\n" +
228-
" FROM\n" +
229-
" (\n" +
230-
" SELECT \n"+
231-
" member AS name, \n"+
232-
" first_change# \n"+
233-
" FROM \n"+
234-
" v$log l \n"+
235-
" INNER JOIN v$logfile f ON l.group# = f.group# \n"+
236-
" WHERE l.STATUS = 'CURRENT' OR l.STATUS = 'ACTIVE' \n"+
237-
" UNION \n"+
238-
" SELECT \n"+
239-
" name, \n"+
240-
" first_change# \n"+
241-
" FROM \n"+
242-
" v$archived_log \n"+
243-
" WHERE \n"+
244-
" name IS NOT NULL \n"+
245-
" AND STANDBY_DEST='NO'\n"+
246-
" AND next_change# > start_scn )group by first_change# order by first_change# )LOOP IF st THEN \n"+
247-
" SYS.DBMS_LOGMNR.add_logfile(l_log_rec.name, SYS.DBMS_LOGMNR.new); \n"+
248-
" st := false; \n"+
249-
" ELSE \n"+
250-
" SYS.DBMS_LOGMNR.add_logfile(l_log_rec.name); \n"+
251-
" END IF; \n"+
252-
" END LOOP;\n"+
253-
" SYS.DBMS_LOGMNR.start_logmnr( options => SYS.DBMS_LOGMNR.skip_corruption + SYS.DBMS_LOGMNR.no_sql_delimiter + SYS.DBMS_LOGMNR.no_rowid_in_stmt\n"+
254-
" + SYS.DBMS_LOGMNR.dict_from_online_catalog );\n"+
255-
" end;";
256-
257-
258-
259218
/** 查找delete的rollback语句对应的insert语句 存在一个事务里rowid相同的其他语句 所以需要子查询过滤掉scn相同rowid相同的语句(这是一对rollback和DML)*/
260219
public static String queryDataForRollback =
261220
"SELECT\n" +

0 commit comments

Comments
 (0)