Skip to content

Commit 4842f7d

Browse files
committed
[hotfix-36211][ftp] 解决windows下中文目录的文件读取不到问题
[hotfix-36332][flinkx-core][hivewriter]解决gsonutil解析到18446744073702197459出错 [hotfix-36332][hive]修改hive解析json失败报错 [hotfix-36431][logminer]logminer支持解析toDate函数 [hotfix-36432][logminer]logminer 增加3f207768657265(? where)场景解析支持 [hotfix-387][logminer][docs] add standaLoneQuickStart docs [hotfix-36476][kudureader]支持column配置为常量 [hotfix-36591][logminer]事务回滚日志delete where row_id=''转为正常业务字段,修改缓存逻辑,加载的日志文件从V$LOGMNR_LOGS查找出来并打印,加载日志文件SQL_START_LOGMINER_HAS_MAX_LIMIT 查找实时日志文件where条件修改,disConnect增加非空判断 [hotfix-36356][ftp]解决ftp在windows写入问题
2 parents 4330099 + ac26a60 commit 4842f7d

File tree

38 files changed

+1236
-1181
lines changed

38 files changed

+1236
-1181
lines changed

flinkx-core/src/main/java/com/dtstack/flinkx/authenticate/KerberosUtil.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,10 @@ public static String loadFile(Map<String, Object> kerberosConfig, String filePat
148148
LOG.info("will use local file:{}", filePath);
149149
checkFileExists(filePath);
150150
} else {
151+
if(filePath.contains(SP)){
152+
filePath = filePath.substring(filePath.lastIndexOf(SP) + 1);
153+
}
154+
151155
filePath = loadFromSftp(kerberosConfig, filePath);
152156
}
153157

