Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Modified BasicBlockFetchIterator to fail fast when local fetch error …
…has been occurred
  • Loading branch information
sarutak committed Jul 29, 2014
commit b7b8250aa9c7b637392f9334bd401e467119df99
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,21 @@ object BlockFetcherIterator {
// Get the local blocks while remote blocks are being fetched. Note that it's okay to do
// these all at once because they will just memory-map some files, so they won't consume
// any memory that might exceed our maxBytesInFlight
for (id <- localBlocksToFetch) {
try {
var fetchIndex = 0
try {
for (id <- localBlocksToFetch) {

// getLocalFromDisk never return None but throws BlockException
val iter = getLocalFromDisk(id, serializer).get
// Pass 0 as size since it's not in flight
results.put(new FetchResult(id, 0, () => iter))
fetchIndex += 1
logDebug("Got local block " + id)
} catch {
case e: Exception => {
logError(s"Error occurred while fetch local block $id", e)
}
} catch {
case e: Exception => {
logError(s"Error occurred while fetching local blocks", e)
for (id <- localBlocksToFetch.drop(fetchIndex)) {
results.put(new FetchResult(id, -1, null))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't do drop and such on a ConcurrentQueue, since it might drop stuff other threads were adding. Just do a results.put on the failed block and don't worry about dropping other ones. You can actually move the try/catch into the for loop and add a "return" at the bottom of the catch after adding this failing FetchResult.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your comment, @mateiz .

I wouldn't do drop and such on a ConcurrentQueue, since it might drop stuff other threads were adding. Just do a results.put on the failed block and don't worry about dropping other ones. You can actually move the try/catch into the for loop and add a "return" at the bottom of the catch after adding this failing FetchResult.

But, if it returns from getLocalBlocks immediately rest of FetchResults is not set to results, and we waits on results.take() in next method forever right? results is a instance of LinkedBlockingQueue and take method is blocking method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought next() would return a failure block, and then the caller of BlockFetcherIterator will just stop. Did you see it not doing that? I think all you have to do is put one FetchResult with size = -1 in the queue and return, and everything will be fine.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought wrong. Exactly, in current usage of BlockFetcherIterator, next() is not invoked after FetchFailedException has been thrown.
I wonder it's a little bit problem that we can invoke next() after FetchFailedException even if there are no such usages in current implementation.
I think it's better to prohibit invoking next() after FetchFailedException to clearly express the correct usage of the method.

}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we throw an exception above and then immediately catch it, instead of doing results.put above? Is there any other kind of error that can happen beyond getLocalFromDisk returning None?

Also, the current code seems to forget the exception: it just puts in a failed result. Is this intentional, i.e. will get a FetchFailedException later? It seems we should return from this method ASAP if there's a problem.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, getLocalFromDisk never return None but can throw BlockException. so I think "case None" block above is useless and we should remove the "case None" block rather than doing results.put.

Is there any other kind of error that can happen beyond getLocalFromDisk returning None?

Yes, BlockException is thrown from getLocalFromDisk, and FileNotFoundException from DiskStore#getBytes when it failed to fetch shuffle____ from local disk.

Also, the current code seems to forget the exception: it just puts in a failed result. Is this intentional, i.e. will get a FetchFailedException later?

It's for get FetchFailedException later. If we return from BasicBlockFetchIterator#getLocallocks, we can't know whether rest of blocks can be read successfully or not.

}
Expand Down