Commit b034f25

cloud-fan authored and gatorsmile committed
[SPARK-22026][SQL] data source v2 write path
## What changes were proposed in this pull request?

A working prototype for the data source v2 write path. The writing framework is similar to the reading framework, i.e. `WriteSupport` -> `DataSourceV2Writer` -> `DataWriterFactory` -> `DataWriter`. Similar to `FileCommitProtocol`, the writing API has job-level and task-level commit/abort to support transactional writes.

## How was this patch tested?

new tests

Author: Wenchen Fan <[email protected]>

Closes #19269 from cloud-fan/data-source-v2-write.
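For orientation, here is a minimal, non-authoritative sketch of how an application might drive this path through `DataFrameWriter`, assuming `save()` dispatches to `WriteSupport` for v2 sources as this patch wires up. The source class `com.example.SimpleDataSource` is invented for illustration; sketches of it and its collaborators follow the interface files below.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class WritePathExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("v2-write-example")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = spark.range(0, 10).toDF("id");

    // DataFrameWriter resolves the class, finds it implements WriteSupport, and
    // drives WriteSupport -> DataSourceV2Writer -> DataWriterFactory -> DataWriter.
    df.write()
        .format("com.example.SimpleDataSource") // hypothetical v2 source, sketched below
        .mode(SaveMode.Append)
        .save();

    spark.stop();
  }
}
```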
1 parent 7fae799 commit b034f25

File tree

11 files changed (+842, -14 lines)
org/apache/spark/sql/sources/v2/WriteSupport.java (new file)
Lines changed: 49 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2;

import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.types.StructType;

/**
 * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
 * provide data writing ability and save the data to the data source.
 */
@InterfaceStability.Evolving
public interface WriteSupport {

  /**
   * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data
   * sources can return an empty {@code Optional} if no writing needs to be done according to the
   * save mode.
   *
   * @param jobId A unique string for the writing job. It's possible that there are many writing
   *              jobs running at the same time, and the returned {@link DataSourceV2Writer} should
   *              use this job id to distinguish itself from writers of other jobs.
   * @param schema the schema of the data to be written.
   * @param mode the save mode which determines what to do when the data are already in this data
   *             source, please refer to {@link SaveMode} for more details.
   * @param options the options for the returned data source writer, which is an immutable
   *                case-insensitive string-to-string map.
   */
  Optional<DataSourceV2Writer> createWriter(
      String jobId, StructType schema, SaveMode mode, DataSourceV2Options options);
}
```
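As a rough sketch of how a source might plug into this interface: `SimpleDataSource`, `SimpleWriter`, and the `targetExists` check below are invented for illustration, not part of this patch.

```java
package com.example;

import java.util.Optional;

import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.types.StructType;

public class SimpleDataSource implements DataSourceV2, WriteSupport {

  @Override
  public Optional<DataSourceV2Writer> createWriter(
      String jobId, StructType schema, SaveMode mode, DataSourceV2Options options) {
    boolean exists = targetExists(options);
    if (mode == SaveMode.ErrorIfExists && exists) {
      throw new IllegalStateException("Data already exists at the target location.");
    }
    if (mode == SaveMode.Ignore && exists) {
      // An empty Optional tells Spark to skip the write entirely.
      return Optional.empty();
    }
    return Optional.of(new SimpleWriter(jobId, schema, mode == SaveMode.Overwrite));
  }

  private boolean targetExists(DataSourceV2Options options) {
    return false; // storage-specific check, stubbed for the sketch
  }
}
```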
org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java (new file)
Lines changed: 88 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.types.StructType;

/**
 * A data source writer that is returned by
 * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}.
 * It can mix in various writing optimization interfaces to speed up the data saving. The actual
 * writing logic is delegated to {@link DataWriter}.
 *
 * The writing procedure is:
 *   1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all
 *      the partitions of the input data (RDD).
 *   2. For each partition, create the data writer, and write the data of the partition with this
 *      writer. If all the data are written successfully, call {@link DataWriter#commit()}. If an
 *      exception happens during the writing, call {@link DataWriter#abort()}.
 *   3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
 *      some writers are aborted, or the job fails for an unknown reason, call
 *      {@link #abort(WriterCommitMessage[])}.
 *
 * Spark won't retry failed writing jobs; users should do it manually in their Spark applications
 * if they want to retry.
 *
 * Please refer to the documentation of the commit/abort methods for detailed specifications.
 *
 * Note that this interface provides a protocol between Spark and data sources for transactional
 * data writing, but the transaction here is a Spark-level transaction, which may not be an
 * underlying storage transaction. For example, Spark successfully writes data to a Cassandra data
 * source, but Cassandra may need some more time to reach consistency at the storage level.
 */
@InterfaceStability.Evolving
public interface DataSourceV2Writer {

  /**
   * Creates a writer factory which will be serialized and sent to executors.
   */
  DataWriterFactory<Row> createWriterFactory();

  /**
   * Commits this writing job with a list of commit messages. The commit messages are collected
   * from successful data writers and are produced by {@link DataWriter#commit()}. If this method
   * fails (by throwing an exception), this writing job is considered to have failed, and
   * {@link #abort(WriterCommitMessage[])} will be called. The written data should only be visible
   * to data source readers if this method succeeds.
   *
   * Note that one partition may have multiple committed data writers because of speculative
   * tasks. Spark will pick the first successful one and get its commit message. Implementations
   * should be aware of this and handle it correctly, e.g., have a mechanism to make sure only one
   * data writer can commit successfully, or have a way to clean up the data of already-committed
   * writers.
   */
  void commit(WriterCommitMessage[] messages);

  /**
   * Aborts this writing job because some data writers failed to write the records and aborted,
   * or the Spark job fails for some unknown reason, or {@link #commit(WriterCommitMessage[])}
   * fails. If this method fails (by throwing an exception), the underlying data source may have
   * garbage that needs to be cleaned up manually, but this garbage should not be visible to data
   * source readers.
   *
   * Unless the abort is triggered by the failure of commit, the given messages will likely have
   * some null slots, as there may be only a few data writers that committed before the abort
   * happens, or some data writers were committed but their commit messages haven't reached the
   * driver when the abort is triggered. So this is just a "best effort" for data sources to
   * clean up the data left by data writers.
   */
  void abort(WriterCommitMessage[] messages);
}
```
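A hypothetical driver-side writer implementing this contract might look like the following sketch. `SimpleWriterFactory` and `SimpleCommitMessage` are illustrative classes sketched after the corresponding interface files below; the publish/discard helpers stand in for storage-specific logic.

```java
package com.example;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.types.StructType;

public class SimpleWriter implements DataSourceV2Writer {
  private final String jobId;
  private final StructType schema;
  private final boolean overwrite; // consulted by storage-specific publish logic

  public SimpleWriter(String jobId, StructType schema, boolean overwrite) {
    this.jobId = jobId;
    this.schema = schema;
    this.overwrite = overwrite;
  }

  @Override
  public DataWriterFactory<Row> createWriterFactory() {
    // Step 1 of the procedure above: this factory is serialized and shipped
    // to every partition, so it must carry everything the writers need.
    return new SimpleWriterFactory(jobId, schema);
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // Step 3: every task committed. Publish the staged output atomically,
    // e.g. rename per-task staging paths into the final location.
    for (WriterCommitMessage message : messages) {
      publish((SimpleCommitMessage) message);
    }
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // Best effort: null slots mark tasks whose commit message never arrived.
    for (WriterCommitMessage message : messages) {
      if (message != null) {
        discard((SimpleCommitMessage) message);
      }
    }
  }

  private void publish(SimpleCommitMessage message) { /* storage-specific */ }

  private void discard(SimpleCommitMessage message) { /* storage-specific */ }
}
```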
org/apache/spark/sql/sources/v2/writer/DataWriter.java (new file)
Lines changed: 92 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.InterfaceStability;

/**
 * A data writer returned by {@link DataWriterFactory#createWriter(int, int)} that is
 * responsible for writing data for an input RDD partition.
 *
 * One Spark task has one exclusive data writer, so there is no thread-safety concern.
 *
 * {@link #write(Object)} is called for each record in the input RDD partition. If one record
 * fails to write, {@link #abort()} is called afterwards and the remaining records will not be
 * processed. If all records are successfully written, {@link #commit()} is called.
 *
 * If this data writer succeeds (all records are successfully written and {@link #commit()}
 * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and passed to
 * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} along with commit messages from other
 * data writers. If this data writer fails (one record fails to write or {@link #commit()} fails),
 * an exception will be sent to the driver side, and Spark will retry this writing task a few
 * times. Each retry calls {@link DataWriterFactory#createWriter(int, int)} with a different
 * `attemptNumber`, and {@link DataSourceV2Writer#abort(WriterCommitMessage[])} is finally called
 * if all retries fail.
 *
 * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
 * takes too long to finish. Different from retried tasks, which are launched one by one after
 * the previous one fails, speculative tasks run simultaneously. It's possible that one input RDD
 * partition has multiple data writers with different `attemptNumber` running at the same time,
 * and data sources should guarantee that these data writers don't conflict and can work together.
 * Implementations can coordinate with the driver during {@link #commit()} to make sure only one
 * of these data writers can commit successfully. Or implementations can allow all of them to
 * commit successfully, and have a way to revert committed data writers without the commit
 * message, because Spark only accepts the commit message that arrives first and ignores others.
 *
 * Note that currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
 * source writers, or {@link org.apache.spark.sql.catalyst.InternalRow} for data source writers
 * that mix in {@link SupportsWriteInternalRow}.
 */
@InterfaceStability.Evolving
public interface DataWriter<T> {

  /**
   * Writes one record.
   *
   * If this method fails (by throwing an exception), {@link #abort()} will be called and this
   * data writer is considered to have failed.
   */
  void write(T record);

  /**
   * Commits this writer after all records are written successfully, and returns a commit message
   * which will be sent back to the driver side and passed to
   * {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
   *
   * The written data should only be visible to data source readers after
   * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} succeeds, which means this method
   * should still "hide" the written data and ask the {@link DataSourceV2Writer} at the driver
   * side to do the final commit via {@link WriterCommitMessage}.
   *
   * If this method fails (by throwing an exception), {@link #abort()} will be called and this
   * data writer is considered to have failed.
   */
  WriterCommitMessage commit();

  /**
   * Aborts this writer if it has failed. Implementations should clean up the data for already
   * written records.
   *
   * This method will only be called if one record failed to write, or {@link #commit()} failed.
   *
   * If this method fails (by throwing an exception), the underlying data source may have garbage
   * that needs to be cleaned up by {@link DataSourceV2Writer#abort(WriterCommitMessage[])} or
   * manually, but this garbage should not be visible to data source readers.
   */
  void abort();
}
```
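A minimal task-side writer sketch, under the assumption that output is staged to a location keyed by (partitionId, attemptNumber) so speculative attempts never collide. `SimpleDataWriter`, the `stage` helper, and `SimpleCommitMessage` (defined with the `WriterCommitMessage` sketch below) are invented for illustration.

```java
package com.example;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

public class SimpleDataWriter implements DataWriter<Row> {
  private final int partitionId;
  private final int attemptNumber;
  private final List<Row> buffer = new ArrayList<>();

  public SimpleDataWriter(int partitionId, int attemptNumber) {
    this.partitionId = partitionId;
    this.attemptNumber = attemptNumber;
  }

  @Override
  public void write(Row record) {
    // Stage the record; nothing is visible to readers yet.
    buffer.add(record);
  }

  @Override
  public WriterCommitMessage commit() {
    // Flush staged records to an attempt-specific location and describe it in
    // the message; the driver-side writer performs the final, visible commit.
    String stagingPath = stage(buffer, partitionId, attemptNumber);
    return new SimpleCommitMessage(partitionId, stagingPath);
  }

  @Override
  public void abort() {
    // Drop whatever this attempt staged.
    buffer.clear();
  }

  private String stage(List<Row> rows, int partitionId, int attemptNumber) {
    return "/tmp/staging/" + partitionId + "-" + attemptNumber; // storage-specific
  }
}
```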
org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java (new file)
Lines changed: 50 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.writer;

import java.io.Serializable;

import org.apache.spark.annotation.InterfaceStability;

/**
 * A factory of {@link DataWriter} returned by {@link DataSourceV2Writer#createWriterFactory()},
 * which is responsible for creating and initializing the actual data writer on the executor
 * side.
 *
 * Note that the writer factory will be serialized and sent to executors, and the data writer
 * will be created on executors to do the actual writing. So {@link DataWriterFactory} must be
 * serializable, while {@link DataWriter} doesn't need to be.
 */
@InterfaceStability.Evolving
public interface DataWriterFactory<T> extends Serializable {

  /**
   * Returns a data writer to do the actual writing work.
   *
   * @param partitionId A unique id of the RDD partition that the returned writer will process.
   *                    Usually Spark processes many RDD partitions at the same time;
   *                    implementations should use the partition id to distinguish writers for
   *                    different partitions.
   * @param attemptNumber Spark may launch multiple tasks with the same task id. For example, if
   *                      a task fails, Spark launches a new task with the same task id but a
   *                      different attempt number. Or if a task is too slow, Spark launches new
   *                      tasks with the same task id but different attempt numbers, which means
   *                      there are multiple tasks with the same task id running at the same
   *                      time. Implementations can use this attempt number to distinguish
   *                      writers of different task attempts.
   */
  DataWriter<T> createWriter(int partitionId, int attemptNumber);
}
```
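The factory used by the `SimpleWriter` sketch above could look like this; since the interface already extends `Serializable`, the only rule is that every field must itself be serializable. `SimpleWriterFactory` is hypothetical.

```java
package com.example;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.types.StructType;

public class SimpleWriterFactory implements DataWriterFactory<Row> {
  // Only serializable state may live here; the factory crosses the wire to executors.
  private final String jobId;
  private final StructType schema;

  public SimpleWriterFactory(String jobId, StructType schema) {
    this.jobId = jobId;     // kept for storage-specific setup on executors
    this.schema = schema;
  }

  @Override
  public DataWriter<Row> createWriter(int partitionId, int attemptNumber) {
    // The (partitionId, attemptNumber) pair keeps retried and speculative
    // attempts for the same partition from clobbering each other.
    return new SimpleDataWriter(partitionId, attemptNumber);
  }
}
```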
org/apache/spark/sql/sources/v2/writer/SupportsWriteInternalRow.java (new file)
Lines changed: 44 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;

/**
 * A mix-in interface for {@link DataSourceV2Writer}. Data source writers can implement this
 * interface to write {@link InternalRow} directly and avoid the row conversion on the Spark
 * side. This is an experimental and unstable interface, as {@link InternalRow} is not public and
 * may change in future Spark versions.
 */

@InterfaceStability.Evolving
@Experimental
@InterfaceStability.Unstable
public interface SupportsWriteInternalRow extends DataSourceV2Writer {

  @Override
  default DataWriterFactory<Row> createWriterFactory() {
    throw new IllegalStateException(
      "createWriterFactory should not be called with SupportsWriteInternalRow.");
  }

  DataWriterFactory<InternalRow> createInternalRowWriterFactory();
}
```
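A rough sketch of a writer mixing in this interface is below. The no-op writer body and the reuse of `SimpleCommitMessage` (sketched with `WriterCommitMessage` below) are illustrative assumptions; the lambda works because `DataWriterFactory` has a single abstract method and already extends `Serializable`.

```java
package com.example;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

public class InternalRowWriter implements SupportsWriteInternalRow {

  @Override
  public DataWriterFactory<InternalRow> createInternalRowWriterFactory() {
    // Spark calls this instead of createWriterFactory(), so the writers
    // receive InternalRow directly and skip the InternalRow -> Row conversion.
    return (partitionId, attemptNumber) -> new DataWriter<InternalRow>() {
      @Override
      public void write(InternalRow record) { /* stage the internal row */ }

      @Override
      public WriterCommitMessage commit() {
        return new SimpleCommitMessage(partitionId, "noop"); // sketched below
      }

      @Override
      public void abort() { /* drop staged data */ }
    };
  }

  @Override
  public void commit(WriterCommitMessage[] messages) { /* publish staged data */ }

  @Override
  public void abort(WriterCommitMessage[] messages) { /* clean up staged data */ }
}
```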
org/apache/spark/sql/sources/v2/writer/WriterCommitMessage.java (new file)
Lines changed: 33 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.sources.v2.writer;

import java.io.Serializable;

import org.apache.spark.annotation.InterfaceStability;

/**
 * A commit message returned by {@link DataWriter#commit()} that will be sent back to the driver
 * side as the input parameter of {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
 *
 * This is an empty interface; data sources should define their own message class and use it in
 * their {@link DataWriter#commit()} and {@link DataSourceV2Writer#commit(WriterCommitMessage[])}
 * implementations.
 */
@InterfaceStability.Evolving
public interface WriterCommitMessage extends Serializable {}
```
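To close the loop on the sketches above, a source-defined message only needs to carry whatever the driver-side writer requires to publish or discard a task's output. `SimpleCommitMessage` is the hypothetical message referenced by the earlier sketches.

```java
package com.example;

import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Serializable via the WriterCommitMessage interface; travels from executors
// back to the driver, where SimpleWriter.commit()/abort() consume it.
public class SimpleCommitMessage implements WriterCommitMessage {
  private final int partitionId;
  private final String stagingPath;

  public SimpleCommitMessage(int partitionId, String stagingPath) {
    this.partitionId = partitionId;
    this.stagingPath = stagingPath;
  }

  public int partitionId() { return partitionId; }

  public String stagingPath() { return stagingPath; }
}
```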
