Skip to content

Commit eaaf578

Browse files
committed
[fix-36591][logminer]修改缓存逻辑
1 parent 97b4f28 commit eaaf578

File tree

2 files changed

+37
-17
lines changed

2 files changed

+37
-17
lines changed

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogFile.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public class LogFile {
3737
/** 日志文件状态 https://docs.oracle.com/cd/B12037_01/server.101/b10755/dynviews_1132.htm V$LOGMNR_LOGS里的status */
3838
private int status;
3939

40+
private String type;
41+
4042
/** 文件大小 **/
4143
private Long bytes;
4244

@@ -88,6 +90,14 @@ public void setStatus(int status) {
8890
this.status = status;
8991
}
9092

93+
public String getType() {
94+
return type;
95+
}
96+
97+
public void setType(String type) {
98+
this.type = type;
99+
}
100+
91101
@Override
92102
public String toString() {
93103
return "LogFile{" +
@@ -96,6 +106,7 @@ public String toString() {
96106
", nextChange=" + nextChange +
97107
", thread=" + thread +
98108
", bytes=" + bytes +
109+
", type=" + type +
99110
'}';
100111
}
101112

flinkx-oraclelogminer/flinkx-oraclelogminer-reader/src/main/java/com/dtstack/flinkx/oraclelogminer/format/LogMinerConnection.java

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public class LogMinerConnection {
7272
public static final String KEY_GRANTED_ROLE = "GRANTED_ROLE";
7373

7474
public static final String DBA_ROLE = "DBA";
75+
public static final String LOG_TYPE_ARCHIVED = "ARCHIVED";
7576
public static final String EXECUTE_CATALOG_ROLE = "EXECUTE_CATALOG_ROLE";
7677

7778
public static final int ORACLE_11_VERSION = 11;
@@ -139,7 +140,7 @@ public class LogMinerConnection {
139140
/**缓存的结构为 xidsqn(事务id)+rowId,scn**/
140141
Cache<String, LinkedHashMap<BigDecimal, RecordLog>> insertRecordCache = CacheBuilder.newBuilder()
141142
.maximumSize(1000)
142-
.expireAfterWrite(30, TimeUnit.MINUTES)
143+
.expireAfterWrite(20, TimeUnit.MINUTES)
143144
.build();
144145
/**
145146
* minScn 和maxScn 其实保证了此范围内的数据一定都加载到了logminer中
@@ -641,7 +642,7 @@ private List<LogFile> queryLogFiles(BigDecimal scn) throws SQLException {
641642
/**
642643
* 获取logminer加载的日志文件
643644
*/
644-
private List<LogFile> queryAddedLogFiles(){
645+
private List<LogFile> queryAddedLogFiles() throws SQLException {
645646
List<LogFile> logFileLists = new ArrayList<>();
646647
try(PreparedStatement statement = connection.prepareStatement(SqlUtil.SQL_QUERY_ADDED_LOG)) {
647648
try(ResultSet rs = statement.executeQuery()) {
@@ -657,8 +658,6 @@ private List<LogFile> queryAddedLogFiles(){
657658
}
658659
LOG.info("The log file loaded by logminer has changed, new log group = {}", GsonUtil.GSON.toJson(logFileLists));
659660
}
660-
} catch (Exception e) {
661-
LOG.warn("Failed to find log file loaded by logminer" + ExceptionUtil.getErrorMessage(e));
662661
}
663662
return logFileLists;
664663
}
@@ -708,7 +707,7 @@ public boolean hasNext() throws SQLException, UnsupportedEncodingException, Deco
708707
}
709708
}
710709

711-
if(undoLog.length() != 0){
710+
if(undoLog.length() == 0){
712711
//没有查找到对应的insert语句 会将delete where rowid=xxx 语句作为redoLog
713712
LOG.warn("has not found undoLog for scn {}",scn);
714713
}else{
@@ -956,6 +955,7 @@ private void putCache(RecordLog recordLog) {
956955

957956
/**
958957
* 从缓存的insert语句里找到rollback语句对应的DML语句
958+
* 如果查找到 需要删除对应的缓存信息
959959
* @param key xidSqn+rowid
960960
* @param scn scn of rollback
961961
* @return insert Log
@@ -968,11 +968,18 @@ public RecordLog queryUndoLog(String key, BigDecimal scn) {
968968
//根据scn号查找 如果scn号相同 则取此对应的DML语句
969969
RecordLog recordLog = recordMap.get(scn);
970970
if (Objects.isNull(recordLog)) {
971-
//如果scn相同的DML语句没有 则取同一个事务里rowId相同的
971+
//如果scn相同的DML语句没有 则取同一个事务里rowId相同的最后一个
972972
Iterator<Map.Entry<BigDecimal, RecordLog>> iterator = recordMap.entrySet().iterator();
973-
if (iterator.hasNext()) {
974-
recordLog = iterator.next().getValue();
973+
Map.Entry<BigDecimal, RecordLog> tail = null;
974+
while (iterator.hasNext()) {
975+
tail = iterator.next();
975976
}
977+
if(Objects.nonNull(tail)){
978+
recordLog = tail.getValue();
979+
recordMap.remove(tail.getKey());
980+
}
981+
} else {
982+
recordMap.remove(scn);
976983
}
977984
LOG.info("query a insert sql for rollback in cache,rollback scn is {}",scn);
978985
return recordLog;
@@ -991,8 +998,8 @@ public RecordLog recursionQueryDataForRollback(RecordLog rollbackRecord) throws
991998
queryDataForRollbackConnection.connect();
992999
}
9931000

994-
//不需要每次拉起一个新的queryDataForRollbackConnection 只需要移除加载的日志文件即可 status为4的 其实是logminer提示缺少日志文件 https://docs.oracle.com/cd/B12037_01/server.101/b10755/dynviews_1132.htm
995-
List<LogFile> files = queryAddedLogFiles().stream().filter(i -> i.getStatus() != 4).collect(Collectors.toList());
1001+
//不需要每次拉起一个新的queryDataForRollbackConnection 只需要移除加载的日志文件即可 (status为4的 其实是logminer提示缺少日志文件) https://docs.oracle.com/cd/B12037_01/server.101/b10755/dynviews_1132.htm
1002+
List<LogFile> files = queryDataForRollbackConnection.queryAddedLogFiles().stream().filter(i -> i.getStatus() != 4).collect(Collectors.toList());
9961003
if(CollectionUtils.isNotEmpty(files)){
9971004
for (LogFile file : files) {
9981005
try{
@@ -1004,17 +1011,19 @@ public RecordLog recursionQueryDataForRollback(RecordLog rollbackRecord) throws
10041011
}
10051012
}
10061013

1014+
//查找出当前加载归档日志文件里的最小scn 递归查找此scn之前的文件
1015+
List<LogFile> logFiles = queryAddedLogFiles().stream().filter(i->i.getStatus() != 4 && i.getType().equalsIgnoreCase(LOG_TYPE_ARCHIVED)).collect(Collectors.toList());
10071016

1008-
//查找出当前加载日志文件里的最小scn 递归查找此scn之前的文件
1009-
List<LogFile> logFiles = queryAddedLogFiles().stream().sorted(Comparator.comparing(LogFile::getFirstChange)).collect(Collectors.toList());
10101017
//默认每次往前查询4000个scn
1011-
BigDecimal startScn = rollbackRecord.getScn().subtract(new BigDecimal(4000));
1012-
BigDecimal endScn = rollbackRecord.getScn();
1013-
//nextChange-firstChange 为一个文件包含多少的scn,将其*2 代表加载此scn之前2个文件
1018+
BigDecimal step = new BigDecimal(4000);
10141019
if (CollectionUtils.isNotEmpty(logFiles)) {
1015-
startScn = logFiles.get(0).getFirstChange().subtract((logFiles.get(0).getNextChange().subtract( logFiles.get(0).getFirstChange())).multiply(new BigDecimal(2)));
1020+
//nextChange-firstChange 为一个文件包含多少的scn,将其*2 代表加载此scn之前2个文件
1021+
step = logFiles.get(0).getNextChange().subtract(logFiles.get(0).getFirstChange()).multiply(new BigDecimal(2));
10161022
}
10171023

1024+
BigDecimal startScn = rollbackRecord.getScn().subtract(step);
1025+
BigDecimal endScn = rollbackRecord.getScn();
1026+
10181027
for (int i = 0; i < 10; i++) {
10191028
queryDataForRollbackConnection.startOrUpdateLogminer(startScn, endScn);
10201029
queryDataForRollbackConnection.queryDataForDeleteRollback(rollbackRecord, SqlUtil.queryDataForRollback);
@@ -1027,7 +1036,7 @@ public RecordLog recursionQueryDataForRollback(RecordLog rollbackRecord) throws
10271036
}
10281037
}
10291038
endScn = startScn;
1030-
startScn = logFiles.get(0).getFirstChange().subtract((logFiles.get(0).getNextChange().subtract(logFiles.get(0).getFirstChange())).multiply(new BigDecimal(2)));
1039+
startScn = startScn.subtract(step);
10311040
}
10321041
return null;
10331042
}

0 commit comments

Comments
 (0)