Revert stream writer
jose-torres committed Apr 30, 2018
commit 7a4f1e72a3a139fee7980c54f312f30d8f738c04
@@ -18,10 +18,8 @@
 package org.apache.spark.sql.sources.v2.writer.streaming;
 
 import org.apache.spark.annotation.InterfaceStability;
-import org.apache.spark.sql.Row;
+import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
-import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 
 /**
@@ -31,15 +29,14 @@
  * increasing numeric ID. This writer handles commits and aborts for each successive epoch.
  */
 @InterfaceStability.Evolving
-public interface StreamWriter {
+public interface StreamWriter extends DataSourceWriter {
   /**
    * Commits this writing job for the specified epoch with a list of commit messages. The commit
    * messages are collected from successful data writers and are produced by
    * {@link DataWriter#commit()}.
    *
    * If this method fails (by throwing an exception), this writing job is considered to have been
-   * failed, and the execution engine will attempt to call
-   * {@link #abort(long, WriterCommitMessage[])}.
+   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    *
    * The execution engine may call commit() multiple times for the same epoch in some circumstances.
    * To support exactly-once data semantics, implementations must ensure that multiple commits for
@@ -49,8 +46,7 @@ public interface StreamWriter {
 
   /**
    * Aborts this writing job because some data writers are failed and keep failing when retried, or
-   * the Spark job fails with some unknown reasons, or
-   * {@link #commit(long, WriterCommitMessage[])} fails.
+   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
    *
    * If this method fails (by throwing an exception), the underlying data source may require manual
    * cleanup.
@@ -63,11 +59,13 @@ public interface StreamWriter {
    */
   void abort(long epochId, WriterCommitMessage[] messages);
 
-  /**
-   * Creates a writer factory which will be serialized and sent to executors.
-   *
-   * If this method fails (by throwing an exception), the action would fail and no Spark job was
-   * submitted.
-   */
-  DataWriterFactory<Row> createWriterFactory();
+  default void commit(WriterCommitMessage[] messages) {
+    throw new UnsupportedOperationException(
+        "Commit without epoch should not be called with StreamWriter");
+  }
+
+  default void abort(WriterCommitMessage[] messages) {
+    throw new UnsupportedOperationException(
+        "Abort without epoch should not be called with StreamWriter");
+  }
 }
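
For illustration, a minimal sketch of a sink written against the interface as it stands after this commit. Only the StreamWriter and DataSourceWriter signatures come from the patch; the class name and the in-memory lastCommittedEpoch tracker are hypothetical stand-ins for durable sink state, which a real sink would persist atomically with each epoch's output.

import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;

// Hypothetical example, not part of this patch.
public class IdempotentStreamWriter implements StreamWriter {

  private final DataWriterFactory<Row> factory;
  // Stand-in for durable state; a real sink persists this with the output.
  private final AtomicLong lastCommittedEpoch = new AtomicLong(-1L);

  public IdempotentStreamWriter(DataWriterFactory<Row> factory) {
    this.factory = factory;
  }

  @Override
  public void commit(long epochId, WriterCommitMessage[] messages) {
    // The engine may call commit() multiple times for the same epoch, so a
    // re-delivered epoch must be a no-op to keep the output exactly-once.
    if (epochId <= lastCommittedEpoch.get()) {
      return;
    }
    // ... make this epoch's output visible, atomically with the marker ...
    lastCommittedEpoch.set(epochId);
  }

  @Override
  public void abort(long epochId, WriterCommitMessage[] messages) {
    // ... discard any partial output staged for this epoch ...
  }

  @Override
  public DataWriterFactory<Row> createWriterFactory() {
    // Declared on DataSourceWriter, which StreamWriter now extends.
    return factory;
  }
}

The inherited default commit(WriterCommitMessage[]) and abort(WriterCommitMessage[]) are deliberately not overridden: the engine is expected to drive a StreamWriter only through the epoch-qualified overloads, and the UnsupportedOperationException defaults surface any call that arrives without an epoch.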