Commits
49 commits
165eab1
[SPARK-3453] Refactor Netty module to use BlockTransferService.
rxin Sep 9, 2014
1760d32
Use Epoll.isAvailable in BlockServer as well.
rxin Sep 9, 2014
2b44cf1
Added more documentation.
rxin Sep 9, 2014
064747b
Reference count buffers and clean them up properly.
rxin Sep 10, 2014
b5c8d1f
Fixed ShuffleBlockFetcherIteratorSuite.
rxin Sep 10, 2014
108c9ed
Forgot to add TestSerializer to the commit list.
rxin Sep 10, 2014
1be4e8e
Shorten NioManagedBuffer and NettyManagedBuffer class names.
rxin Sep 10, 2014
cb589ec
Added more test cases covering cleanup when fault happens in ShuffleB…
rxin Sep 11, 2014
5cd33d7
Fixed style violation.
rxin Sep 11, 2014
9e0cb87
Fixed BlockClientHandlerSuite
rxin Sep 11, 2014
d23ed7b
Incorporated feedback from Norman:
rxin Sep 12, 2014
b2f3281
Added connection pooling.
rxin Sep 12, 2014
14323a5
Removed BlockManager.getLocalShuffleFromDisk.
rxin Sep 12, 2014
f0a16e9
Fixed test hanging.
rxin Sep 12, 2014
519d64d
Mark private package visibility and MimaExcludes.
rxin Sep 12, 2014
c066309
Implement java.io.Closeable interface.
rxin Sep 13, 2014
6afc435
Added logging.
rxin Sep 17, 2014
f63fb4c
Add more debug message.
rxin Sep 29, 2014
d68f328
Logging close() in case close() fails.
rxin Sep 29, 2014
1bdd7ee
Fixed tests.
rxin Sep 29, 2014
bec4ea2
Removed OIO and added num threads settings.
rxin Sep 29, 2014
4b18db2
Copy the buffer in fetchBlockSync.
rxin Sep 29, 2014
a0518c7
Implemented block uploads.
rxin Sep 30, 2014
407e59a
Fix style violation.
rxin Sep 30, 2014
f6c220d
Merge with latest master.
rxin Sep 30, 2014
5d98ce3
Flip buffer.
rxin Sep 30, 2014
f7e7568
Fixed spark.shuffle.io.receiveBuffer setting.
rxin Sep 30, 2014
29c6dcf
[SPARK-3453] Netty-based BlockTransferService, extracted from Spark core
aarondav Oct 6, 2014
ae4083a
[SPARK-2805] Upgrade Akka to 2.3.4
avati Oct 10, 2014
020691e
[SPARK-3886] [PySpark] use AutoBatchedSerializer by default
davies Oct 10, 2014
2c5d9dc
HOTFIX: Fix build issue with Akka 2.3.4 upgrade.
pwendell Oct 10, 2014
5b5dbe6
[SPARK-2924] Required by scala 2.11, only one fun/ctor amongst overri…
ScrapCodes Oct 11, 2014
8dc1ded
[SPARK-3867][PySpark] ./python/run-tests failed when it run with Pyth…
cocoatomo Oct 11, 2014
aa58f67
[SPARK-3909][PySpark][Doc] A corrupted format in Sphinx documents and…
cocoatomo Oct 11, 2014
939f276
Attempt to make comm. bidirectional
aarondav Oct 12, 2014
dd420fd
Merge branch 'master' of https://github.com/apache/spark into netty-test
aarondav Oct 17, 2014
7b7a26c
Fix Nio compile issue
aarondav Oct 17, 2014
d236dfd
Remove no-op serializer :)
aarondav Oct 17, 2014
9da0bc1
Add RPC unit tests
aarondav Oct 17, 2014
ccd4959
Don't throw exception if client immediately fails
aarondav Oct 17, 2014
e5675a4
Fail outstanding RPCs as well
aarondav Oct 18, 2014
322dfc1
Address Reynold's comments, including major rename
aarondav Oct 27, 2014
8dfcceb
Merge branch 'master' of https://github.com/apache/spark into netty
aarondav Oct 27, 2014
14e37f7
Address Reynold's comments
aarondav Oct 28, 2014
0c5bca2
Merge branch 'master' of https://github.com/apache/spark into netty
aarondav Oct 28, 2014
2b0d1c0
100ch
aarondav Oct 28, 2014
4a204b8
Fail block fetches if client connection fails
aarondav Oct 29, 2014
d7be11b
Turn netty on by default
aarondav Oct 29, 2014
cadfd28
Turn netty off by default
aarondav Oct 29, 2014
Implemented block uploads.
rxin authored and aarondav committed Oct 10, 2014
commit a0518c766f0f4eba24459ffac61dce789fc14092
@@ -54,9 +54,6 @@ abstract class BlockTransferService extends Closeable {
* Fetch a sequence of blocks from a remote node asynchronously,
* available only after [[init]] is invoked.
*
* Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
* while [[BlockFetchingListener.onBlockFetchFailure]] is called once per failure (not per block).
*
* Note that this API takes a sequence so the implementation can batch requests, and does not
* return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
* the data of a block is fetched, rather than waiting for all blocks to be fetched.
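The retained doc comment above captures the fetch contract: the call takes a sequence of block IDs so implementations can batch requests, and it reports results through a listener instead of returning a future, so each block can be handed back as soon as its data arrives. A minimal caller-side sketch under those assumptions; the block IDs and the already-connected BlockClient (defined later in this diff) are illustrative, not part of the change:

package org.apache.spark.network.netty

import org.apache.spark.network.{BlockFetchingListener, ManagedBuffer}

object FetchExample {
  // Fetch two (illustrative) shuffle blocks through an already-connected BlockClient.
  def fetch(client: BlockClient): Unit = {
    val listener = new BlockFetchingListener {
      // Called once per block as soon as its data is available.
      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit =
        println(s"fetched $blockId")

      // Called once per block that could not be fetched.
      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit =
        println(s"failed to fetch $blockId: ${exception.getMessage}")
    }
    client.fetchBlocks(Seq("shuffle_0_0_0", "shuffle_0_1_0"), listener)
  }
}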
31 changes: 31 additions & 0 deletions core/src/main/scala/org/apache/spark/network/exceptions.scala
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network

class BlockFetchFailureException(blockId: String, errorMsg: String, cause: Throwable)
extends Exception(errorMsg, cause) {

def this(blockId: String, errorMsg: String) = this(blockId, errorMsg, null)
}


class BlockUploadFailureException(blockId: String, cause: Throwable)
extends Exception(s"Failed to fetch block $blockId", cause) {

def this(blockId: String) = this(blockId, null)
}
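The two exception types above let a fetch listener tell a server-reported failure (BlockFetchFailureException, raised by the client handler below) apart from a transport-level error. A hedged sketch of a listener that branches on the cause; the class name and log lines are illustrative only:

import org.apache.spark.network.{BlockFetchFailureException, BlockFetchingListener, ManagedBuffer}

// Illustrative listener that branches on the exception type introduced above.
class LoggingFetchListener extends BlockFetchingListener {
  override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit =
    println(s"fetched $blockId")

  override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit =
    exception match {
      case e: BlockFetchFailureException =>
        println(s"server reported a failure for $blockId: ${e.getMessage}")
      case e =>
        println(s"transport error while fetching $blockId: ${e.getMessage}")
    }
}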
@@ -20,10 +20,13 @@ package org.apache.spark.network.netty
import java.io.Closeable
import java.util.concurrent.TimeoutException

import scala.concurrent.{Future, promise}

import io.netty.channel.{ChannelFuture, ChannelFutureListener}

import org.apache.spark.Logging
import org.apache.spark.network.BlockFetchingListener
import org.apache.spark.network.{ManagedBuffer, BlockFetchingListener}
import org.apache.spark.storage.StorageLevel


/**
@@ -58,19 +61,19 @@ class BlockClient(cf: ChannelFuture, handler: BlockClientHandler) extends Closeable
def fetchBlocks(blockIds: Seq[String], listener: BlockFetchingListener): Unit = {
var startTime: Long = 0
logTrace {
startTime = System.nanoTime()
startTime = System.currentTimeMillis()
s"Sending request $blockIds to $serverAddr"
}

blockIds.foreach { blockId =>
handler.addRequest(blockId, listener)
handler.addFetchRequest(blockId, listener)
}

cf.channel().writeAndFlush(BlockFetchRequest(blockIds)).addListener(new ChannelFutureListener {
override def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) {
logTrace {
val timeTaken = (System.nanoTime() - startTime).toDouble / 1000000
val timeTaken = System.currentTimeMillis() - startTime
s"Sending request $blockIds to $serverAddr took $timeTaken ms"
}
} else {
@@ -79,14 +82,43 @@ class BlockClient(cf: ChannelFuture, handler: BlockClientHandler) extends Closeable
s"Failed to send request $blockIds to $serverAddr: ${future.cause.getMessage}"
logError(errorMsg, future.cause)
blockIds.foreach { blockId =>
handler.removeRequest(blockId)
handler.removeFetchRequest(blockId)
listener.onBlockFetchFailure(blockId, new RuntimeException(errorMsg))
}
}
}
})
}

def uploadBlock(blockId: String, data: ManagedBuffer, storageLevel: StorageLevel): Future[Unit] = {
var startTime: Long = 0
logTrace {
startTime = System.currentTimeMillis()
s"Uploading block ($blockId) to $serverAddr"
}
val f = cf.channel().writeAndFlush(new BlockUploadRequest(blockId, data, storageLevel))

val p = promise[Unit]()
handler.addUploadRequest(blockId, p)
f.addListener(new ChannelFutureListener {
override def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) {
logTrace {
val timeTaken = System.currentTimeMillis() - startTime
s"Uploading block ($blockId) to $serverAddr took $timeTaken ms"
}
} else {
// Log the send failure.
val errorMsg =
s"Failed to upload block $blockId to $serverAddr: ${future.cause.getMessage}"
logError(errorMsg, future.cause)
}
}
})

p.future
}

/** Close the connection. This does NOT block till the connection is closed. */
def close(): Unit = cf.channel().close()
}
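For context, a hedged usage sketch of the uploadBlock method added above: it returns a Future[Unit] that the client handler completes once the server acknowledges the upload, or fails on error. The wrapper object, block ID, storage level, and timeout below are assumptions of the sketch, not part of this change:

package org.apache.spark.network.netty

import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.spark.network.ManagedBuffer
import org.apache.spark.storage.StorageLevel

object UploadExample {
  // Upload one block and wait until the remote server has ACKed it.
  def upload(client: BlockClient, data: ManagedBuffer): Unit = {
    val f = client.uploadBlock("rdd_0_0", data, StorageLevel.MEMORY_AND_DISK)
    Await.result(f, 30.seconds)
  }
}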
@@ -19,10 +19,12 @@ package org.apache.spark.network.netty

import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.Promise

import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}

import org.apache.spark.Logging
import org.apache.spark.network.BlockFetchingListener
import org.apache.spark.network.{BlockFetchFailureException, BlockUploadFailureException, BlockFetchingListener}


/**
@@ -35,43 +37,57 @@ private[netty]
class BlockClientHandler extends SimpleChannelInboundHandler[ServerResponse] with Logging {

/** Tracks the list of outstanding requests and their listeners on success/failure. */
private[this] val outstandingRequests: java.util.Map[String, BlockFetchingListener] =
private[this] val outstandingFetches: java.util.Map[String, BlockFetchingListener] =
new ConcurrentHashMap[String, BlockFetchingListener]

def addRequest(blockId: String, listener: BlockFetchingListener): Unit = {
outstandingRequests.put(blockId, listener)
private[this] val outstandingUploads: java.util.Map[String, Promise[Unit]] =
new ConcurrentHashMap[String, Promise[Unit]]

def addFetchRequest(blockId: String, listener: BlockFetchingListener): Unit = {
outstandingFetches.put(blockId, listener)
}

def removeRequest(blockId: String): Unit = {
outstandingRequests.remove(blockId)
def removeFetchRequest(blockId: String): Unit = {
outstandingFetches.remove(blockId)
}

def addUploadRequest(blockId: String, promise: Promise[Unit]): Unit = {
outstandingUploads.put(blockId, promise)
}

/**
* Fire the failure callback for all outstanding requests. This is called when we have an
* uncaught exception or premature connection termination.
*/
private def failOutstandingRequests(cause: Throwable): Unit = {
val iter = outstandingRequests.entrySet().iterator()
while (iter.hasNext) {
val entry = iter.next()
val iter1 = outstandingFetches.entrySet().iterator()
while (iter1.hasNext) {
val entry = iter1.next()
entry.getValue.onBlockFetchFailure(entry.getKey, cause)
}
// TODO(rxin): Maybe we need to synchronize the access? Otherwise we could clear new requests
// as well. But I guess that is ok given the caller will fail as soon as any requests fail.
outstandingRequests.clear()
outstandingFetches.clear()

val iter2 = outstandingUploads.entrySet().iterator()
while (iter2.hasNext) {
val entry = iter2.next()
entry.getValue.failure(new RuntimeException(s"Failed to upload block ${entry.getKey}"))
}
outstandingUploads.clear()
}

override def channelUnregistered(ctx: ChannelHandlerContext): Unit = {
if (outstandingRequests.size() > 0) {
logError("Still have " + outstandingRequests.size() + " requests outstanding " +
if (outstandingFetches.size() > 0) {
logError("Still have " + outstandingFetches.size() + " requests outstanding " +
s"when connection from ${ctx.channel.remoteAddress} is closed")
failOutstandingRequests(new RuntimeException(
s"Connection from ${ctx.channel.remoteAddress} closed"))
}
}

override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
if (outstandingRequests.size() > 0) {
if (outstandingFetches.size() > 0) {
logError(
s"Exception in connection from ${ctx.channel.remoteAddress}: ${cause.getMessage}", cause)
failOutstandingRequests(cause)
@@ -83,23 +99,39 @@ class BlockClientHandler extends SimpleChannelInboundHandler[ServerResponse] with Logging {
val server = ctx.channel.remoteAddress.toString
response match {
case BlockFetchSuccess(blockId, buf) =>
val listener = outstandingRequests.get(blockId)
val listener = outstandingFetches.get(blockId)
if (listener == null) {
logWarning(s"Got a response for block $blockId from $server but it is not outstanding")
buf.release()
} else {
outstandingRequests.remove(blockId)
outstandingFetches.remove(blockId)
listener.onBlockFetchSuccess(blockId, buf)
buf.release()
}
case BlockFetchFailure(blockId, errorMsg) =>
val listener = outstandingRequests.get(blockId)
val listener = outstandingFetches.get(blockId)
if (listener == null) {
logWarning(
s"Got a response for block $blockId from $server ($errorMsg) but it is not outstanding")
} else {
outstandingRequests.remove(blockId)
listener.onBlockFetchFailure(blockId, new RuntimeException(errorMsg))
outstandingFetches.remove(blockId)
listener.onBlockFetchFailure(blockId, new BlockFetchFailureException(blockId, errorMsg))
}
case BlockUploadSuccess(blockId) =>
val p = outstandingUploads.get(blockId)
if (p == null) {
logWarning(s"Got a response for upload $blockId from $server but it is not outstanding")
} else {
outstandingUploads.remove(blockId)
p.success(())
}
case BlockUploadFailure(blockId, error) =>
val p = outstandingUploads.get(blockId)
if (p == null) {
logWarning(s"Got a response for upload $blockId from $server but it is not outstanding")
} else {
outstandingUploads.remove(blockId)
p.failure(new BlockUploadFailureException(blockId))
}
}
}
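The handler above keys every outstanding fetch and upload by block ID in a concurrent map and completes each entry exactly once, whether the matching response arrives first or the connection dies. A stripped-down, standalone sketch of that bookkeeping pattern; the class and method names are illustrative, not the handler's actual API:

import java.util.concurrent.ConcurrentHashMap

import scala.concurrent.{Future, Promise}

// Illustrative only: the register/complete/fail lifecycle the handler applies to
// outstanding uploads, shown in isolation.
class UploadTracker {
  private val outstanding = new ConcurrentHashMap[String, Promise[Unit]]()

  // Called when a request is sent; the returned future completes on ACK or failure.
  def register(blockId: String): Future[Unit] = {
    val p = Promise[Unit]()
    outstanding.put(blockId, p)
    p.future
  }

  // Called when the server acknowledges the upload.
  def complete(blockId: String): Unit =
    Option(outstanding.remove(blockId)).foreach(_.success(()))

  // Called on an error response or when the connection is lost.
  def fail(blockId: String, cause: Throwable): Unit =
    Option(outstanding.remove(blockId)).foreach(_.failure(cause))
}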
@@ -21,6 +21,7 @@ import io.netty.channel._

import org.apache.spark.Logging
import org.apache.spark.network.{ManagedBuffer, BlockDataManager}
import org.apache.spark.storage.StorageLevel


/**
@@ -39,13 +40,13 @@ private[netty] class BlockServerHandler(dataProvider: BlockDataManager)
override def channelRead0(ctx: ChannelHandlerContext, request: ClientRequest): Unit = {
request match {
case BlockFetchRequest(blockIds) =>
blockIds.foreach(processBlockRequest(ctx, _))
case BlockUploadRequest(blockId, data) =>
// TODO(rxin): handle upload.
blockIds.foreach(processFetchRequest(ctx, _))
case BlockUploadRequest(blockId, data, level) =>
processUploadRequest(ctx, blockId, data, level)
}
} // end of channelRead0

private def processBlockRequest(ctx: ChannelHandlerContext, blockId: String): Unit = {
private def processFetchRequest(ctx: ChannelHandlerContext, blockId: String): Unit = {
// A helper function to send error message back to the client.
def client = ctx.channel.remoteAddress.toString

@@ -90,4 +91,35 @@ private[netty] class BlockServerHandler(dataProvider: BlockDataManager)
}
)
} // end of processFetchRequest

private def processUploadRequest(
ctx: ChannelHandlerContext,
blockId: String,
data: ManagedBuffer,
level: StorageLevel): Unit = {
// A helper function to send error message back to the client.
def client = ctx.channel.remoteAddress.toString

try {
dataProvider.putBlockData(blockId, data, level)
ctx.writeAndFlush(BlockUploadSuccess(blockId)).addListener(new ChannelFutureListener {
override def operationComplete(future: ChannelFuture): Unit = {
if (!future.isSuccess) {
logError(s"Error sending an ACK back to client $client")
}
}
})
} catch {
case e: Throwable =>
logError(s"Error processing uploaded block $blockId", e)
ctx.writeAndFlush(BlockUploadFailure(blockId, e.getMessage)).addListener(
new ChannelFutureListener {
override def operationComplete(future: ChannelFuture): Unit = {
if (!future.isSuccess) {
logError(s"Error sending an ACK back to client $client")
}
}
})
}
} // end of processUploadRequest
}
@@ -63,9 +63,9 @@ final class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService
hostname: String,
port: Int,
blockId: String,
blockData: ManagedBuffer, level: StorageLevel): Future[Unit] = {
// TODO(rxin): Implement uploadBlock.
???
blockData: ManagedBuffer,
level: StorageLevel): Future[Unit] = {
clientFactory.createClient(hostname, port).uploadBlock(blockId, blockData, level)
}

override def hostName: String = {
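At the service level, the change above routes uploadBlock through the pooled client factory, so callers stay on the BlockTransferService API instead of constructing clients directly. A hedged caller sketch; the service instance, target address, block ID, and timeout are assumptions, and the import paths are inferred from this PR's package layout:

import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.spark.network.{BlockTransferService, ManagedBuffer}
import org.apache.spark.storage.StorageLevel

object ServiceUploadExample {
  // Push one block to a remote executor and wait for the remote ACK.
  def push(service: BlockTransferService, data: ManagedBuffer): Unit = {
    val f = service.uploadBlock("remote-host", 12345, "rdd_0_0", data, StorageLevel.MEMORY_ONLY)
    Await.result(f, 30.seconds)
  }
}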