address comments
cloud-fan committed Oct 19, 2017
commit 7eeb3b0bd15644d3facddefcdd2a218316573953
@@ -35,12 +35,15 @@ public interface WriteSupport {
  * Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data
  * sources can return None if there is no writing needed to be done according to the save mode.
  *
+ * @param jobId A unique string for the writing job. It's possible that there are many writing
+ * jobs running at the same time, and the returned {@link DataSourceV2Writer} should
+ * use this job id to distinguish itself from writers of other jobs.
  * @param schema the schema of the data to be written.
  * @param mode the save mode which determines what to do when the data are already in this data
  * source, please refer to {@link SaveMode} for more details.
  * @param options the options for the returned data source writer, which is an immutable
  * case-insensitive string-to-string map.
  */
  Optional<DataSourceV2Writer> createWriter(
- StructType schema, SaveMode mode, DataSourceV2Options options);
+ String jobId, StructType schema, SaveMode mode, DataSourceV2Options options);
  }
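To illustrate the new signature, here is a minimal sketch of a WriteSupport implementation in Scala. The class names MySource and NoopWriter are hypothetical, and the import paths assume the v2 packages used elsewhere in this patch; it is a sketch, not part of the change.

import java.util.Optional

import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options, WriteSupport}
import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

// A do-nothing writer, only here so the sketch is self-contained.
class NoopWriter(jobId: String) extends DataSourceV2Writer {
  override def createWriterFactory(): DataWriterFactory[Row] = ???
  override def commit(messages: Array[WriterCommitMessage]): Unit = {}
  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
}

class MySource extends DataSourceV2 with WriteSupport {
  override def createWriter(
      jobId: String,
      schema: StructType,
      mode: SaveMode,
      options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
    // The jobId passed in by Spark keys this job's (temporary) output so it cannot
    // collide with other jobs writing to the same source at the same time.
    if (mode == SaveMode.Ignore) {
      // Nothing to do for this save mode in this sketch.
      Optional.empty[DataSourceV2Writer]()
    } else {
      Optional.of[DataSourceV2Writer](new NoopWriter(jobId))
    }
  }
}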
@@ -26,7 +26,7 @@
 
  /**
  * A data source writer that is returned by
- * {@link WriteSupport#createWriter(StructType, SaveMode, DataSourceV2Options)}.
+ * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}.
  * It can mix in various writing optimization interfaces to speed up the data saving. The actual
  * writing logic is delegated to {@link DataWriter}.
  *
@@ -63,7 +63,7 @@ public interface DataSourceV2Writer {
  * successful data writers and are produced by {@link DataWriter#commit()}. If this method
  * fails(throw exception), this writing job is considered to be failed, and
  * {@link #abort(WriterCommitMessage[])} will be called. The written data should only be visible
- * to data source readers if this method successes.
+ * to data source readers if this method succeeds.
  *
  * Note that, one partition may have multiple committed data writers because of speculative tasks.
  * Spark will pick the first successful one and get its commit message. Implementations should be
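Sketching the driver-side commit/abort contract described above (StagingWriter is a hypothetical name; the staging-directory layout mirrors the SimpleWritableDataSource test changes later in this diff):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}

// Data writers stage their output under <path>/_temporary/<jobId>; nothing under
// <path> itself changes until commit() runs on the driver.
class StagingWriter(jobId: String, path: String, conf: Configuration) extends DataSourceV2Writer {
  private val finalDir = new Path(path)
  private val jobDir = new Path(new Path(finalDir, "_temporary"), jobId)

  override def createWriterFactory(): DataWriterFactory[Row] = ??? // see the factory sketch further down

  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    // All partitions succeeded: publish the staged files. A real implementation would
    // use `messages` to keep only one attempt per partition, since speculative tasks
    // may have committed more than one writer for the same partition.
    val fs = jobDir.getFileSystem(conf)
    fs.listStatus(jobDir).foreach { f =>
      fs.rename(f.getPath, new Path(finalDir, f.getPath.getName))
    }
    fs.delete(jobDir, true)
  }

  override def abort(messages: Array[WriterCommitMessage]): Unit = {
    // The job failed: drop everything that was staged so readers never see it.
    val fs = jobDir.getFileSystem(conf)
    fs.delete(jobDir, true)
  }
}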
@@ -20,7 +20,7 @@
  import org.apache.spark.annotation.InterfaceStability;
 
  /**
- * A data writer returned by {@link DataWriterFactory#createWriter(int, int, int)} and is
+ * A data writer returned by {@link DataWriterFactory#createWriter(int, int)} and is
  * responsible for writing data for an input RDD partition.
  *
  * One Spark task has one exclusive data writer, so there is no thread-safe concern.
@@ -29,12 +29,12 @@
  * the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will
  * not be processed. If all records are successfully written, {@link #commit()} is called.
  *
- * If this data writer successes(all records are successfully written and {@link #commit()}
- * successes), a {@link WriterCommitMessage} will be sent to the driver side and pass to
+ * If this data writer succeeds(all records are successfully written and {@link #commit()}
+ * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and passed to
  * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} with commit messages from other data
  * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
  * exception will be sent to the driver side, and Spark will retry this writing task for some times,
- * each time {@link DataWriterFactory#createWriter(int, int, int)} gets a different `attemptNumber`,
+ * each time {@link DataWriterFactory#createWriter(int, int)} gets a different `attemptNumber`,
  * and finally call {@link DataSourceV2Writer#abort(WriterCommitMessage[])} if all retry fail.
  *
  * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
@@ -68,7 +68,7 @@ public interface DataWriter<T> {
  * {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
  *
  * The written data should only be visible to data source readers after
- * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} successes, which means this method
+ * {@link DataSourceV2Writer#commit(WriterCommitMessage[])} succeeds, which means this method
  * should still "hide" the written data and ask the {@link DataSourceV2Writer} at driver side to
  * do the final commitment via {@link WriterCommitMessage}.
  *
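A minimal executor-side counterpart, sketching the write/commit/abort lifecycle described above. StagedFile and BufferingDataWriter are hypothetical names introduced only for this sketch:

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Commit message reporting where a successful task attempt staged its output.
case class StagedFile(path: String) extends WriterCommitMessage

class BufferingDataWriter(stagingPath: String) extends DataWriter[Row] {
  private val rows = new ArrayBuffer[Row]

  override def write(row: Row): Unit = rows += row

  override def commit(): WriterCommitMessage = {
    // A real writer would flush `rows` to `stagingPath` here. The data must remain
    // invisible to readers until DataSourceV2Writer#commit runs on the driver.
    StagedFile(stagingPath)
  }

  override def abort(): Unit = {
    // Called if this attempt fails; drop the buffered records so the failed
    // attempt leaves nothing behind.
    rows.clear()
  }
}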
@@ -35,10 +35,16 @@ public interface DataWriterFactory<T> extends Serializable {
  /**
  * Returns a data writer to do the actual writing work.
  *
- * @param stageId The id of the Spark stage that runs the returned writer.
- * @param partitionId The id of the RDD partition that the returned writer will process.
- * @param attemptNumber The attempt number of the Spark task that runs the returned writer, which
- * is usually 0 if the task is not a retried task or a speculative task.
+ * @param partitionId A unique id of the RDD partition that the returned writer will process.
+ * Usually Spark processes many RDD partitions at the same time;
+ * implementations should use the partition id to distinguish writers for
+ * different partitions.
+ * @param attemptNumber Spark may launch multiple tasks with the same task id. For example, a task
+ * failed, Spark launches a new task with the same task id but a different
+ * attempt number. Or a task is too slow, Spark launches new tasks with the
+ * same task id but different attempt numbers, which means there are multiple
+ * tasks with the same task id running at the same time. Implementations can
+ * use this attempt number to distinguish writers of different task attempts.
  */
- DataWriter<T> createWriter(int stageId, int partitionId, int attemptNumber);
+ DataWriter<T> createWriter(int partitionId, int attemptNumber);
  }
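Continuing the sketch, a factory might combine the two ids into a per-attempt staging file name, much like the SimpleCSVDataWriterFactory in the test changes below. StagingWriterFactory is hypothetical and reuses the BufferingDataWriter sketched earlier:

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}

// Every (partitionId, attemptNumber) pair gets its own staging file, so a retried or
// speculative attempt of the same partition can never clobber another attempt's output.
class StagingWriterFactory(jobId: String, path: String) extends DataWriterFactory[Row] {
  override def createWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
    val stagingFile = s"$path/_temporary/$jobId/$jobId-$partitionId-$attemptNumber"
    new BufferingDataWriter(stagingFile) // hypothetical writer from the earlier sketch
  }
}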
@@ -17,7 +17,8 @@
 
  package org.apache.spark.sql
 
- import java.util.{Locale, Properties}
+ import java.text.SimpleDateFormat
+ import java.util.{Date, Locale, Properties, UUID}
 
  import scala.collection.JavaConverters._
 
@@ -238,7 +239,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
  cls.newInstance() match {
  case ds: WriteSupport =>
  val options = new DataSourceV2Options(extraOptions.asJava)
- val writer = ds.createWriter(df.logicalPlan.schema, mode, options)
+ // Using a timestamp and a random UUID to distinguish different writing jobs. This is good
+ // enough as there won't be tons of writing jobs created at the same second.
+ val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
+ .format(new Date()) + "-" + UUID.randomUUID()
+ val writer = ds.createWriter(jobId, df.logicalPlan.schema, mode, options)
  if (writer.isPresent) {
  runCommand(df.sparkSession, "save") {
  WriteToDataSourceV2(writer.get(), df.logicalPlan)
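For reference, the generated job id is a second-granularity timestamp plus a random UUID, so the snippet above produces values of the following shape (the timestamp and UUID shown here are illustrative only):

import java.text.SimpleDateFormat
import java.util.{Date, Locale, UUID}

val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date()) +
  "-" + UUID.randomUUID()
// e.g. "20171019143025-550e8400-e29b-41d4-a716-446655440000"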
@@ -93,8 +93,7 @@ object DataWritingSparkTask extends Logging {
  writeTask: DataWriterFactory[InternalRow],
  context: TaskContext,
  iter: Iterator[InternalRow]): WriterCommitMessage = {
- val dataWriter =
- writeTask.createWriter(context.stageId(), context.partitionId(), context.attemptNumber())
+ val dataWriter = writeTask.createWriter(context.partitionId(), context.attemptNumber())
 
  // write the data and commit this writer.
  Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
@@ -116,12 +115,9 @@ class RowToInternalRowDataWriterFactory(
  rowWriterFactory: DataWriterFactory[Row],
  schema: StructType) extends DataWriterFactory[InternalRow] {
 
- override def createWriter(
- stageId: Int,
- partitionId: Int,
- attemptNumber: Int): DataWriter[InternalRow] = {
+ override def createWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
  new RowToInternalRowDataWriter(
- rowWriterFactory.createWriter(stageId, partitionId, attemptNumber),
+ rowWriterFactory.createWriter(partitionId, attemptNumber),
  RowEncoder.apply(schema).resolveAndBind())
  }
  }
@@ -63,12 +63,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
  }
  }
 
- class Writer(path: String, conf: Configuration) extends DataSourceV2Writer {
- // We can't get the real spark job id here, so we use a timestamp and random UUID to simulate
- // a unique job id.
- protected val jobId = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US).format(new Date()) +
- "-" + UUID.randomUUID()
-
+ class Writer(jobId: String, path: String, conf: Configuration) extends DataSourceV2Writer {
  override def createWriterFactory(): DataWriterFactory[Row] = {
  new SimpleCSVDataWriterFactory(path, jobId, new SerializableConfiguration(conf))
  }
Expand Down Expand Up @@ -96,8 +91,8 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
}
}

class InternalRowWriter(path: String, conf: Configuration)
extends Writer(path, conf) with SupportsWriteInternalRow {
class InternalRowWriter(jobId: String, path: String, conf: Configuration)
extends Writer(jobId, path, conf) with SupportsWriteInternalRow {

override def createWriterFactory(): DataWriterFactory[Row] = {
throw new IllegalArgumentException("not expected!")
@@ -115,6 +110,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
  }
 
  override def createWriter(
+ jobId: String,
  schema: StructType,
  mode: SaveMode,
  options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
@@ -140,13 +136,17 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
  fs.delete(path, true)
  }
 
- Optional.of(createWriter(path, conf, internal))
+ Optional.of(createWriter(jobId, path, conf, internal))
  }
 
  private def createWriter(
- path: Path, conf: Configuration, internal: Boolean): DataSourceV2Writer = {
+ jobId: String, path: Path, conf: Configuration, internal: Boolean): DataSourceV2Writer = {
  val pathStr = path.toUri.toString
- if (internal) new InternalRowWriter(pathStr, conf) else new Writer(pathStr, conf)
+ if (internal) {
+ new InternalRowWriter(jobId, pathStr, conf)
+ } else {
+ new Writer(jobId, pathStr, conf)
+ }
  }
  }
 
@@ -185,7 +185,7 @@ class SimpleCSVReadTask(path: String, conf: SerializableConfiguration)
  class SimpleCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
  extends DataWriterFactory[Row] {
 
- override def createWriter(stageId: Int, partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+ override def createWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
  val jobPath = new Path(new Path(path, "_temporary"), jobId)
  val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
  val fs = filePath.getFileSystem(conf.value)
@@ -218,8 +218,7 @@ class SimpleCSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[Row] {
  class InternalRowCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
  extends DataWriterFactory[InternalRow] {
 
- override def createWriter(
- stageId: Int, partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+ override def createWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
  val jobPath = new Path(new Path(path, "_temporary"), jobId)
  val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
  val fs = filePath.getFileSystem(conf.value)
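As a usage sketch of the test source above, assuming a SparkSession named spark, that the source reads its target directory from a "path" option (as the surrounding test code suggests), and that the illustrative DataFrame schema is acceptable to it, a write goes through the staged layout like this:

// Save a small DataFrame through the DataSourceV2 WriteSupport path.
spark.range(10).toDF("i")
  .write
  .format(classOf[SimpleWritableDataSource].getName)
  .option("path", "/tmp/simple-csv-output")
  .mode("append")
  .save()

// While tasks run, each attempt writes to a per-job, per-attempt staging file:
//   /tmp/simple-csv-output/_temporary/<jobId>/<jobId>-<partitionId>-<attemptNumber>
// The driver-side Writer.commit() is then responsible for publishing the committed
// files under /tmp/simple-csv-output once the whole job succeeds.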