@@ -150,4 +150,17 @@ public class FileBaseOptions extends ConnectorCommonOptions {
.enumType(ArchiveCompressFormat.class)
.defaultValue(ArchiveCompressFormat.NONE)
.withDescription("Archive compression codec");

public static final Option<Boolean> ENABLE_SPLIT_FILE =
Options.key("enable_split_file")
.booleanType()
.defaultValue(false)
.withDescription("Turn on the file splitting function, the default is false");

public static final Option<Long> SPLIT_SIZE =
Options.key("split_size")
.longType()
.defaultValue(128 * 1024 * 1024L)
                    .withDescription(
                            "The target size of each file split, in bytes. Takes effect only when enable_split_file is true. Defaults to 128 MB, i.e. 128 * 1024 * 1024 bytes.");
}
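As a quick aside on how these two options interact, the sketch below shows the arithmetic a size-based splitter implies; every name here is illustrative and nothing below is taken from the PR itself.

```java
// Illustration only: with enable_split_file = true and the default split_size,
// a file is carved into ceil(fileLength / splitSize) byte-range splits.
long splitSize = 128 * 1024 * 1024L;        // default split_size (134217728 bytes)
long fileLength = 1_000_000_000L;           // e.g. a ~1 GB file
long splitCount = (fileLength + splitSize - 1) / splitSize;        // ceil division -> 8
long lastSplitLength = fileLength - (splitCount - 1) * splitSize;  // 60,475,904-byte tail
```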
@@ -28,7 +28,9 @@
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseMultipleTableFileSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.MultipleTableFileSourceReader;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.DefaultFileSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSplitStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.MultipleTableFileSourceSplitEnumerator;
import org.apache.seatunnel.connectors.seatunnel.file.source.state.FileSourceState;

@@ -41,10 +43,19 @@ public abstract class BaseMultipleTableFileSource
SupportColumnProjection {

private final BaseMultipleTableFileSourceConfig baseMultipleTableFileSourceConfig;
private final FileSplitStrategy fileSplitStrategy;

public BaseMultipleTableFileSource(
BaseMultipleTableFileSourceConfig baseMultipleTableFileSourceConfig) {
this.baseMultipleTableFileSourceConfig = baseMultipleTableFileSourceConfig;
this.fileSplitStrategy = new DefaultFileSplitStrategy();
}

public BaseMultipleTableFileSource(
BaseMultipleTableFileSourceConfig baseMultipleTableFileSourceConfig,
FileSplitStrategy fileSplitStrategy) {
this.baseMultipleTableFileSourceConfig = baseMultipleTableFileSourceConfig;
this.fileSplitStrategy = fileSplitStrategy;
}

@Override
@@ -72,7 +83,7 @@ public SourceReader<SeaTunnelRow, FileSourceSplit> createReader(
public SourceSplitEnumerator<FileSourceSplit, FileSourceState> createEnumerator(
SourceSplitEnumerator.Context<FileSourceSplit> enumeratorContext) {
return new MultipleTableFileSourceSplitEnumerator(
enumeratorContext, baseMultipleTableFileSourceConfig);
enumeratorContext, baseMultipleTableFileSourceConfig, fileSplitStrategy);
}

@Override
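The `FileSplitStrategy` interface and `DefaultFileSplitStrategy` are referenced above but not shown in this diff. Below is a minimal sketch of what a size-based implementation could look like; the method name, its signature, and the four-argument `FileSourceSplit` constructor are assumptions (only the two-argument form appears elsewhere in this diff).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a size-based split strategy; the interface shape and
// the (tableId, path, start, length) constructor are assumptions, not PR contents.
public class SizeBasedFileSplitStrategy implements FileSplitStrategy {

    private final long splitSize;

    public SizeBasedFileSplitStrategy(long splitSize) {
        this.splitSize = splitSize;
    }

    public List<FileSourceSplit> split(String tableId, String path, long fileLength) {
        List<FileSourceSplit> splits = new ArrayList<>();
        for (long start = 0; start < fileLength; start += splitSize) {
            // The last split carries whatever remains of the file.
            long length = Math.min(splitSize, fileLength - start);
            splits.add(new FileSourceSplit(tableId, path, start, length));
        }
        return splits;
    }
}
```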
@@ -26,16 +26,19 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.file.config.ArchiveCompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipParameters;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.fs.FileStatus;

import lombok.extern.slf4j.Slf4j;
@@ -94,6 +97,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
protected Date fileModifiedEndDate;
protected String fileBasePath;

protected boolean enableSplitFile;

@Override
public void init(HadoopConf conf) {
this.hadoopConf = conf;
@@ -242,6 +247,10 @@ public void setPluginConfig(Config pluginConfig) {
pluginConfig.getString(
FileBaseSourceOptions.FILE_FILTER_MODIFIED_END.key()));
}
if (pluginConfig.hasPath(FileBaseSourceOptions.ENABLE_SPLIT_FILE.key())) {
enableSplitFile =
pluginConfig.getBoolean(FileBaseSourceOptions.ENABLE_SPLIT_FILE.key());
}
}

@Override
@@ -250,12 +259,13 @@ public SeaTunnelRowType getActualSeaTunnelRowTypeInfo() {
}

protected void resolveArchiveCompressedInputStream(
String path,
String tableId,
FileSourceSplit split,
Collector<SeaTunnelRow> output,
Map<String, String> partitionsMap,
FileFormat fileFormat)
throws IOException {
String path = split.getFilePath();
String tableId = split.getTableId();
switch (archiveCompressFormat) {
case ZIP:
try (ZipInputStream zis =
@@ -264,8 +274,7 @@ protected void resolveArchiveCompressedInputStream(
while ((entry = zis.getNextEntry()) != null) {
if (!entry.isDirectory() && checkFileType(entry.getName(), fileFormat)) {
readProcess(
path,
tableId,
split,
output,
copyInputStream(zis),
partitionsMap,
@@ -282,8 +291,7 @@ protected void resolveArchiveCompressedInputStream(
while ((entry = tarInput.getNextTarEntry()) != null) {
if (!entry.isDirectory() && checkFileType(entry.getName(), fileFormat)) {
readProcess(
path,
tableId,
split,
output,
copyInputStream(tarInput),
partitionsMap,
@@ -302,8 +310,7 @@ protected void resolveArchiveCompressedInputStream(
while ((entry = tarIn.getNextTarEntry()) != null) {
if (!entry.isDirectory() && checkFileType(entry.getName(), fileFormat)) {
readProcess(
path,
tableId,
split,
output,
copyInputStream(tarIn),
partitionsMap,
@@ -331,13 +338,11 @@ protected void resolveArchiveCompressedInputStream(
fileName = path;
}
}
readProcess(
path, tableId, output, copyInputStream(gzipIn), partitionsMap, fileName);
readProcess(split, output, copyInputStream(gzipIn), partitionsMap, fileName);
break;
case NONE:
readProcess(
path,
tableId,
split,
output,
hadoopFileSystemProxy.getInputStream(path),
partitionsMap,
@@ -348,8 +353,7 @@
"The file does not support this archive compress type: {}",
archiveCompressFormat);
readProcess(
path,
tableId,
split,
output,
hadoopFileSystemProxy.getInputStream(path),
partitionsMap,
Expand All @@ -358,8 +362,7 @@ protected void resolveArchiveCompressedInputStream(
}

protected void readProcess(
String path,
String tableId,
FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
@@ -451,6 +454,19 @@ protected boolean checkFileType(String fileName, FileFormat fileFormat) {
return false;
}

    protected static InputStream safeSlice(InputStream in, long start, long length)
            throws IOException {
        // Advance the stream to the split's start offset. InputStream#skip may
        // skip fewer bytes than requested, so loop until the offset is reached.
        long toSkip = start;
        while (toSkip > 0) {
            long skipped = in.skip(toSkip);
            if (skipped <= 0) {
                throw new SeaTunnelException(
                        "Failed to skip to split start offset " + start
                                + "; stream ended after " + (start - toSkip) + " bytes");
            }
            toSkip -= skipped;
        }
        // Cap the readable bytes at the split length.
        return new BoundedInputStream(in, length);
    }
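To make the contract concrete, here is a small usage sketch, assuming `safeSlice` is in scope (e.g. from a subclass) and using a byte array in place of a file stream; `BoundedInputStream` caps reads at the given length, so the slice ends exactly at the split boundary.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

// Sanity check of the slice semantics: over bytes 0..99, safeSlice(in, 10, 5)
// should expose exactly bytes 10..14 and then report end-of-stream.
byte[] data = new byte[100];
for (int i = 0; i < data.length; i++) {
    data[i] = (byte) i;
}
InputStream sliced = safeSlice(new ByteArrayInputStream(data), 10, 5);
byte[] buf = new byte[16];
int n = sliced.read(buf); // n == 5; buf[0] == 10, buf[4] == 14
```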

@Override
public void close() throws IOException {
try {
@@ -33,6 +33,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.format.csv.CsvDeserializationSchema;
import org.apache.seatunnel.format.csv.processor.CsvLineProcessor;
import org.apache.seatunnel.format.csv.processor.DefaultCsvLineProcessor;
@@ -75,13 +76,20 @@ public class CsvReadStrategy extends AbstractReadStrategy {
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
resolveArchiveCompressedInputStream(path, tableId, output, partitionsMap, FileFormat.CSV);
resolveArchiveCompressedInputStream(
new FileSourceSplit(tableId, path), output, partitionsMap, FileFormat.CSV);
}

@Override
public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
throws IOException, FileConnectorException {
Map<String, String> partitionsMap = parsePartitionsByPath(split.getFilePath());
resolveArchiveCompressedInputStream(split, output, partitionsMap, FileFormat.CSV);
}

@Override
public void readProcess(
String path,
String tableId,
FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
@@ -103,11 +111,18 @@ public void readProcess(
actualInputStream = inputStream;
break;
}
            // restrict the stream to this split's byte range
if (enableSplitFile && split.getLength() > -1) {
actualInputStream = safeSlice(inputStream, split.getStart(), split.getLength());
}
Builder builder =
CSVFormat.EXCEL.builder().setIgnoreEmptyLines(true).setDelimiter(getDelimiter());
CSVFormat csvFormat = builder.build();
if (firstLineAsHeader) {
csvFormat = csvFormat.withFirstRecordAsHeader();
            // when enableSplitFile is true, the first record is not treated as a header
if (!enableSplitFile) {
if (firstLineAsHeader) {
csvFormat = csvFormat.withFirstRecordAsHeader();
}
}
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(actualInputStream, encoding));
@@ -119,12 +134,15 @@ public void readProcess(
reader.reset();
}
// skip lines
for (int i = 0; i < skipHeaderNumber; i++) {
if (reader.readLine() == null) {
throw new IOException(
String.format(
"File [%s] has fewer lines than expected to skip.",
currentFileName));
            // when enableSplitFile is true, header lines are not skipped
if (!enableSplitFile) {
for (int i = 0; i < skipHeaderNumber; i++) {
if (reader.readLine() == null) {
throw new IOException(
String.format(
"File [%s] has fewer lines than expected to skip.",
currentFileName));
}
}
}
// read lines
@@ -161,7 +179,7 @@ public void readProcess(
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
seaTunnelRow.setTableId(split.getTableId());
output.collect(seaTunnelRow);
}
} catch (IOException e) {
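A review-style aside on the CSV (and, below, JSON) split path: byte-range slices are not line-aligned, so a split boundary can fall mid-record. A common way other engines handle this — not taken from this PR, so treat the sketch as hypothetical — is to drop the partial first line of every non-first split while letting each split read past its end boundary to the next newline.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

final class LineAlignedSplits {
    // Hypothetical helper, not from this PR: when a byte-range split does not
    // start at offset 0, its first line is the tail of a record owned by the
    // previous split, so it is discarded before parsing begins.
    static BufferedReader openLineAligned(InputStream sliced, long splitStart, String encoding)
            throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(sliced, encoding));
        if (splitStart > 0) {
            reader.readLine(); // partial line that began in the previous split
        }
        return reader;
    }
}
```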
@@ -32,6 +32,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.excel.ExcelCellUtils;
import org.apache.seatunnel.connectors.seatunnel.file.excel.ExcelReaderListener;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

import org.apache.poi.hssf.usermodel.HSSFWorkbook;
import org.apache.poi.ss.usermodel.Cell;
@@ -77,19 +78,19 @@ public class ExcelReadStrategy extends AbstractReadStrategy {
@Override
public void read(String path, String tableId, Collector<SeaTunnelRow> output) {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
resolveArchiveCompressedInputStream(path, tableId, output, partitionsMap, FileFormat.EXCEL);
resolveArchiveCompressedInputStream(
new FileSourceSplit(tableId, path), output, partitionsMap, FileFormat.EXCEL);
}

@Override
protected void readProcess(
String path,
String tableId,
FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
String currentFileName)
throws IOException {

String tableId = split.getTableId();
if (skipHeaderNumber > Integer.MAX_VALUE || skipHeaderNumber < Integer.MIN_VALUE) {
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
@@ -30,6 +30,7 @@
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

import io.airlift.compress.lzo.LzopCodec;
@@ -78,13 +79,20 @@ public void setCatalogTable(CatalogTable catalogTable) {
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
Map<String, String> partitionsMap = parsePartitionsByPath(path);
resolveArchiveCompressedInputStream(path, tableId, output, partitionsMap, FileFormat.JSON);
resolveArchiveCompressedInputStream(
new FileSourceSplit(tableId, path), output, partitionsMap, FileFormat.JSON);
}

@Override
public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
throws IOException, FileConnectorException {
Map<String, String> partitionsMap = parsePartitionsByPath(split.getFilePath());
resolveArchiveCompressedInputStream(split, output, partitionsMap, FileFormat.JSON);
}

@Override
public void readProcess(
String path,
String tableId,
FileSourceSplit split,
Collector<SeaTunnelRow> output,
InputStream inputStream,
Map<String, String> partitionsMap,
@@ -106,6 +114,10 @@ public void readProcess(
actualInputStream = inputStream;
break;
}
            // restrict the stream to this split's byte range
if (enableSplitFile && split.getLength() > -1) {
actualInputStream = safeSlice(inputStream, split.getStart(), split.getLength());
}
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(actualInputStream, encoding))) {
reader.lines()
@@ -121,7 +133,7 @@
seaTunnelRow.setField(index++, value);
}
}
seaTunnelRow.setTableId(tableId);
seaTunnelRow.setTableId(split.getTableId());
output.collect(seaTunnelRow);
} catch (IOException e) {
String errorMsg =
@@ -78,7 +78,7 @@ public void pollNext(Collector<SeaTunnelRow> output) {
+ "]");
}
try {
readStrategy.read(split.getFilePath(), split.getTableId(), output);
readStrategy.read(split, output);
} catch (Exception e) {
String errorMsg =
String.format("Read data from this file [%s] failed", split.splitId());
@@ -29,6 +29,7 @@
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

import java.io.Closeable;
import java.io.IOException;
@@ -45,6 +46,11 @@ public interface ReadStrategy extends Serializable, Closeable {
void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws IOException, FileConnectorException;

default void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
throws IOException, FileConnectorException {
read(split.getFilePath(), split.getTableId(), output);
}
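One consequence of this default worth noting: a strategy that does not override `read(FileSourceSplit, ...)` silently falls back to whole-file reading by path, ignoring the split's start and length. Of the strategies shown in this diff, only `CsvReadStrategy` and `JsonReadStrategy` provide the override, so splitting degrades to whole-file reads for the other formats.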

SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException;

default SeaTunnelRowType getSeaTunnelRowTypeInfo(TablePath tablePath, String path)