Skip to content

Commit c07155c

Browse files
author
gituser
committed
Merge branch 'hotfix_1.10_4.2.x_37535' into 1.10_test_4.2.x
2 parents 206cd02 + 73a152a commit c07155c

File tree

14 files changed

+161
-55
lines changed

14 files changed

+161
-55
lines changed

core/src/main/java/com/dtstack/flink/sql/format/DeserializationMetricWrapper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.dtstack.flink.sql.format;
2020

2121
import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager;
22+
import com.dtstack.flink.sql.exception.ExceptionTrace;
2223
import com.dtstack.flink.sql.metric.MetricConstant;
2324
import org.apache.flink.api.common.functions.RuntimeContext;
2425
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
@@ -111,7 +112,8 @@ public Row deserialize(byte[] message) throws IOException {
111112
} catch (Exception e) {
112113
//add metric of dirty data
113114
dirtyDataManager.execute();
114-
dirtyDataManager.collectDirtyData(new String(message), e.getMessage());
115+
dirtyDataManager.collectDirtyData(
116+
new String(message), ExceptionTrace.traceOriginalCause(e));
115117
dirtyDataCounter.inc();
116118
return null;
117119
}

core/src/main/java/com/dtstack/flink/sql/format/dtnest/DtNestRowDeserializationSchema.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ public class DtNestRowDeserializationSchema extends AbstractDeserializationSchem
6666
private final String charsetName;
6767

6868
private static final Pattern TIMESTAMP_PATTERN = Pattern.compile("^\\d+$");
69-
private static final Pattern TIME_FORMAT_PATTERN = Pattern.compile("\\w+\\d+:\\d+:\\d+");
69+
private static final Pattern TIMESTAMP_FORMAT_PATTERN = Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}?.*");
70+
private static final Pattern TIME_FORMAT_PATTERN = Pattern.compile("[0-9]{2}:[0-9]{2}:[0-9]{2}?.*");
71+
private static final Pattern DATE_FORMAT_PATTERN = Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2}");
7072

7173
public DtNestRowDeserializationSchema(TypeInformation<Row> typeInfo, Map<String, String> rowAndFieldMapping,
7274
List<AbstractTableInfo.FieldExtraInfo> fieldExtraInfos,
@@ -147,7 +149,7 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
147149
return node.asText();
148150
}
149151
} else if (info.getTypeClass().equals(Types.SQL_DATE.getTypeClass())) {
150-
return Date.valueOf(node.asText());
152+
return convertToDate(node.asText());
151153
} else if (info.getTypeClass().equals(Types.SQL_TIME.getTypeClass())) {
152154
// local zone
153155
return convertToTime(node.asText());
@@ -169,27 +171,51 @@ private Object convert(JsonNode node, TypeInformation<?> info) {
169171
}
170172
}
171173

