Skip to content

Commit 96e95ab

Browse files
committed
More debugging
1 parent 34a52f9 commit 96e95ab

File tree

3 files changed

+9
-4
lines changed

3 files changed

+9
-4
lines changed

streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ class CheckpointWriter(
277277
val bytes = Checkpoint.serialize(checkpoint, conf)
278278
executor.execute(new CheckpointWriteHandler(
279279
checkpoint.checkpointTime, bytes, clearCheckpointDataLater))
280-
logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
280+
logInfo("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
281281
} catch {
282282
case rej: RejectedExecutionException =>
283283
logError("Could not submit checkpoint task to the thread pool executor", rej)

streaming/src/test/resources/log4j.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,5 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{
2525

2626
# Ignore messages below warning level from Jetty, because it's a bit verbose
2727
log4j.logger.org.spark-project.jetty=WARN
28+
log4j.appender.org.apache.spark.streaming=DEBUG
2829

streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,13 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
9696
expectedOutput.take(numBatchesBeforeRestart), stopSparkContextAfterTest)
9797

9898
// Restart and complete the computation from checkpoint file
99-
logInfo(
99+
// scalastyle:off println
100+
print(
100101
"\n-------------------------------------------\n" +
101102
" Restarting stream computation " +
102103
"\n-------------------------------------------\n"
103104
)
105+
// scalastyle:on println
104106
val restartedSsc = new StreamingContext(checkpointDir)
105107
generateAndAssertOutput[V](restartedSsc, batchDuration, checkpointDir, nextNumBatches,
106108
expectedOutput.takeRight(nextNumExpectedOutputs), stopSparkContextAfterTest)
@@ -125,9 +127,11 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
125127
ssc.start()
126128
val numBatches = expectedOutput.size
127129
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
128-
logDebug("Manual clock before advancing = " + clock.getTimeMillis())
130+
// scalastyle:off println
131+
logInfo("Manual clock before advancing = " + clock.getTimeMillis())
129132
clock.advance((batchDuration * numBatches).milliseconds)
130-
logDebug("Manual clock after advancing = " + clock.getTimeMillis())
133+
logInfo("Manual clock after advancing = " + clock.getTimeMillis())
134+
// scalastyle:on println
131135

132136
val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
133137
dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]

0 commit comments

Comments
 (0)