Changes from commit 573b657bca5a77297cafbde489ba380b3372c81c ("progress"), committed by brkyvz on Oct 29, 2015.
@@ -88,8 +88,10 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     if (eventLoop == null) return // scheduler has already been stopped
     logDebug("Stopping JobScheduler")
 
-    // First, stop receiving
-    receiverTracker.stop(processAllReceivedData)
+    if (receiverTracker != null) {
+      // First, stop receiving
+      receiverTracker.stop(processAllReceivedData)
brkyvz (Contributor, author) commented:
NPE thrown when streaming context is stopped before recovery is complete
+    }
 
     // Second, stop generating jobs. If it has to process all received data,
     // then this will wait for all the processing through JobScheduler to be over.
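A hedged illustration of the scenario described in the comment above; the names checkpointDir and createContext are placeholders and the exact call sequence is an assumption, not something taken from the PR.

import org.apache.spark.streaming.StreamingContext

// Driver recovering from a checkpoint: replaying a large write-ahead log can take
// minutes. If the context is stopped before that recovery finishes, JobScheduler.stop()
// previously dereferenced a still-null receiverTracker; the added null check above makes
// that stop a no-op instead of throwing a NullPointerException.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
// ... a stop issued while recovery is still in progress (e.g. from a shutdown hook):
ssc.stop(stopSparkContext = true, stopGracefully = false)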
@@ -126,11 +126,11 @@ private[streaming] class FileBasedWriteAheadLog(
     val logFilesToRead = pastLogs.map{ _.path} ++ currentLogPath
     logInfo("Reading from the logs: " + logFilesToRead.mkString("\n"))
 
-    logFilesToRead.iterator.map { file =>
+    logFilesToRead.par.map { file =>
harishreedharan (Contributor) commented:
This is an expensive operation - you'd end up running an O(n) operation to create a copy (in addition to the copy cost). Do we really need this? I am not entirely sure the copying adds a whole lot of value, considering that this array is not going to be very huge. Also note the additional cost to spin up threads (if the context does not already have them spun up).

brkyvz (Contributor, author) replied:
@harishreedharan I did several benchmarks with this. In a setting where you would have a 30 minute window operation, which corresponds to ~9000 files on S3 with closeFileAfterWrite = true, the current code takes about 15 minutes to recover. With this very simple addition, this time is reduced to 1.5 minutes (on a driver with 4 CPUs, probably with hyper-threading).

Contributor commented:
Which threadpool / execution context is this par operation going to? You must not use the default system execution context.

       logDebug(s"Creating log reader with $file")
       val reader = new FileBasedWriteAheadLogReader(file, hadoopConf)
       CompletionIterator[ByteBuffer, Iterator[ByteBuffer]](reader, reader.close _)
-    }.flatten.asJava
+    }.flatten.toIterator.asJava
   }
 
   /**
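Regarding the reviewer's question above about which execution context .par uses: by default, Scala parallel collections run on a shared global fork-join pool. A hedged sketch of how a dedicated, bounded pool could be supplied instead, assuming a Scala 2.11-era API; the pool size of 4 and all names are illustrative, and this is not necessarily what the PR ends up doing.

import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool  // java.util.concurrent.ForkJoinPool on newer Scala versions

object ParWithOwnPool {
  def main(args: Array[String]): Unit = {
    val files = (1 to 16).map(i => s"log-$i")  // stand-ins for WAL file paths

    val parFiles = files.par
    // Route the parallel operations to a dedicated 4-thread pool rather than the
    // shared global pool, addressing the "which execution context" concern.
    parFiles.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))

    val lengths = parFiles.map { f =>
      // per-file work would go here (e.g. reading one WAL segment)
      f.length
    }
    println(lengths.sum)
  }
}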
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.streaming.util
 
-import java.io.{Closeable, EOFException}
+import java.io.{IOException, Closeable, EOFException}
 import java.nio.ByteBuffer
 
 import org.apache.hadoop.conf.Configuration
@@ -55,6 +55,18 @@ private[streaming] class FileBasedWriteAheadLogReader(path: String, conf: Config
logDebug("Error reading next item, EOF reached", e)
close()
false
case e: IOException =>
logWarning("Error while trying to read data. If the file was deleted, " +
"this should be okay.", e)
if (HdfsUtils.checkFileExists(path, conf)) {
throw e
} else {
// file was deleted. This can occur when the daemon cleanup thread takes time to
Contributor commented:
super nit: file -> File; since this comment actually has multiple sentences, it's weird to have the first one start with lower case.

+            // delete the file during recovery.
+            close()
+            false
+          }
+
         case e: Exception =>
           logWarning("Error while trying to read data from HDFS.", e)
           close()
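A hedged sketch of the behavior this change gives the reader; segmentPath and hadoopConf are placeholder values, and since the class is private[streaming] such code would have to live inside the org.apache.spark.streaming package tree. With the new branch, a file that was already cleaned up makes hasNext log a warning and return false instead of propagating the IOException.

import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.util.FileBasedWriteAheadLogReader

val hadoopConf = new Configuration()
val segmentPath = "hdfs:///tmp/streaming/wal/log-42"  // placeholder path
val reader = new FileBasedWriteAheadLogReader(segmentPath, hadoopConf)
while (reader.hasNext) {  // returns false if the file was deleted mid-recovery
  val record: ByteBuffer = reader.next()
  // replay `record` ...
}
reader.close()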
@@ -71,4 +71,9 @@ private[streaming] object HdfsUtils {
       case _ => fs
     }
   }
+
+  def checkFileExists(path: Path, conf: Configuration): Boolean = {
+    val fs = getFileSystemForPath(path, conf)
+    fs.isFile(path)
+  }
 }