Some more changes
tdas committed Dec 2, 2016
commit bec2fb392e995a517578341b1addfbe00bf04330
@@ -97,20 +97,20 @@ class StreamExecution(
   protected var currentBatchId: Long = -1
 
   /** Metadata associated with the whole query */
-  protected val queryMetadata: StreamExecutionMetadata = {
+  protected val streamMetadata: StreamMetadata = {
     val metadataPath = new Path(checkpointFile("metadata"))
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
-    StreamExecutionMetadata.read(metadataPath, hadoopConf).getOrElse {
-      val newMetadata = new StreamExecutionMetadata(UUID.randomUUID.toString)
-      StreamExecutionMetadata.write(newMetadata, metadataPath, hadoopConf)
+    StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
+      val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
+      StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
       newMetadata
     }
   }
 
   /** Metadata associated with the offset seq of a batch in the query. */
   protected var offsetSeqMetadata = OffsetSeqMetadata()
 
-  override val id: UUID = UUID.fromString(queryMetadata.id)
+  override val id: UUID = UUID.fromString(streamMetadata.id)
 
   override val runId: UUID = UUID.randomUUID
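The hunk above is the core of the change: the query's stable id is read from (or first written to) a metadata file under the checkpoint directory, while runId is minted fresh on every start. A minimal standalone sketch of that read-or-create pattern, assuming only the StreamMetadata.read/write signatures visible in this diff (the checkpoint path handling is illustrative):

    import java.util.UUID
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Reuse the persisted id if a previous run wrote one; otherwise mint a
    // new id and persist it so every future restart resolves the same value.
    def loadOrCreateId(checkpointDir: String, hadoopConf: Configuration): UUID = {
      val metadataPath = new Path(checkpointDir, "metadata")
      val metadata = StreamMetadata.read(metadataPath, hadoopConf).getOrElse {
        val newMetadata = new StreamMetadata(UUID.randomUUID.toString)
        StreamMetadata.write(newMetadata, metadataPath, hadoopConf)
        newMetadata
      }
      UUID.fromString(metadata.id) // stable across restarts, unlike runId
    }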
@@ -637,22 +637,22 @@ class StreamExecution(
  *
  * @param id unique id of the [[StreamingQuery]] that needs to be persisted across restarts
  */
-case class StreamExecutionMetadata(id: String) {
-  def json: String = Serialization.write(this)(StreamExecutionMetadata.format)
+case class StreamMetadata(id: String) {
Contributor: Can we move these to their own file?
Contributor Author: Sure.

+  def json: String = Serialization.write(this)(StreamMetadata.format)
 }
 
-object StreamExecutionMetadata extends Logging {
+object StreamMetadata extends Logging {
   implicit val format = Serialization.formats(NoTypeHints)
 
   /** Read the metadata from file if it exists */
-  def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamExecutionMetadata] = {
+  def read(metadataFile: Path, hadoopConf: Configuration): Option[StreamMetadata] = {
     val fs = FileSystem.get(hadoopConf)
     if (fs.exists(metadataFile)) {
       var input: FSDataInputStream = null
       try {
         input = fs.open(metadataFile)
         val reader = new InputStreamReader(input, StandardCharsets.UTF_8)
-        val metadata = Serialization.read[StreamExecutionMetadata](reader)
+        val metadata = Serialization.read[StreamMetadata](reader)
         Some(metadata)
       } catch {
         case NonFatal(e) =>
@@ -666,7 +666,7 @@ object StreamExecutionMetadata extends Logging {
 
   /** Write metadata to file, overwrite if it exists */
   def write(
-      metadata: StreamExecutionMetadata,
+      metadata: StreamMetadata,
       metadataFile: Path,
       hadoopConf: Configuration): Unit = {
     var output: FSDataOutputStream = null
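The body of write is collapsed above. A hypothetical sketch of what such a helper typically looks like, using json4s Serialization and an overwriting FileSystem.create; this is an illustration under stated assumptions, not the patch's actual collapsed code:

    import java.io.OutputStreamWriter
    import java.nio.charset.StandardCharsets
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem, Path}
    import org.json4s.NoTypeHints
    import org.json4s.jackson.Serialization

    // Hypothetical stand-in for the collapsed method body: serialize the
    // case class as JSON, overwrite any existing file, and always close
    // the stream.
    def writeSketch(
        metadata: StreamMetadata,
        metadataFile: Path,
        hadoopConf: Configuration): Unit = {
      implicit val format = Serialization.formats(NoTypeHints)
      val fs = FileSystem.get(hadoopConf)
      var output: FSDataOutputStream = null
      try {
        output = fs.create(metadataFile, true) // overwrite = true
        val writer = new OutputStreamWriter(output, StandardCharsets.UTF_8)
        Serialization.write(metadata, writer)
        writer.flush()
      } finally {
        if (output != null) output.close()
      }
    }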
@@ -25,15 +25,15 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.streaming.StreamTest
 
-class StreamExecutionMetadataSuite extends StreamTest {
+class StreamMetadataSuite extends StreamTest {
 
   test("writing and reading") {
     withTempDir { dir =>
       val id = UUID.randomUUID.toString
-      val metadata = StreamExecutionMetadata(id)
+      val metadata = StreamMetadata(id)
       val file = new Path(new File(dir, "test").toString)
-      StreamExecutionMetadata.write(metadata, file, hadoopConf)
-      val readMetadata = StreamExecutionMetadata.read(file, hadoopConf)
+      StreamMetadata.write(metadata, file, hadoopConf)
+      val readMetadata = StreamMetadata.read(file, hadoopConf)
       assert(readMetadata.nonEmpty)
       assert(readMetadata.get.id === id)
     }
@@ -43,12 +43,12 @@ class StreamExecutionMetadataSuite extends StreamTest {
     // query-metadata-logs-version-2.1.0.txt has the execution metadata generated by Spark 2.1.0
     assert(
       readForResource("query-metadata-logs-version-2.1.0.txt") ===
-        StreamExecutionMetadata("d366a8bf-db79-42ca-b5a4-d9ca0a11d63e"))
+        StreamMetadata("d366a8bf-db79-42ca-b5a4-d9ca0a11d63e"))
   }
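The compatibility check above pins the on-disk format: a metadata file written by Spark 2.1.0 must keep parsing after the rename. Given how json4s serializes a one-field case class, the checked-in resource presumably holds a single JSON line of the following form (an inference from the serializer, not a quote of the actual file):

    {"id":"d366a8bf-db79-42ca-b5a4-d9ca0a11d63e"}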

-  private def readForResource(fileName: String): StreamExecutionMetadata = {
+  private def readForResource(fileName: String): StreamMetadata = {
Contributor Author: note to self: readFromResource

     val input = getClass.getResource(s"/structured-streaming/$fileName")
-    StreamExecutionMetadata.read(new Path(input.toString), hadoopConf).get
+    StreamMetadata.read(new Path(input.toString), hadoopConf).get
   }

   private val hadoopConf = new Configuration()
@@ -176,7 +176,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   test("QueryProgressEvent serialization") {
     val event = new StreamingQueryListener.QueryProgressEvent(
-      StreamingQueryStatusAndProgressSuite.testProgress)
+      StreamingQueryStatusAndProgressSuite.testProgress2)
     val json = JsonProtocol.sparkEventToJson(event)
     val newEvent = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryProgressEvent]
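Switching the fixture from testProgress to testProgress2 makes this round trip exercise the harder serialization case: the new fixture (defined in StreamingQueryStatusAndProgressSuite below) has a null name and non-finite rate values.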
@@ -31,12 +31,12 @@ import org.apache.spark.sql.streaming.StreamingQueryStatusAndProgressSuite._
 class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
 
   test("StreamingQueryProgress - prettyJson") {
-    val json = testProgress.prettyJson
-    assert(json ===
+    val json1 = testProgress1.prettyJson
+    assert(json1 ===
       s"""
         |{
-        |  "id" : "${testProgress.id.toString}",
-        |  "runId" : "${testProgress.runId.toString}",
+        |  "id" : "${testProgress1.id.toString}",
+        |  "runId" : "${testProgress1.runId.toString}",
         |  "name" : "myName",
         |  "timestamp" : 1,
         |  "numInputRows" : 678,
@@ -61,16 +61,48 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
         |  }
         |}
       """.stripMargin.trim)
-    assert(compact(parse(json)) === testProgress.json)
+    assert(compact(parse(json1)) === testProgress1.json)
+
+    val json2 = testProgress2.prettyJson
+    assert(
+      json2 ===
+        s"""
+          |{
+          |  "id" : "${testProgress2.id.toString}",
+          |  "runId" : "${testProgress2.runId.toString}",
+          |  "name" : null,
+          |  "timestamp" : 1,
+          |  "numInputRows" : 678,
+          |  "durationMs" : {
+          |    "total" : 0
+          |  },
+          |  "currentWatermark" : 3,
+          |  "stateOperators" : [ {
+          |    "numRowsTotal" : 0,
+          |    "numRowsUpdated" : 1
+          |  } ],
+          |  "sources" : [ {
+          |    "description" : "source",
+          |    "startOffset" : 123,
+          |    "endOffset" : 456,
+          |    "numInputRows" : 678
+          |  } ],
+          |  "sink" : {
+          |    "description" : "sink"
+          |  }
+          |}
+        """.stripMargin.trim)
+    assert(compact(parse(json2)) === testProgress2.json)
   }
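Note what the expected JSON for testProgress2 leaves out: the source entry has no "inputRowsPerSecond" or "processedRowsPerSecond", because JSON has no literal for NaN or Infinity, so a serializer must drop (or stringify) non-finite doubles. A hypothetical json4s-style helper illustrating the idea, an assumption about the approach rather than code from this patch:

    import org.json4s.{JDouble, JNothing, JValue}

    // Non-finite doubles map to JNothing; json4s omits JNothing fields when
    // rendering a JObject, so the key never reaches the output.
    def safeDoubleToJValue(value: Double): JValue =
      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)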

test("StreamingQueryProgress - json") {
assert(compact(parse(testProgress.json)) === testProgress.json)
assert(compact(parse(testProgress1.json)) === testProgress1.json)
assert(compact(parse(testProgress2.json)) === testProgress2.json)
}

test("StreamingQueryProgress - toString") {
assert(testProgress.toString === testProgress.prettyJson)
assert(testProgress1.toString === testProgress1.prettyJson)
assert(testProgress2.toString === testProgress2.prettyJson)
}

test("StreamingQueryStatus - prettyJson") {
Expand All @@ -95,7 +127,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
}

object StreamingQueryStatusAndProgressSuite {
val testProgress = new StreamingQueryProgress(
val testProgress1 = new StreamingQueryProgress(
id = UUID.randomUUID,
runId = UUID.randomUUID,
name = "myName",
Expand All @@ -117,6 +149,28 @@ object StreamingQueryStatusAndProgressSuite {
     sink = new SinkProgress("sink")
   )
 
+  val testProgress2 = new StreamingQueryProgress(
+    id = UUID.randomUUID,
+    runId = UUID.randomUUID,
+    name = null, // should not be present in the json
+    timestamp = 1L,
+    batchId = 2L,
+    durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
+    currentWatermark = 3L,
+    stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
+    sources = Array(
+      new SourceProgress(
+        description = "source",
+        startOffset = "123",
+        endOffset = "456",
+        numInputRows = 678,
+        inputRowsPerSecond = Double.NaN, // should not be present in the json
+        processedRowsPerSecond = Double.NegativeInfinity // should not be present in the json
+      )
+    ),
+    sink = new SinkProgress("sink")
+  )

   val testStatus = new StreamingQueryStatus("active", true, false)
 }
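One small detail in the new fixture: the durationMs literal goes through mapValues(long2Long).asJava, presumably because StreamingQueryProgress exposes durationMs as a java.util.Map with java.lang.Long values. long2Long (from scala.Predef) boxes each scala.Long, and asJava then wraps the map for Java consumers.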