@@ -172,9 +176,6 @@ private static String loadFromSftp(Map<String, Object> config, String fileName){
172176
String localDirName = Md5Util.getMd5(remoteDir);
173177
String localDir = LOCAL_CACHE_DIR + SP + localDirName;
174178
localDir = createDir(localDir);
175-
if (fileName.contains(SP)) {
176-
fileName = fileName.substring(fileName.lastIndexOf(SP) + 1);
177-
}
178179
String fileLocalPath = localDir + SP + fileName;
179180
// 更新sftp文件对应的local文件
180181
if (fileExists(fileLocalPath)) {

flinkx-core/src/main/java/com/dtstack/flinkx/decoder/JsonDecoder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
package com.dtstack.flinkx.decoder;
1919

20-
import com.dtstack.flinkx.util.GsonUtil;
20+
import com.dtstack.flinkx.util.MapUtil;
2121
import org.slf4j.Logger;
2222
import org.slf4j.LoggerFactory;
2323

@@ -38,7 +38,7 @@ public class JsonDecoder implements IDecode {
3838
@Override
3939
public Map<String, Object> decode(final String message) {
4040
try {
41-
Map<String, Object> event = GsonUtil.GSON.fromJson(message, GsonUtil.gsonMapTypeToken);
41+
Map<String, Object> event = MapUtil.jsonStrToObject(message,Map.class);
4242
if (!event.containsKey(KEY_MESSAGE)) {
4343
event.put(KEY_MESSAGE, message);
4444
}

flinkx-core/src/main/java/com/dtstack/flinkx/util/GsonUtil.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.io.IOException;
3434
import java.lang.reflect.Field;
3535
import java.lang.reflect.Type;
36+
import java.math.BigInteger;
3637
import java.util.*;
3738

3839
/**
@@ -107,7 +108,11 @@ public Object read(JsonReader in) throws IOException {
107108
try {
108109
return Integer.valueOf(s);
109110
} catch (Exception e) {
110-
return Long.valueOf(s);
111+
try{
112+
return Long.valueOf(s);
113+
}catch (Exception e1){
114+
return new BigInteger(s);
115+
}
111116
}
112117
}
113118
case BOOLEAN:

flinkx-core/src/main/java/com/dtstack/flinkx/util/MapUtil.java

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,15 @@
77
import com.google.gson.internal.LinkedHashTreeMap;
88
import com.google.gson.internal.LinkedTreeMap;
99
import com.fasterxml.jackson.databind.ObjectMapper;
10+
import org.apache.commons.collections.MapUtils;
11+
import org.apache.commons.lang3.StringUtils;
1012

1113
import java.io.IOException;
1214
import java.util.HashMap;
1315
import java.util.Map;
1416

17+
import static com.dtstack.flinkx.util.StringUtil.escapeExprSpecialWord;
18+
1519
/**
1620
* Reason:
1721
* Date: 2019/8/9
@@ -61,4 +65,89 @@ public static String writeValueAsString(Object obj) throws JsonProcessingExcepti
6165
return objectMapper.writeValueAsString(obj);
6266
}
6367

68+
/**
69+
* 根据key 以及切割键 获取真正的key,将key 和value放入data中
70+
* @param key key
71+
* @param fieldDelimiter 切割键
72+
* @param value 值
73+
* @param data 载体data
74+
*/
75+
public static void buildMap(String key, String fieldDelimiter, Object value, Map<String, Object> data) {
76+
String[] split = new String[1];
77+
if (StringUtils.isBlank(fieldDelimiter)) {
78+
split[0] = key;
79+
} else {
80+
split = key.split(escapeExprSpecialWord(fieldDelimiter));
81+
}
82+
83+
if (split.length == 1) {
84+
data.put(split[0], value);
85+
} else {
86+
Map<String, Object> temp = data;
87+
for (int i = 0; i < split.length - 1; i++) {
88+
if (temp.containsKey(split[i])) {
89+
if (temp.get(split[i]) instanceof HashMap) {
90+
temp = (HashMap) temp.get(split[i]);
91+
} else {
92+
throw new RuntimeException("build map failed ,data is " + GsonUtil.GSON.toJson(data) + " key is " + key);
93+
}
94+
} else {
95+
Map hashMap = new HashMap(2);
96+
temp.put(split[i], hashMap);
97+
temp = hashMap;
98+
}
99+
if (i == split.length - 2) {
100+
temp.put(split[split.length - 1], value);
101+
}
102+
}
103+
}
104+
}
105+
106+
107+
/**
108+
* 根据指定的key从map里获取对应的值
109+
* 如果key不存在 报错
110+
*
111+
* @param map 需要解析的map
112+
* @param key 指定的key key可以是嵌套的
113+
* @param fieldDelimiter 嵌套key的分隔符
114+
*/
115+
public static Object getValueByKey(Map<String, Object> map, String key, String fieldDelimiter) {
116+
if (MapUtils.isEmpty(map)) {
117+
throw new RuntimeException(key + " not exist because map is empty");
118+
}
119+
Object o = null;
120+
String[] split = new String[1];
121+
if (StringUtils.isBlank(fieldDelimiter)) {
122+
split[0] = key;
123+
} else {
124+
split = key.split(escapeExprSpecialWord(fieldDelimiter));
125+
}
126+
127+
Map<String, Object> tempMap = map;
128+
for (int i = 0; i < split.length; i++) {
129+
o = getValue(tempMap, split[i]);
130+
//仅仅代表这个key对应的值是null但是key还是存在的
131+
if (o == null && i != split.length - 1) {
132+
throw new RuntimeException(key + " on [" + GsonUtil.GSON.toJson(map) + "] is null");
133+
}
134+
135+
if (i != split.length - 1) {
136+
if (!(o instanceof Map)) {
137+
throw new RuntimeException("key " + key + " on " + map + " is not a json");
138+
}
139+
tempMap = (Map<String, Object>) o;
140+
}
141+
}
142+
return o;
143+
}
144+
145+
146+
private static Object getValue(Map<String, Object> map, String key) {
147+
if (!map.containsKey(key)) {
148+
throw new RuntimeException(key + " not exist on " + GsonUtil.GSON.toJson(map) );
149+
}
150+
return map.get(key);
151+
}
152+
64153
}

flinkx-core/src/main/java/com/dtstack/flinkx/util/StringUtil.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ public static Object string2col(String str, String type, SimpleDateFormat custom
9696
break;
9797
case MEDIUMINT:
9898
case BIGINT:
99+
case LONG:
99100
ret = Long.valueOf(str.trim());
100101
break;
101102
case FLOAT:
@@ -339,4 +340,23 @@ public static String splitIgnoreQuotaAndJoinByPoint(String table) {
339340
}
340341
return stringBuffer.toString();
341342
}
343+
344+
345+
/**
346+
* 转义正则特殊字符 ($()*+.[]?\^{},|)
347+
*
348+
* @param keyword 需要转义特殊字符串的文本
349+
* @return 特殊字符串转义后的文本
350+
*/
351+
public static String escapeExprSpecialWord(String keyword) {
352+
if (StringUtils.isNotBlank(keyword)) {
353+
String[] fbsArr = {"\\", "$", "(", ")", "*", "+", ".", "[", "]", "?", "^", "{", "}", "|"};
354+
for (String key : fbsArr) {
355+
if (keyword.contains(key)) {
356+
keyword = keyword.replace(key, "\\" + key);
357+
}
358+
}
359+
}
360+
return keyword;
361+
}
342362
}

flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/FtpConfig.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package com.dtstack.flinkx.ftp;
2121

22+
import com.dtstack.flinkx.constants.ConstantValue;
2223
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
2324

2425
import java.io.Serializable;
@@ -60,6 +61,12 @@ public class FtpConfig implements Serializable {
6061

6162
public long maxFileSize = 1024 * 1024 * 1024L;
6263

64+
/** ftp客户端编码格式 **/
65+
public String controlEncoding = System.getProperty(ConstantValue.SYSTEM_PROPERTIES_KEY_FILE_ENCODING);
66+
67+
/** linux是否展示隐藏文件 **/
68+
public boolean listHiddenFiles = true;
69+
6370
public String getUsername() {
6471
return username;
6572
}
@@ -179,4 +186,20 @@ public long getMaxFileSize() {
179186
public void setMaxFileSize(long maxFileSize) {
180187
this.maxFileSize = maxFileSize;
181188
}
189+
190+
public String getControlEncoding() {
191+
return controlEncoding;
192+
}
193+
194+
public void setControlEncoding(String controlEncoding) {
195+
this.controlEncoding = controlEncoding;
196+
}
197+
198+
public boolean getListHiddenFiles() {
199+
return listHiddenFiles;
200+
}
201+
202+
public void setListHiddenFiles(boolean listHiddenFiles) {
203+
this.listHiddenFiles = listHiddenFiles;
204+
}
182205
}

0 commit comments

Comments
 (0)