@@ -65,7 +65,10 @@ case class KafkaStreamWriterFactory(
topic: Option[String], producerParams: Map[String, String], schema: StructType)
extends DataWriterFactory[InternalRow] {

override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
override def createDataWriter(
partitionId: Int,
attemptNumber: Int,
epochId: Long): DataWriter[InternalRow] = {
new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
}
}
@@ -22,7 +22,7 @@
import org.apache.spark.annotation.InterfaceStability;

/**
* A data writer returned by {@link DataWriterFactory#createDataWriter(int, int)} and is
* A data writer returned by {@link DataWriterFactory#createDataWriter(int, int, long)} and is
* responsible for writing data for an input RDD partition.
*
* One Spark task has one exclusive data writer, so there is no thread-safe concern.
@@ -31,13 +31,17 @@
* the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will
* not be processed. If all records are successfully written, {@link #commit()} is called.
*
* Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, its lifecycle
* is over and Spark will not use it again.
*
* If this data writer succeeds(all records are successfully written and {@link #commit()}
* succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
* {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
* writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
* exception will be sent to the driver side, and Spark will retry this writing task for some times,
* each time {@link DataWriterFactory#createDataWriter(int, int)} gets a different `attemptNumber`,
* and finally call {@link DataSourceWriter#abort(WriterCommitMessage[])} if all retry fail.
* exception will be sent to the driver side, and Spark may retry this writing task a few times.
* In each retry, {@link DataWriterFactory#createDataWriter(int, int, long)} will receive a
* different `attemptNumber`. Spark will call {@link DataSourceWriter#abort(WriterCommitMessage[])}
 * when the configured number of retries is exhausted.

Contributor: This is not clear to me. Isn't it the case that abort will be called every time a task attempt ends in an error? This seems to give the impression that abort is called only after N failed attempts have been made.

Contributor Author: The local abort will be called every time a task attempt fails. The global abort referenced here is called only when the job fails.

*
* Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
* takes too long to finish. Different from retried tasks, which are launched one by one after the
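
To make the per-attempt lifecycle described in this javadoc concrete, here is a minimal sketch of a DataWriter written against the interface as changed by this patch. BufferedRowWriter, BufferedRowsCommitMessage, and the in-memory buffering are illustrative assumptions, not code from the PR.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Hypothetical commit message reporting what a single task attempt wrote.
case class BufferedRowsCommitMessage(partitionId: Int, rowCount: Int) extends WriterCommitMessage

// Once commit() or abort() returns successfully, Spark never touches this
// instance again, so all state can be local to the task attempt.
class BufferedRowWriter(partitionId: Int) extends DataWriter[Row] {
  private val buffer = new ArrayBuffer[Row]()

  override def write(record: Row): Unit = buffer += record

  override def commit(): WriterCommitMessage = {
    // A real sink would durably stage `buffer` here before reporting success;
    // the returned message is passed to DataSourceWriter#commit on the driver.
    BufferedRowsCommitMessage(partitionId, buffer.size)
  }

  override def abort(): Unit = {
    // Called when write() or commit() failed for this attempt. Discard local
    // state so a retried attempt (with a different attemptNumber) starts clean.
    buffer.clear()
  }
}
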
@@ -48,6 +48,9 @@ public interface DataWriterFactory<T> extends Serializable {
* same task id but different attempt number, which means there are multiple
* tasks with the same task id running at the same time. Implementations can
* use this attempt number to distinguish writers of different task attempts.
 * @param epochId A monotonically increasing id for streaming queries that are split into
* discrete periods of execution. For non-streaming queries,
* this ID will always be 0.
*/
DataWriter<T> createDataWriter(int partitionId, int attemptNumber);
DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);

Contributor: Why are we using the same interface for streaming and batch here? Is there a compelling reason to do so instead of adding StreamingWriterFactory? Are the guarantees for an epoch identical to those of a single batch job?

Contributor Author: The guarantees are identical, and in the current execution model, each epoch is in fact processed by a single batch job.

}
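
A sketch of how an implementation might use the new epochId parameter (the EpochAwareWriterFactory name, the path layout, and the in-memory buffering are assumptions for illustration, not code from this patch): folding the epoch into each task's output location means a re-executed epoch, or the single epoch 0 of a batch query, targets the same location instead of duplicating data.

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

// Hypothetical commit message naming the per-epoch location a task wrote to.
case class EpochTaskCommitMessage(path: String) extends WriterCommitMessage

class EpochAwareWriterFactory(basePath: String, queryId: String)
  extends DataWriterFactory[Row] {

  override def createDataWriter(
      partitionId: Int,
      attemptNumber: Int,
      epochId: Long): DataWriter[Row] = {
    // For non-streaming queries epochId is always 0, so the same naming scheme
    // covers both batch and streaming writes.
    val taskPath = s"$basePath/epoch=$epochId/$queryId-$partitionId-$attemptNumber"
    new DataWriter[Row] {
      private val rows = ArrayBuffer.empty[Row]
      override def write(record: Row): Unit = rows += record
      override def commit(): WriterCommitMessage = {
        // A real implementation would persist `rows` to `taskPath` here.
        EpochTaskCommitMessage(taskPath)
      }
      override def abort(): Unit = rows.clear()
    }
  }
}
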
@@ -23,11 +23,10 @@
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

/**
* A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and
* aborts relative to an epoch ID determined by the execution engine.
* A {@link DataSourceWriter} for use with structured streaming.
*
* {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
* and so must reset any internal state after a successful commit.
* Streaming queries are divided into intervals of data called epochs, with a monotonically
* increasing numeric ID. This writer handles commits and aborts for each successive epoch.
*/
@InterfaceStability.Evolving
public interface StreamWriter extends DataSourceWriter {
@@ -39,21 +38,21 @@ public interface StreamWriter extends DataSourceWriter {
* If this method fails (by throwing an exception), this writing job is considered to have been
* failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
*
* To support exactly-once processing, writer implementations should ensure that this method is
* idempotent. The execution engine may call commit() multiple times for the same epoch
* in some circumstances.
* The execution engine may call commit() multiple times for the same epoch in some circumstances.

Contributor: Somewhere in this file, add docs about what epochId means for MB (micro-batch) and C (continuous) execution.

Contributor: +1

* To support exactly-once data semantics, implementations must ensure that multiple commits for
* the same epoch are idempotent.
*/
void commit(long epochId, WriterCommitMessage[] messages);

/**
* Aborts this writing job because some data writers are failed and keep failing when retry, or
* Aborts this writing job because some data writers are failed and keep failing when retried, or
* the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
*
* If this method fails (by throwing an exception), the underlying data source may require manual
* cleanup.
*
* Unless the abort is triggered by the failure of commit, the given messages should have some
* null slots as there maybe only a few data writers that are committed before the abort
* Unless the abort is triggered by the failure of commit, the given messages will have some
* null slots, as there may be only a few data writers that were committed before the abort
* happens, or some data writers were committed but their commit messages haven't reached the
* driver when the abort is triggered. So this is just a "best effort" for data sources to
* clean up the data left by data writers.
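
To illustrate the idempotence requirement above, here is a hedged sketch of one possible strategy. EpochStore, IdempotentStreamWriter, and the last-published-epoch bookkeeping are assumptions, not what any built-in sink does.

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter

// Hypothetical transactional handle into the sink, assumed to expose atomic
// publish/discard per epoch and the last epoch it has already published.
trait EpochStore extends Serializable {
  def lastPublishedEpoch: Long
  def publish(epochId: Long, messages: Array[WriterCommitMessage]): Unit
  def discard(epochId: Long): Unit
}

class IdempotentStreamWriter(store: EpochStore, factory: DataWriterFactory[Row])
  extends StreamWriter {

  override def createWriterFactory(): DataWriterFactory[Row] = factory

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // The engine may call commit() more than once for the same epoch, e.g. after
    // a driver restart, so skip epochs the sink has already published.
    if (epochId > store.lastPublishedEpoch) {
      store.publish(epochId, messages)
    }
  }

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    // Best effort: some slots in `messages` may be null because not every writer
    // committed, or had its message reach the driver, before the abort.
    store.discard(epochId)
  }
}
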
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.StreamExecution
import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamExecution}
import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -132,7 +132,8 @@ object DataWritingSparkTask extends Logging {
val stageId = context.stageId()
val partId = context.partitionId()
val attemptId = context.attemptNumber()
val dataWriter = writeTask.createDataWriter(partId, attemptId)
val epochId = Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0")
val dataWriter = writeTask.createDataWriter(partId, attemptId, epochId.toLong)

// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
@@ -172,18 +173,22 @@
writeTask: DataWriterFactory[InternalRow],
context: TaskContext,
iter: Iterator[InternalRow]): WriterCommitMessage = {
val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
val epochCoordinator = EpochCoordinatorRef.get(
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
SparkEnv.get)
val currentMsg: WriterCommitMessage = null
var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong

do {
var dataWriter: DataWriter[InternalRow] = null
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
try {
iter.foreach(dataWriter.write)
dataWriter = writeTask.createDataWriter(
context.partitionId(), context.attemptNumber(), currentEpoch)
while (iter.hasNext) {

Contributor: Is there a reason to change foreach to a while loop?

Contributor Author: IIUC (/cc @tdas) foreach can be problematic in tight loops, because it introduces a lambda that isn't always optimized away.

dataWriter.write(iter.next())
}
logInfo(s"Writer for partition ${context.partitionId()} is committing.")
val msg = dataWriter.commit()
logInfo(s"Writer for partition ${context.partitionId()} committed.")
@@ -196,9 +201,10 @@
// Continuous shutdown always involves an interrupt. Just finish the task.
}
})(catchBlock = {
// If there is an error, abort this writer
// If there is an error, abort this writer. We enter this callback in the middle of
// rethrowing an exception, so runContinuous will stop executing at this point.
logError(s"Writer for partition ${context.partitionId()} is aborting.")
dataWriter.abort()
if (dataWriter != null) dataWriter.abort()
logError(s"Writer for partition ${context.partitionId()} aborted.")
})
} while (!context.isInterrupted())
@@ -211,9 +217,12 @@ class InternalRowDataWriterFactory(
rowWriterFactory: DataWriterFactory[Row],
schema: StructType) extends DataWriterFactory[InternalRow] {

override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
override def createDataWriter(
partitionId: Int,
attemptNumber: Int,
epochId: Long): DataWriter[InternalRow] = {
new InternalRowDataWriter(
rowWriterFactory.createDataWriter(partitionId, attemptNumber),
rowWriterFactory.createDataWriter(partitionId, attemptNumber, epochId),
RowEncoder.apply(schema).resolveAndBind())
}
}
@@ -469,6 +469,9 @@ class MicroBatchExecution(
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}

sparkSessionToRunBatch.sparkContext.setLocalProperty(
MicroBatchExecution.BATCH_ID_KEY, currentBatchId.toString)

reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionToRunBatch,
@@ -518,3 +521,7 @@ class MicroBatchExecution(
Optional.ofNullable(scalaOption.orNull)
}
}

object MicroBatchExecution {
val BATCH_ID_KEY = "streaming.sql.batchId"
}
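
The epoch id for micro-batch execution travels from the driver to the write tasks as a Spark local property, set here and read back in DataWritingSparkTask. The standalone demo below sketches that round trip under the assumption of a plain local-mode SparkContext; it is an illustration, not part of this change.

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object BatchIdPropertyDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("batch-id-demo"))
    val key = "streaming.sql.batchId"  // same string as MicroBatchExecution.BATCH_ID_KEY
    // Driver side: what MicroBatchExecution does once per micro-batch.
    sc.setLocalProperty(key, "42")
    // Task side: what DataWritingSparkTask does, defaulting to epoch 0 when the
    // property is unset (the non-streaming case).
    val epochIds = sc.parallelize(1 to 4, numSlices = 2).map { _ =>
      Option(TaskContext.get().getLocalProperty(key)).getOrElse("0").toLong
    }.collect()
    assert(epochIds.forall(_ == 42L))
    sc.stop()
  }
}
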
@@ -31,7 +31,10 @@ import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, Dat
* for production-quality sinks. It's intended for use in tests.
*/
case object PackedRowWriterFactory extends DataWriterFactory[Row] {
def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
override def createDataWriter(
partitionId: Int,
attemptNumber: Int,
epochId: Long): DataWriter[Row] = {
new PackedRowDataWriter()
}
}
@@ -147,7 +147,10 @@ class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode)
}

case class MemoryWriterFactory(outputMode: OutputMode) extends DataWriterFactory[Row] {
def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
override def createDataWriter(
partitionId: Int,
attemptNumber: Int,
epochId: Long): DataWriter[Row] = {
new MemoryDataWriter(partitionId, outputMode)
}
}
@@ -207,7 +207,10 @@ private[v2] object SimpleCounter {
class SimpleCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
extends DataWriterFactory[Row] {

override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
override def createDataWriter(
partitionId: Int,
attemptNumber: Int,
epochId: Long): DataWriter[Row] = {
val jobPath = new Path(new Path(path, "_temporary"), jobId)
val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
val fs = filePath.getFileSystem(conf.value)
@@ -240,7 +243,10 @@ class SimpleCSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[Row] {
class InternalRowCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
extends DataWriterFactory[InternalRow] {

override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
override def createDataWriter(
partitionId: Int,
attemptNumber: Int,
epochId: Long): DataWriter[InternalRow] = {
val jobPath = new Path(new Path(path, "_temporary"), jobId)
val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
val fs = filePath.getFileSystem(conf.value)