add comments
jose-torres committed Feb 9, 2018
commit 23e4138e9de276809dc38d0dbcfd20d260be49ec
@@ -40,30 +40,56 @@ case class ForeachWriterProvider[T: Encoder](writer: ForeachWriter[T]) extends S
     val encoder = encoderFor[T].resolveAndBind(
       schema.toAttributes,
       SparkSession.getActiveSession.get.sessionState.analyzer)
-    ForeachInternalWriter(writer, encoder)
-  }
-}
-
-case class ForeachInternalWriter[T: Encoder](
-    writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
-  extends StreamWriter with SupportsWriteInternalRow {
-  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
-
-  override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
-    ForeachWriterFactory(writer, encoder)
+    new StreamWriter with SupportsWriteInternalRow {
+      override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+      override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+
+      override def createInternalRowWriterFactory(): DataWriterFactory[InternalRow] = {
+        val byteStream = new ByteArrayOutputStream()
+        val objectStream = new ObjectOutputStream(byteStream)
+        objectStream.writeObject(writer)
+        ForeachWriterFactory(byteStream.toByteArray, encoder)
+      }
+    }
   }
 }

-case class ForeachWriterFactory[T: Encoder](writer: ForeachWriter[T], encoder: ExpressionEncoder[T])
+case class ForeachWriterFactory[T: Encoder](
+    serializedWriter: Array[Byte],
+    encoder: ExpressionEncoder[T])
   extends DataWriterFactory[InternalRow] {
   override def createDataWriter(partitionId: Int, attemptNumber: Int): ForeachDataWriter[T] = {
-    new ForeachDataWriter(writer, encoder, partitionId)
+    new ForeachDataWriter(serializedWriter, encoder, partitionId)
   }
 }

+/**
+ * A [[DataWriter]] for the foreach sink.
+ *
+ * Note that [[ForeachWriter]] has the following lifecycle, and (as was true in the V1 sink API)
+ * assumes that it's never reused:
+ *  * [create writer]
+ *  * open(partitionId, batchId)
+ *  * if open() returned true: write, write, write, ...
+ *  * close()
+ * while DataSourceV2 writers have a slightly different lifecycle and will be reused for multiple
+ * epochs in the continuous processing engine:
+ *  * [create writer]
+ *  * write, write, write, ...
+ *  * commit()
+ *
+ * The bulk of the implementation here is a shim between these two models.
+ *
+ * @param serializedWriter a serialized version of the user-provided [[ForeachWriter]]
+ * @param encoder encoder from [[Row]] to the type param [[T]]
+ * @param partitionId the ID of the partition this data writer is responsible for
+ *
+ * @tparam T the type of data to be handled by the writer
+ */
 class ForeachDataWriter[T : Encoder](
[Review comment, Contributor] add docs describing the implementation of this DataWriter, especially the lifecycle of ForeachWriter (these docs should go here rather than in inline comments).

-    private var writer: ForeachWriter[T], encoder: ExpressionEncoder[T], partitionId: Int)
+    serializedWriter: Array[Byte],
+    encoder: ExpressionEncoder[T],
+    partitionId: Int)
   extends DataWriter[InternalRow] {
   private val initialEpochId: Long = {
     // Start with the microbatch ID. If it's not there, we're in continuous execution,
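Aside (not part of this patch): the lifecycle the new scaladoc describes is the contract user code already programs against in the V1 API. A minimal sketch of a user-provided ForeachWriter, with illustrative names:

```scala
import org.apache.spark.sql.ForeachWriter

// Hypothetical user writer following the documented lifecycle:
// open() once per (partition, epoch); process() for each row, but only
// if open() returned true; close() always called at the end.
class PrintlnWriter extends ForeachWriter[String] {
  private var partition: Long = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    partition = partitionId
    // Returning false tells the sink to skip process() for this
    // (partition, epoch) pair, e.g. if a prior attempt already wrote it.
    true
  }

  override def process(value: String): Unit =
    println(s"partition $partition: $value")

  override def close(errorOrNull: Throwable): Unit = {
    // Release per-epoch resources; errorOrNull is non-null on failure.
  }
}
```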
@@ -74,28 +100,23 @@ class ForeachDataWriter[T : Encoder](
       case batch => batch.toLong
     }
   }
-  private var currentEpochId = initialEpochId

-  // The lifecycle of the ForeachWriter is incompatible with the lifecycle of DataSourceV2 writers.
-  // Unfortunately, we cannot migrate ForeachWriter, as its implementations live in user code. So
-  // we need a small state machine to shim between them.
+  // A small state machine representing the lifecycle of the underlying ForeachWriter.
   // * CLOSED means close() has been called.
-  // * OPENED
+  // * OPENED means open() was called and returned true.
   // * OPENED_SKIP_PROCESSING means open() was called and returned false.
   private object WriterState extends Enumeration {
     type WriterState = Value
     val CLOSED, OPENED, OPENED_SKIP_PROCESSING = Value
   }
   import WriterState._

-  private var state = CLOSED
+  private var writer: ForeachWriter[T] = _
+  private var state: WriterState = _
+  private var currentEpochId = initialEpochId

   private def openAndSetState(epochId: Long) = {
-    // Create a new writer by roundtripping through the serialization for compatibility.
-    // In the old API, a writer instantiation would never get reused.
-    val byteStream = new ByteArrayOutputStream()
-    val objectStream = new ObjectOutputStream(byteStream)
-    objectStream.writeObject(writer)
-    writer = new ObjectInputStream(new ByteArrayInputStream(byteStream.toByteArray)).readObject()
+    writer = new ObjectInputStream(new ByteArrayInputStream(serializedWriter)).readObject()
       .asInstanceOf[ForeachWriter[T]]

     writer.open(partitionId, epochId) match {
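Aside (not part of this patch): deserializing from serializedWriter on every open gives each epoch its own fresh copy of the user's writer, preserving the V1 guarantee that a writer instance is never reused. A self-contained sketch of the same Java-serialization roundtrip, with illustrative names:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Serialize once up front, as the factory now does with the user's writer.
def serialize(obj: AnyRef): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.close()
  bytes.toByteArray
}

// Deserialize a fresh, independent instance for each epoch; mutations made
// by one epoch cannot leak into the next.
def deserialize[T](bytes: Array[Byte]): T =
  new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject().asInstanceOf[T]
```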
@@ -132,4 +153,7 @@ class ForeachDataWriter[T : Encoder](
   override def abort(): Unit = {}
 }

+/**
+ * An empty [[WriterCommitMessage]]. [[ForeachWriter]] implementations have no global coordination.
+ */
 case object ForeachWriterCommitMessage extends WriterCommitMessage
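Aside (not part of this patch): taken together, the state machine and the per-epoch reopen amount to roughly the following shim between the two lifecycles. This is a simplified model, not the patch's exact code; epoch bookkeeping and error handling are elided, and all names are illustrative:

```scala
import org.apache.spark.sql.ForeachWriter

// Map DataSourceV2's write()/commit() calls onto ForeachWriter's
// open()/process()/close() contract.
class LifecycleShim[T](newWriter: () => ForeachWriter[T], partitionId: Long) {
  private object State extends Enumeration {
    val Closed, Opened, OpenedSkipProcessing = Value
  }
  private var state = State.Closed
  private var writer: ForeachWriter[T] = _

  def write(record: T, epochId: Long): Unit = {
    if (state == State.Closed) {
      // First record of the epoch: build a fresh writer and open it.
      writer = newWriter()
      state =
        if (writer.open(partitionId, epochId)) State.Opened
        else State.OpenedSkipProcessing
    }
    if (state == State.Opened) writer.process(record)
  }

  def commit(): Unit = {
    // End of epoch: close the writer if one was opened, then reset.
    if (state != State.Closed) {
      writer.close(null)
      state = State.Closed
    }
  }
}
```

ForeachWriterCommitMessage can stay an empty singleton because all of the sink's work happens eagerly in process(); there is nothing for the driver to coordinate at commit time.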