partial no rdd
jose-torres committed Apr 30, 2018
commit 3a4991aa3345d6c5b088586b388269878d7667d3
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming.continuous
import scala.util.control.NonFatal

import org.apache.spark.{SparkEnv, SparkException, TaskContext}

import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -46,29 +47,34 @@ case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)
      case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
    }

-    val rdd = query.execute()
+    val rdd = new ContinuousWriteRDD(query.execute(), writerFactory)
    val messages = new Array[WriterCommitMessage](rdd.partitions.length)

    logInfo(s"Start processing data source writer: $writer. " +
      s"The input RDD has ${messages.length} partitions.")
    // Let the epoch coordinator know how many partitions the write RDD has.
    EpochCoordinatorRef.get(
      sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
      sparkContext.env)
      .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))

    try {
-      sparkContext.runJob(
-        rdd,
-        (context: TaskContext, iter: Iterator[InternalRow]) =>
-          WriteToContinuousDataSourceExec.run(writerFactory, context, iter),
-        rdd.partitions.indices)
+      // Force the RDD to run so continuous processing starts; no data is actually being collected
+      // to the driver, as ContinuousWriteRDD outputs nothing.
+      rdd.collect()
    } catch {
      case _: InterruptedException =>
        // Interruption is how continuous queries are ended, so accept and ignore the exception.
      case cause: Throwable =>
        logError(s"Data source writer $writer is aborting.")
        try {
          writer.abort(0, messages)
        } catch {
          case t: Throwable =>
            logError(s"Data source writer $writer failed to abort.")
            cause.addSuppressed(t)
            throw new SparkException("Writing job failed.", cause)
        }
        logError(s"Data source writer $writer aborted.")
        cause match {
          // Do not wrap interruption exceptions that will be handled by streaming specially.
          case _ if StreamExecution.isInterruptionException(cause) => throw cause
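The new comment in the hunk above explains why a bare rdd.collect() is enough here: the write RDD emits no rows, so collecting it only launches one long-running task per partition and ships nothing back to the driver. A self-contained toy illustrating that pattern (CollectAsTrigger and everything in it are illustrative only, not part of this commit):

import org.apache.spark.{SparkConf, SparkContext}

// Toy program: the partitions do their work as a side effect and emit no rows,
// so collect() runs the job but hands the driver an empty result.
object CollectAsTrigger {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("collect-as-trigger"))

    val sideEffectRdd = sc.parallelize(1 to 8, numSlices = 4).mapPartitionsWithIndex {
      (partition, rows) =>
        rows.foreach(_ => ())                 // stand-in for the continuous write loop
        println(s"partition $partition ran")  // side effect happens on the executor
        Seq.empty[Unit].iterator              // nothing is shipped back to the driver
    }

    // One task per partition runs to completion; the driver receives an empty array,
    // which is how doExecute above uses ContinuousWriteRDD.
    assert(sideEffectRdd.collect().isEmpty)
    sc.stop()
  }
}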
@@ -81,46 +87,3 @@ case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)
    sparkContext.emptyRDD
  }
}

-object WriteToContinuousDataSourceExec extends Logging {
-  def run(
-      writeTask: DataWriterFactory[InternalRow],
-      context: TaskContext,
-      iter: Iterator[InternalRow]): Unit = {
-    val epochCoordinator = EpochCoordinatorRef.get(
-      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
-      SparkEnv.get)
-    val currentMsg: WriterCommitMessage = null
-    var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
-
-    do {
-      var dataWriter: DataWriter[InternalRow] = null
-      // write the data and commit this writer.
-      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-        try {
-          dataWriter = writeTask.createDataWriter(
-            context.partitionId(), context.attemptNumber(), currentEpoch)
-          while (iter.hasNext) {
-            dataWriter.write(iter.next())
-          }
-          logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-          val msg = dataWriter.commit()
-          logInfo(s"Writer for partition ${context.partitionId()} committed.")
-          epochCoordinator.send(
-            CommitPartitionEpoch(context.partitionId(), currentEpoch, msg)
-          )
-          currentEpoch += 1
-        } catch {
-          case _: InterruptedException =>
-            // Continuous shutdown always involves an interrupt. Just finish the task.
-        }
-      })(catchBlock = {
-        // If there is an error, abort this writer. We enter this callback in the middle of
-        // rethrowing an exception, so runContinuous will stop executing at this point.
-        logError(s"Writer for partition ${context.partitionId()} is aborting.")
-        if (dataWriter != null) dataWriter.abort()
-        logError(s"Writer for partition ${context.partitionId()} aborted.")
-      })
-    } while (!context.isInterrupted())
-  }
-}
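ContinuousWriteRDD itself is not part of this commit, so how the removed run() loop resurfaces there is not visible in this diff. Below is a hedged sketch of what its compute() presumably looks like, reconstructed from the removed method above; apart from the constructor call shown in doExecute, every detail (the class body, getPartitions, the use of prev.compute) is an assumption, not something this diff confirms:

package org.apache.spark.sql.execution.streaming.continuous

import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory}
import org.apache.spark.util.Utils

// Assumed shape only: wraps the query's RDD and runs the former run() loop inside compute().
class ContinuousWriteRDD(var prev: RDD[InternalRow], writeTask: DataWriterFactory[InternalRow])
  extends RDD[Unit](prev) {

  override def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[Unit] = {
    val epochCoordinator = EpochCoordinatorRef.get(
      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
      SparkEnv.get)
    var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong

    do {
      var dataWriter: DataWriter[InternalRow] = null
      // Write this epoch's data and commit the writer, as the removed run() did.
      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
        try {
          val iter = prev.compute(split, context)
          dataWriter = writeTask.createDataWriter(
            context.partitionId(), context.attemptNumber(), currentEpoch)
          while (iter.hasNext) {
            dataWriter.write(iter.next())
          }
          val msg = dataWriter.commit()
          // Report this partition's commit message for the epoch to the coordinator.
          epochCoordinator.send(CommitPartitionEpoch(context.partitionId(), currentEpoch, msg))
          currentEpoch += 1
        } catch {
          case _: InterruptedException =>
            // Continuous shutdown always involves an interrupt; just finish the task.
        }
      })(catchBlock = {
        // On error, abort the writer; the helper rethrows the original exception.
        if (dataWriter != null) dataWriter.abort()
      })
    } while (!context.isInterrupted())

    // Nothing is returned to the driver, which is why collect() in doExecute is only a trigger.
    Iterator.empty
  }
}

Whatever the exact shape, the payoff visible in this diff is that the driver can start the continuous write with a plain rdd.collect() instead of wiring the loop through sparkContext.runJob, and the per-epoch commit protocol stays entirely on the executors.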