12 changes: 11 additions & 1 deletion docs/en/connector-v2/source/LocalFile.md
@@ -64,7 +64,7 @@ If you use SeaTunnel Engine, it automatically integrates the hadoop jar when you
| skip_header_row_number | long | no | 0 |
| schema | config | no | - |
| sheet_name | string | no | - |
| excel_engine | string | no | POI |
| xml_row_tag | string | no | - |
| xml_use_attr_format | boolean | no | - |
| csv_use_header_line | boolean | no | false |
@@ -80,6 +80,8 @@ If you use SeaTunnel Engine, it automatically integrates the hadoop jar when you
| tables_configs | list | no | used to define a multiple table task |
| file_filter_modified_start | string | no | - |
| file_filter_modified_end | string | no | - |
| enable_split_file | boolean | no | false |
| split_size | long | no | 134217728 |

### path [string]

@@ -415,6 +417,14 @@ File modification time filter. The connector will filter some files based on the

File modification time filter. The connector filters files based on their last modification time; this option sets the end of the time window (the end time itself is excluded). The default time format is `yyyy-MM-dd HH:mm:ss`.

### enable_split_file [boolean]

Whether to enable file splitting. Defaults to `false`. Splitting is available only for csv, text, and json files that are not compressed.

### split_size [long]

The split size in bytes, effective only when `enable_split_file` is `true`. Defaults to 134217728 (128 MB).
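
For illustration, a minimal source block with splitting enabled might look like the following sketch (the path, format, and split size are placeholder values):

```hocon
source {
  LocalFile {
    path = "/data/input"          # placeholder path
    file_format_type = "csv"      # splitting applies to csv, text, and json
    enable_split_file = true
    split_size = 67108864         # 64 MB per split instead of the 128 MB default
  }
}
```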

### common options

Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details
10 changes: 10 additions & 0 deletions docs/zh/connector-v2/source/LocalFile.md
@@ -80,6 +80,8 @@ import ChangeLog from '../changelog/connector-file-local.md';
| tables_configs | list | no | used to define a multiple-table task |
| file_filter_modified_start | string | no | - |
| file_filter_modified_end | string | no | - |
| enable_split_file | boolean | no | false |
| split_size | long | no | 134217728 |

### path [string]

@@ -415,6 +417,14 @@ null_format defines which strings can be represented as null.

Filters files by their last modification time. This option sets the end of the filter window (the end time itself is excluded). The time format is `yyyy-MM-dd HH:mm:ss`.

### enable_split_file [boolean]

Whether to enable file splitting. Defaults to `false`. Splitting is available only for csv, text, and json files that are not compressed.

### split_size [long]

The split size in bytes, effective only when `enable_split_file` is `true`. Defaults to 134217728 (128 MB).

### common options

Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details.
@@ -150,4 +150,17 @@ public class FileBaseOptions extends ConnectorCommonOptions {
.enumType(ArchiveCompressFormat.class)
.defaultValue(ArchiveCompressFormat.NONE)
.withDescription("Archive compression codec");

public static final Option<Boolean> ENABLE_SPLIT_FILE =
Options.key("enable_split_file")
.booleanType()
.defaultValue(false)
.withDescription("Turn on the file splitting function, the default is false");

public static final Option<Long> SPLIT_SIZE =
Options.key("split_size")
.longType()
.defaultValue(128 * 1024 * 1024L)
.withDescription(
"The split size in bytes, effective only when enable_split_file is true. Defaults to 128 MB (134217728 bytes).");
}
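
Usage note: a reader can resolve the effective split size from these two options. A minimal sketch, assuming SeaTunnel's `ReadonlyConfig#get(Option)` accessor and the assumed package of `FileBaseOptions`:

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
// Assumed package for FileBaseOptions; adjust to the actual location.
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;

public class SplitSizeDemo {
    /** Returns the configured split size in bytes, or -1 when splitting is disabled. */
    public static long effectiveSplitSize(ReadonlyConfig config) {
        // ENABLE_SPLIT_FILE defaults to false, SPLIT_SIZE to 128 MB (see options above).
        if (!config.get(FileBaseOptions.ENABLE_SPLIT_FILE)) {
            return -1L;
        }
        return config.get(FileBaseOptions.SPLIT_SIZE);
    }
}
```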
@@ -28,7 +28,9 @@
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTableFileSourceReader;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.MultipleTableFileSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;

@@ -41,10 +43,19 @@ public abstract class BaseMultipleTableFileSource
SupportColumnProjection {

private final BaseMultipleTableFileSourceConfig baseMultipleTableFileSourceConfig;
private final FileSplitStrategy fileSplitStrategy;

public BaseMultipleTableFileSource(
BaseMultipleTableFileSourceConfig baseMultipleTableFileSourceConfig) {
this(baseMultipleTableFileSourceConfig, new DefaultFileSplitStrategy());
}

public BaseMultipleTableFileSource(
BaseMultipleTableFileSourceConfig baseMultipleTableFileSourceConfig,
FileSplitStrategy fileSplitStrategy) {
this.baseMultipleTableFileSourceConfig = baseMultipleTableFileSourceConfig;
this.fileSplitStrategy = fileSplitStrategy;
}
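
Review note: the second constructor makes the splitting policy pluggable. The `FileSplitStrategy` contract itself is not visible in this hunk; purely as a hypothetical illustration (all names below are invented, not the PR's API), a fixed-size policy amounts to cutting a file into byte ranges:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch only; the real strategy types live in
// org.apache.seatunnel.connectors.seatunnel.file.source.split.
public class FixedSizeRanges {
    /** Cuts a file of totalSize bytes into (start, length) pairs of at most splitSize bytes. */
    public static List<long[]> byteRanges(long totalSize, long splitSize) {
        List<long[]> ranges = new ArrayList<>();
        for (long start = 0; start < totalSize; start += splitSize) {
            ranges.add(new long[] {start, Math.min(splitSize, totalSize - start)});
        }
        return ranges;
    }
}
```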

@Override
@@ -72,7 +83,7 @@ public SourceReader<SeaTunnelRow, FileSourceSplit> createReader(
public SourceSplitEnumerator<FileSourceSplit, FileSourceState> createEnumerator(
SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext) {
return new MultipleTableFileSourceSplitEnumerator(
enumeratorContext, baseMultipleTableFileSourceConfig);
enumeratorContext, baseMultipleTableFileSourceConfig, fileSplitStrategy);
}

@Override
@@ -26,16 +26,19 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipParameters;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FileStatus;

import lombok.extern.slf4j.Slf4j;
@@ -94,6 +97,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
protected Date fileModifiedEndDate;
protected String fileBasePath;

protected boolean enableSplitFile;

@Override
public void init(HadoopConf conf) {
this.hadoopConf = conf;
@@ -242,6 +247,10 @@ public void setPluginConfig(Config pluginConfig) {
pluginConfig.getString(
FileBaseSourceOptions.FILE_FILTER_MODIFIED_END.key()));
}
if (pluginConfig.hasPath(FileBaseSourceOptions.ENABLE_SPLIT_FILE.key())) {
enableSplitFile =
pluginConfig.getBoolean(FileBaseSourceOptions.ENABLE_SPLIT_FILE.key());
}
}

@Override
Expand All @@ -250,12 +259,13 @@ public SeaTunnelRowType getActualSeaTunnelRowTypeInfo() {
}

protected void resolveArchiveCompressedInputStream(
String path,
String tableId,
FileSourceSplit split,
Collector<SeaTunnelRow> output,
Map<String, String> partitionsMap,
FileFormat fileFormat)
throws IOException {
String path = split.getFilePath();
String tableId = split.getTableId();
switch (archiveCompressFormat) {
case ZIP:
try (ZipInputStream zis =
@@ -264,8 +274,7 @@
while ((entry = zis.getNextEntry()) != null) {
if (!entry.isDirectory() && checkFileType(entry.getName(), fileFormat)) {
readProcess(
path,
tableId,
split,
output,
copyInputStream(zis),
partitionsMap,
@@ -282,8 +291,7 @@
while ((entry = tarInput.getNextTarEntry()) != null) {
if (!entry.isDirectory() && checkFileType(entry.getName(), fileFormat)) {
readProcess(
path,
tableId,
split,
output,
copyInputStream(tarInput),
partitionsMap,
@@ -302,8 +310,7 @@
while ((entry = tarIn.getNextTarEntry()) != null) {
if (!entry.isDirectory() && checkFileType(entry.getName(), fileFormat)) {
readProcess(
path,
tableId,
split,
output,
copyInputStream(tarIn),
partitionsMap,
@@ -331,13 +338,11 @@
fileName = path;
}
}
readProcess(
path, tableId, output, copyInputStream(gzipIn), partitionsMap, fileName);
readProcess(split, output, copyInputStream(gzipIn), partitionsMap, fileName);
break;
case NONE:
readProcess(
path,
tableId,
split,
output,
hadoopFileSystemProxy.getInputStream(path),
partitionsMap,
Expand All @@ -348,8 +353,7 @@ protected void resolveArchiveCompressedInputStream(
"The file does not support this archive compress type: {}",
archiveCompressFormat);
readProcess(
path,
tableId,
split,
output,
hadoopFileSystemProxy.getInputStream(path),
partitionsMap,
Expand All @@ -358,8 +362,7 @@ protected void resolveArchiveCompressedInputStream(
}

protected void readProcess(
String path,
String tableId,
FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
@@ -451,6 +454,19 @@ protected boolean checkFileType(String fileName, FileFormat fileFormat) {
return false;
}

protected static InputStream safeSlice(InputStream in, long start, long length)
throws IOException {
// InputStream#skip may skip fewer bytes than requested, so loop until the
// stream is positioned at the split's start offset.
long toSkip = start;
while (toSkip > 0) {
long skipped = in.skip(toSkip);
if (skipped <= 0) {
throw new SeaTunnelException(
String.format(
"Failed to skip to split start offset %d, %d bytes remaining",
start, toSkip));
}
toSkip -= skipped;
}
// Bound the stream so the caller reads at most `length` bytes of this split.
return new BoundedInputStream(in, length);
}
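
Review note: for intuition, here is a self-contained sketch of the same skip-then-bound slicing, driving commons-io's `BoundedInputStream` directly on an in-memory stream:

```java
import org.apache.commons.io.input.BoundedInputStream;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class SliceDemo {
    public static void main(String[] args) throws IOException {
        byte[] data = "0123456789".getBytes(StandardCharsets.UTF_8);
        InputStream in = new ByteArrayInputStream(data);
        // Position the stream at offset 3 (ByteArrayInputStream skips fully in one call).
        in.skip(3);
        try (InputStream slice = new BoundedInputStream(in, 4)) {
            byte[] buf = new byte[4];
            int n = slice.read(buf);
            // Prints "3456": the 4-byte window starting at offset 3.
            System.out.println(new String(buf, 0, n, StandardCharsets.UTF_8));
        }
    }
}
```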

@Override
public void close() throws IOException {
try {
Expand Down
@@ -33,6 +33,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.format.csv.CsvDeserializationSchema;
import org.apache.seatunnel.format.csv.processor.CsvLineProcessor;
import org.apache.seatunnel.format.csv.processor.DefaultCsvLineProcessor;
@@ -75,13 +76,20 @@ public class CsvReadStrategy extends AbstractReadStrategy {
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
resolveArchiveCompressedInputStream(path, tableId, output, partitionsMap, FileFormat.CSV);
resolveArchiveCompressedInputStream(
new FileSourceSplit(tableId, path), output, partitionsMap, FileFormat.CSV);
}

@Override
public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
throws IOException, FileConnectorException {
Map<String, String> partitionsMap = parsePartitionsByPath(split.getFilePath());
resolveArchiveCompressedInputStream(split, output, partitionsMap, FileFormat.CSV);
}

@Override
public void readProcess(
String path,
String tableId,
FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
Expand All @@ -103,11 +111,18 @@ public void readProcess(
actualInputStream = inputStream;
break;
}
// when splitting is enabled, narrow the raw (uncompressed) stream to this split's byte range
if (enableSplitFile && split.getLength() > -1) {
actualInputStream = safeSlice(inputStream, split.getStart(), split.getLength());
}
Builder builder =
CSVFormat.EXCEL.builder().setIgnoreEmptyLines(true).setDelimiter(getDelimiter());
CSVFormat csvFormat = builder.build();
if (firstLineAsHeader) {
csvFormat = csvFormat.withFirstRecordAsHeader();
// if enableSplitFile is true, no need to treat the first record as a header
if (!enableSplitFile) {
if (firstLineAsHeader) {
csvFormat = csvFormat.withFirstRecordAsHeader();
}
}
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(actualInputStream, encoding));
Expand All @@ -119,12 +134,15 @@ public void readProcess(
reader.reset();
}
// skip lines
for (int i = 0; i < skipHeaderNumber; i++) {
if (reader.readLine() == null) {
throw new IOException(
String.format(
"File [%s] has fewer lines than expected to skip.",
currentFileName));
// if enableSplitFile is true, no need to skip header lines
if (!enableSplitFile) {
for (int i = 0; i < skipHeaderNumber; i++) {
if (reader.readLine() == null) {
throw new IOException(
String.format(
"File [%s] has fewer lines than expected to skip.",
currentFileName));
}
}
}
// read lines
@@ -161,7 +179,7 @@ public void readProcess(
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
seaTunnelRow.setTableId(split.getTableId());
output.collect(seaTunnelRow);
}
} catch (IOException e) {
@@ -32,6 +32,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.excel.ExcelCellUtils;
import org.apache.seatunnel.connectors.seatunnel.file.excel.ExcelReaderListener;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
@@ -77,19 +78,19 @@ public class ExcelReadStrategy extends AbstractReadStrategy {
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
resolveArchiveCompressedInputStream(path, tableId, output, partitionsMap, FileFormat.EXCEL);
resolveArchiveCompressedInputStream(
new FileSourceSplit(tableId, path), output, partitionsMap, FileFormat.EXCEL);
}

@Override
protected void readProcess(
String path,
String tableId,
FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
String currentFileName)
throws IOException {

String tableId = split.getTableId();
if (skipHeaderNumber > Integer.MAX_VALUE || skipHeaderNumber < Integer.MIN_VALUE) {
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
Expand Down