172-
/**
173-
* 将 2020-09-07 14:49:10.0 和 1598446699685 两种格式都转化为 Timestamp
174-
*/
174+
/** 将 2020-09-07 14:49:10.0 和 1598446699685 两种格式都转化为 Timestamp */
175175
private Timestamp convertToTimestamp(String timestamp) {
176176
if (TIMESTAMP_PATTERN.matcher(timestamp).find()) {
177177
return new Timestamp(Long.parseLong(timestamp));
178178
}
179-
if (TIME_FORMAT_PATTERN.matcher(timestamp).find()) {
179+
if (TIMESTAMP_FORMAT_PATTERN.matcher(timestamp).find()) {
180180
return Timestamp.valueOf(timestamp);
181181
}
182-
throw new IllegalArgumentException("Incorrect time format of timestamp");
182+
throw new IllegalArgumentException(
183+
String.format(
184+
"Incorrect timestamp format [yyyy-MM-dd hh:mm:ss] of timestamp type. Input value: [%s]",
185+
timestamp));
186+
}
187+
188+
private Date convertToDate(String date) {
189+
if (TIMESTAMP_PATTERN.matcher(date).find()) {
190+
return new Date(Long.parseLong(date));
191+
}
192+
if (TIMESTAMP_FORMAT_PATTERN.matcher(date).find()) {
193+
return new Date(Timestamp.valueOf(date).getTime());
194+
}
195+
if (DATE_FORMAT_PATTERN.matcher(date).find()) {
196+
return Date.valueOf(date);
197+
}
198+
throw new IllegalArgumentException(
199+
String.format(
200+
"Incorrect date format [yyyy-MM-dd] of date type. Input value: [%s]",
201+
date));
183202
}
184203

185204
private Time convertToTime(String timestamp) {
186205
if (TIMESTAMP_PATTERN.matcher(timestamp).find()) {
187206
return new Time(Long.parseLong(timestamp));
188207
}
208+
if (TIMESTAMP_FORMAT_PATTERN.matcher(timestamp).find()) {
209+
long time = Timestamp.valueOf(timestamp).getTime();
210+
return new Time(time);
211+
}
189212
if (TIME_FORMAT_PATTERN.matcher(timestamp).find()) {
190213
return Time.valueOf(timestamp);
191214
}
192-
throw new IllegalArgumentException("Incorrect time format of time");
215+
throw new IllegalArgumentException(
216+
String.format(
217+
"Incorrect time format [hh:mm:ss] of time type. Input value: [%s]",
218+
timestamp));
193219
}
194220

195221
private Row convertTopRow() {

core/src/main/java/com/dtstack/flink/sql/side/AbstractSideTableInfo.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ public abstract class AbstractSideTableInfo extends AbstractTableInfo implements
9595

9696
private List<PredicateInfo> predicateInfoes = Lists.newArrayList();
9797

98+
private List<PredicateInfo> fullPredicateInfoes = Lists.newArrayList();
99+
98100
private boolean fastCheck;
99101

100102
public RowTypeInfo getRowTypeInfo(){
@@ -197,6 +199,14 @@ public void addPredicateInfo(PredicateInfo predicateInfo) {
197199
this.predicateInfoes.add(predicateInfo);
198200
}
199201

202+
public List<PredicateInfo> getFullPredicateInfoes() {
203+
return fullPredicateInfoes;
204+
}
205+
206+
public void addFullPredicateInfoes(PredicateInfo predicateInfo) {
207+
this.fullPredicateInfoes.add(predicateInfo);
208+
}
209+
200210
public Long getAsyncFailMaxNum(Long defaultValue) {
201211
return Objects.isNull(asyncFailMaxNum) ? defaultValue : asyncFailMaxNum;
202212
}

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;
3535

3636
import java.io.Serializable;
37+
import java.util.Arrays;
3738
import java.util.List;
3839
import java.util.Map;
3940

@@ -92,17 +93,20 @@ public void parseSelectFields(JoinInfo joinInfo){
9293
String sideTableName = joinInfo.getSideTableName();
9394
String nonSideTableName = joinInfo.getNonSideTable();
9495
List<String> fields = Lists.newArrayList();
95-
int sideTableFieldIndex = 0;
96+
int sideTableFieldIndex;
9697

9798
for( int i=0; i<outFieldInfoList.size(); i++){
9899
FieldInfo fieldInfo = outFieldInfoList.get(i);
99100
if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
100101
String sideFieldName = sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName());
101102
fields.add(sideFieldName);
103+
sideTableFieldIndex = Arrays.asList(sideTableInfo.getFields()).indexOf(sideFieldName);
104+
if (sideTableFieldIndex == -1){
105+
throw new RuntimeException(String.format("unknown filed {%s} in sideTable {%s} ", sideFieldName, sideTableName));
106+
}
102107
sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(fieldInfo.getFieldName()));
103108
sideFieldIndex.put(i, sideTableFieldIndex);
104109
sideFieldNameIndex.put(i, sideFieldName);
105-
sideTableFieldIndex++;
106110
}else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
107111
int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
108112
inFieldIndex.put(i, nonSideIndex);
@@ -194,6 +198,7 @@ private void evalConstantEquation(SqlLiteral literal, SqlIdentifier identifier,
194198
.setCondition(constant.toString())
195199
.build();
196200
sideTableInfo.addPredicateInfo(predicate);
201+
sideTableInfo.addFullPredicateInfoes(predicate);
197202
}
198203

199204
private void checkSupport(SqlIdentifier identifier) {

core/src/main/java/com/dtstack/flink/sql/side/SidePredicatesParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ private void parseSql(SqlNode sqlNode, Map<String, AbstractSideTableInfo> sideTa
113113

114114
private void fillToSideTableInfo(Map<String, AbstractSideTableInfo> sideTableMap, Map<String, String> tabMapping, List<PredicateInfo> predicateInfoList) {
115115
predicateInfoList.stream().filter(info -> sideTableMap.containsKey(tabMapping.getOrDefault(info.getOwnerTable(), info.getOwnerTable())))
116-
.map(info -> sideTableMap.get(tabMapping.getOrDefault(info.getOwnerTable(), info.getOwnerTable())).getPredicateInfoes().add(info))
116+
.map(info -> sideTableMap.get(tabMapping.getOrDefault(info.getOwnerTable(), info.getOwnerTable())).getFullPredicateInfoes().add(info))
117117
.count();
118118
}
119119

core/src/main/java/com/dtstack/flink/sql/side/SideSqlExec.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
import java.util.Map;
7676
import java.util.Queue;
7777
import java.util.Set;
78+
import java.util.stream.Collectors;
7879

7980
import static org.apache.calcite.sql.SqlKind.AS;
8081
import static org.apache.calcite.sql.SqlKind.INSERT;
@@ -506,6 +507,8 @@ private void joinFun(Object pollObj,
506507
sideTableInfo = sideTableMap.get(joinInfo.getRightTableAlias());
507508
}
508509

510+
extractActualSidePredicateInfos(joinInfo, sideTableInfo);
511+
509512
if (sideTableInfo == null) {
510513
throw new RuntimeException("can't not find side table:" + joinInfo.getRightTableName());
511514
}
@@ -592,6 +595,18 @@ private void joinFun(Object pollObj,
592595
}
593596
}
594597

598+
/**
599+
* 抽取维表本次真正使用的谓词集合
600+
* @param joinInfo 维表join信息
601+
* @param sideTableInfo 维表实体信息
602+
*/
603+
private void extractActualSidePredicateInfos(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
604+
sideTableInfo.setPredicateInfoes(sideTableInfo.getFullPredicateInfoes()
605+
.stream()
606+
.filter(e -> e.getOwnerTable().equals(joinInfo.getRightTableAlias()))
607+
.collect(Collectors.toList()));
608+
}
609+
595610
private boolean checkFieldsInfo(CreateTmpTableParser.SqlParserResult result, Table table) {
596611
List<String> fieldNames = new LinkedList<>();
597612
String fieldsInfo = result.getFieldsInfoStr();

core/src/main/java/com/dtstack/flink/sql/util/TableUtils.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -758,8 +758,7 @@ public static void addConstant(Map<String, Object> keyMap, AbstractSideTableInfo
758758
List<PredicateInfo> predicateInfos = sideTableInfo.getPredicateInfoes();
759759
final String name = sideTableInfo.getName();
760760
for (PredicateInfo info : predicateInfos) {
761-
if (info.getOwnerTable().equals(name)
762-
&& info.getOperatorName().equals("=")) {
761+
if (info.getOperatorName().equals("=")) {
763762
String condition = info.getCondition();
764763
Matcher matcher = stringPattern.matcher(condition);
765764
if (matcher.matches()) {

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/ExtendES5ApiCallBridge.java

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import transwarp.org.elasticsearch.action.bulk.BulkProcessor;
3434
import transwarp.org.elasticsearch.client.transport.TransportClient;
3535
import transwarp.org.elasticsearch.common.network.NetworkModule;
36+
import transwarp.org.elasticsearch.common.settings.Setting;
3637
import transwarp.org.elasticsearch.common.settings.Settings;
3738
import transwarp.org.elasticsearch.common.transport.TransportAddress;
3839
import transwarp.org.elasticsearch.common.unit.TimeValue;
@@ -70,28 +71,41 @@ public ExtendES5ApiCallBridge(List<InetSocketAddress> transportAddresses, Elasti
7071
@Override
7172
public TransportClient createClient(Map<String, String> clientConfig) throws IOException{
7273

73-
//1. login kdc with keytab and krb5 conf
74-
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
75-
esTableInfo.getPrincipal(),
76-
esTableInfo.getKeytab(),
77-
esTableInfo.getKrb5conf());
78-
79-
//2. set transwarp attributes
80-
Settings settings = Settings.builder().put(clientConfig)
81-
.put("client.transport.sniff", true)
82-
.put("security.enable", true)
83-
.put(NetworkModule.TRANSPORT_TYPE_KEY, "security-netty3")
84-
.build();
85-
86-
//3. build transport client with transwarp plugins
87-
TransportClient transportClient = ugi.doAs((PrivilegedAction<TransportClient>) () -> {
88-
TransportClient tmpClient = new PreBuiltTransportClient(settings,
89-
Collections.singletonList(DoorKeeperClientPlugin.class));
74+
TransportClient transportClient;
75+
76+
if (esTableInfo.isEnableKrb()) {
77+
//1. login kdc with keytab and krb5 conf
78+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
79+
esTableInfo.getPrincipal(),
80+
esTableInfo.getKeytab(),
81+
esTableInfo.getKrb5conf());
82+
83+
//2. set transwarp attributes
84+
Settings settings = Settings.builder().put(clientConfig)
85+
.put("client.transport.sniff", true)
86+
.put("security.enable", true)
87+
.put(NetworkModule.TRANSPORT_TYPE_KEY, "security-netty3")
88+
.build();
89+
90+
//3. build transport client with transwarp plugins
91+
transportClient = ugi.doAs((PrivilegedAction<TransportClient>) () -> {
92+
TransportClient tmpClient = new PreBuiltTransportClient(settings,
93+
Collections.singletonList(DoorKeeperClientPlugin.class));
94+
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
95+
tmpClient.addTransportAddress(transport);
96+
}
97+
return tmpClient;
98+
});
99+
} else {
100+
Settings settings = Settings.builder().put(clientConfig)
101+
.put("client.transport.sniff", true)
102+
.build();
103+
104+
transportClient = new PreBuiltTransportClient(settings);
90105
for (TransportAddress transport : ElasticsearchUtils.convertInetSocketAddresses(transportAddresses)) {
91-
tmpClient.addTransportAddress(transport);
106+
transportClient.addTransportAddress(transport);
92107
}
93-
return tmpClient;
94-
});
108+
}
95109

96110
return transportClient;
97111
}
@@ -140,18 +154,27 @@ public void configureBulkProcessorBackoff(
140154
@Override
141155
public boolean verifyClientConnection(TransportClient client) throws IOException {
142156

143-
//1. login kdc with keytab and krb5 conf
144-
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
145-
esTableInfo.getPrincipal(),
146-
esTableInfo.getKeytab(),
147-
esTableInfo.getKrb5conf());
148157

149-
//2. refresh availableNodes.
150-
boolean verifyResult = ugi.doAs((PrivilegedAction<Boolean>) () -> {
151-
LOG.info("Refresh client available nodes.");
158+
boolean verifyResult = false;
159+
160+
if (esTableInfo.isEnableKrb()) {
161+
//1. login kdc with keytab and krb5 conf
162+
UserGroupInformation ugi = KrbUtils.loginAndReturnUgi(
163+
esTableInfo.getPrincipal(),
164+
esTableInfo.getKeytab(),
165+
esTableInfo.getKrb5conf());
166+
167+
//2. refresh availableNodes.
168+
verifyResult = ugi.doAs((PrivilegedAction<Boolean>) () -> {
169+
LOG.info("Refresh client available nodes.");
170+
client.refreshAvailableNodes();
171+
return client.connectedNodes().isEmpty();
172+
});
173+
} else {
152174
client.refreshAvailableNodes();
153-
return client.connectedNodes().isEmpty();
154-
});
175+
verifyResult = client.connectedNodes().isEmpty();
176+
}
177+
155178

156179
if (!verifyResult) {
157180
return true;

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -160,14 +160,6 @@ public boolean check() {
160160
Preconditions.checkArgument(NumberUtils.isNumber(number), "id must be a numeric type");
161161
});
162162
}
163-
164-
boolean allNotSet =
165-
Strings.isNullOrEmpty(principal) &&
166-
Strings.isNullOrEmpty(keytab) &&
167-
Strings.isNullOrEmpty(krb5conf);
168-
169-
Preconditions.checkState(!allNotSet, "xh's elasticsearch type of kerberos file is required");
170-
171163
return true;
172164
}
173165

elasticsearch7/elasticsearch7-side/elasticsearch7-all-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch7/Elasticsearch7AllReqRow.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ private SearchSourceBuilder initConfiguration(BoolQueryBuilder boolQueryBuilder)
208208
}
209209

210210
searchSourceBuilder.size(getFetchSize());
211-
searchSourceBuilder.sort("_id", SortOrder.DESC);
212211

213212
// fields included in the source data
214213
String[] sideFieldNames = StringUtils.split(sideInfo.getSideSelectFields().trim(), ",");

0 commit comments

Comments
 (0)