Skip to content

Commit 50baa6e

Browse files
committed
[fix-36356][ftp]解决ftp在windows写入问题
1 parent 8d5a531 commit 50baa6e

File tree

2 files changed

+50
-10
lines changed

2 files changed

+50
-10
lines changed

flinkx-ftp/flinkx-ftp-core/src/main/java/com/dtstack/flinkx/ftp/FtpHandler.java

Lines changed: 46 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,10 @@ public boolean isDirExist(String directoryPath) {
118118
originDir = ftpClient.printWorkingDirectory();
119119
ftpClient.enterLocalPassiveMode();
120120
FTPFile[] ftpFiles = ftpClient.listFiles(new String(directoryPath.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING));
121-
if(ftpFiles.length == 0){
122-
throw new FileNotFoundException("file or path is not exist, please check the path or the permissions of account, path = " + directoryPath);
121+
if(ftpFiles.length == 0 && !ftpClient.changeWorkingDirectory(new String(directoryPath.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING))){
122+
//不存在返回false即可 makeDir参数 在写入的时候 如果目录不存在,默认是创建对应的目录 所以这儿不应该报错
123+
return false;
124+
// throw new FileNotFoundException("file or path is not exist, please check the path or the permissions of account, path = " + directoryPath);
123125
}
124126
boolean positiveCompletion = FTPReply.isPositiveCompletion(ftpClient.cwd(new String(directoryPath.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING)));
125127
if(positiveCompletion && ftpFiles.length == 1 && ftpFiles[0].isFile()){
@@ -154,6 +156,10 @@ public boolean isFileExist(String filePath) {
154156
@Override
155157
public List<String> getFiles(String path) {
156158
List<String> sources = new ArrayList<>();
159+
//如果path不存在 就返回空文件夹
160+
if(!isExist(path)){
161+
return sources;
162+
}
157163
ftpClient.enterLocalPassiveMode();
158164
if(isFileExist(path)) {
159165
sources.add(path);
@@ -236,12 +242,13 @@ public void mkDirRecursive(String directoryPath){
236242

237243
private boolean mkDirSingleHierarchy(String directoryPath) throws IOException {
238244
boolean isDirExist = this.ftpClient
239-
.changeWorkingDirectory(directoryPath);
245+
.changeWorkingDirectory(new String(directoryPath.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING));
240246
// 如果directoryPath目录不存在,则创建
241247
if (!isDirExist) {
242-
int replayCode = this.ftpClient.mkd(directoryPath);
248+
int replayCode = this.ftpClient.mkd(new String(directoryPath.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING));
243249
if (replayCode != FTPReply.COMMAND_OK
244-
&& replayCode != FTPReply.PATHNAME_CREATED) {
250+
&& replayCode != FTPReply.PATHNAME_CREATED && replayCode != FTPReply.FILE_ACTION_OK) {
251+
LOG.warn("create path [{}] failed ,replayCode is {} and reply is {} ", directoryPath, replayCode, ftpClient.getReplyString());
245252
return false;
246253
}
247254
}
@@ -257,7 +264,7 @@ public OutputStream getOutputStream(String filePath) {
257264
this.ftpClient.changeWorkingDirectory(parentDir);
258265
this.printWorkingDirectory();
259266
OutputStream writeOutputStream = this.ftpClient
260-
.appendFileStream(filePath);
267+
.appendFileStream(new String(filePath.getBytes(controlEncoding),FTP.DEFAULT_CONTROL_ENCODING));
261268
String message = String.format(
262269
"打开FTP文件[%s]获取写出流时出错,请确认文件%s有权限创建,有权限写出等", filePath,
263270
filePath);
@@ -293,7 +300,7 @@ public void deleteAllFilesInDir(String dir, List<String> exclude) {
293300
}
294301

295302
try {
296-
FTPFile[] ftpFiles = ftpClient.listFiles(new String(dir.getBytes(StandardCharsets.UTF_8),FTP.DEFAULT_CONTROL_ENCODING));
303+
FTPFile[] ftpFiles = ftpClient.listFiles(new String(dir.getBytes(controlEncoding),FTP.DEFAULT_CONTROL_ENCODING));
297304
if(ftpFiles != null) {
298305
for(FTPFile ftpFile : ftpFiles) {
299306
if(CollectionUtils.isNotEmpty(exclude) && exclude.contains(ftpFile.getName())){
@@ -307,15 +314,15 @@ public void deleteAllFilesInDir(String dir, List<String> exclude) {
307314
}
308315

309316
if(CollectionUtils.isEmpty(exclude)){
310-
ftpClient.rmd(dir);
317+
ftpClient.rmd(new String(dir.getBytes(controlEncoding),FTP.DEFAULT_CONTROL_ENCODING));
311318
}
312319
} catch (IOException e) {
313320
LOG.error("", e);
314321
throw new RuntimeException(e);
315322
}
316323
} else if(isFileExist(dir)) {
317324
try {
318-
ftpClient.deleteFile(dir);
325+
ftpClient.deleteFile(new String(dir.getBytes(controlEncoding),FTP.DEFAULT_CONTROL_ENCODING));
319326
} catch (IOException e) {
320327
LOG.error("", e);
321328
throw new RuntimeException(e);
@@ -365,7 +372,7 @@ public List<String> listDirs(String path) {
365372

366373
@Override
367374
public void rename(String oldPath, String newPath) throws IOException {
368-
ftpClient.rename(oldPath, newPath);
375+
ftpClient.rename(new String(oldPath.getBytes(controlEncoding),FTP.DEFAULT_CONTROL_ENCODING), new String(newPath.getBytes(controlEncoding),FTP.DEFAULT_CONTROL_ENCODING));
369376
}
370377

371378
@Override
@@ -380,4 +387,33 @@ public void completePendingCommand() throws IOException {
380387
throw new IOException(ExceptionUtil.getErrorMessage(e));
381388
}
382389
}
390+
391+
392+
/**
393+
* 判断路径是否存在
394+
* @param path 判断的路径
395+
* @return true 存在 false 不存在
396+
*/
397+
private boolean isExist(String path) {
398+
String originDir = null;
399+
try {
400+
originDir = ftpClient.printWorkingDirectory();
401+
ftpClient.enterLocalPassiveMode();
402+
FTPFile[] ftpFiles = ftpClient.listFiles(new String(path.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING));
403+
//空文件夹 或者 不存在的文件夹 length都是0 但是changeWorkingDirectory为true代表是空文件夹
404+
return ftpFiles.length != 0 || ftpClient.changeWorkingDirectory(new String(path.getBytes(controlEncoding), FTP.DEFAULT_CONTROL_ENCODING));
405+
} catch (IOException e) {
406+
String message = String.format("判断:[%s]是否存在时发生I/O异常,请确认与ftp服务器的连接正常", path);
407+
LOG.error(message);
408+
throw new RuntimeException(message, e);
409+
} finally {
410+
if (originDir != null) {
411+
try {
412+
ftpClient.changeWorkingDirectory(originDir);
413+
} catch (IOException e) {
414+
LOG.error(e.getMessage());
415+
}
416+
}
417+
}
418+
}
383419
}

flinkx-ftp/flinkx-ftp-reader/src/main/java/com/dtstack/flinkx/ftp/reader/FtpInputFormat.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.dtstack.flinkx.reader.MetaColumn;
2727
import com.dtstack.flinkx.util.GsonUtil;
2828
import com.dtstack.flinkx.util.StringUtil;
29+
import org.apache.commons.collections.CollectionUtils;
2930
import org.apache.flink.core.io.InputSplit;
3031
import org.apache.flink.types.Row;
3132

@@ -74,6 +75,9 @@ public InputSplit[] createInputSplitsInternal(int minNumSplits) throws Exception
7475
files.addAll(ftpHandler.getFiles(p.trim()));
7576
}
7677
}
78+
if (CollectionUtils.isEmpty(files)) {
79+
throw new RuntimeException("There are no readable files in directory " + path);
80+
}
7781
LOG.info("FTP files = {}", GsonUtil.GSON.toJson(files));
7882
int numSplits = (Math.min(files.size(), minNumSplits));
7983
FtpInputSplit[] ftpInputSplits = new FtpInputSplit[numSplits];

0 commit comments

Comments
 (0)