Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
5bb88f5
[SPARK-3453] Refactor Netty module to use BlockTransferService.
rxin Sep 9, 2014
9b3b397
Use Epoll.isAvailable in BlockServer as well.
rxin Sep 9, 2014
dd783ff
Added more documentation.
rxin Sep 9, 2014
b5b380e
Reference count buffers and clean them up properly.
rxin Sep 10, 2014
1474824
Fixed ShuffleBlockFetcherIteratorSuite.
rxin Sep 10, 2014
b404da3
Forgot to add TestSerializer to the commit list.
rxin Sep 10, 2014
fbf882d
Shorten NioManagedBuffer and NettyManagedBuffer class names.
rxin Sep 10, 2014
b32c3fe
Added more test cases covering cleanup when fault happens in ShuffleB…
rxin Sep 11, 2014
d135fa3
Fixed style violation.
rxin Sep 11, 2014
1e0d277
Fixed BlockClientHandlerSuite
rxin Sep 11, 2014
6e84cb2
Merge branch 'master' into netty-blockTransferService
rxin Sep 11, 2014
55266d1
Incorporated feedback from Norman:
rxin Sep 12, 2014
f83611e
Added connection pooling.
rxin Sep 12, 2014
6ddaa5d
Removed BlockManager.getLocalShuffleFromDisk.
rxin Sep 12, 2014
8295561
Fixed test hanging.
rxin Sep 12, 2014
d7d0aac
Mark private package visibility and MimaExcludes.
rxin Sep 12, 2014
29fe0cc
Implement java.io.Closeable interface.
rxin Sep 13, 2014
e92dad7
Merge branch 'master' into netty-blockTransferService
rxin Sep 17, 2014
a79a259
Added logging.
rxin Sep 17, 2014
088ed8a
Fixed error message.
rxin Sep 17, 2014
323dfec
Add more debug message.
rxin Sep 29, 2014
5814292
Logging close() in case close() fails.
rxin Sep 29, 2014
f23e682
Merge branch 'master' into netty-blockTransferService
rxin Sep 29, 2014
ba8c441
Fixed tests.
rxin Sep 29, 2014
ca88068
Merge branch 'buffer-debug' into netty-blockTransferService
rxin Sep 29, 2014
dfc2c34
Removed OIO and added num threads settings.
rxin Sep 29, 2014
3fbfd3f
Merge branch 'master' into netty-blockTransferService
rxin Sep 29, 2014
69f5d0a
Copy the buffer in fetchBlockSync.
rxin Sep 29, 2014
bc9ed22
Implemented block uploads.
rxin Sep 30, 2014
a3a09f6
Fix style violation.
rxin Sep 30, 2014
0140d6e
Merge branch 'master' into netty-blockTransferService
rxin Sep 30, 2014
0dae310
Merge with latest master.
rxin Sep 30, 2014
ad09236
Flip buffer.
rxin Sep 30, 2014
bdab2c7
Fixed spark.shuffle.io.receiveBuffer setting.
rxin Sep 30, 2014
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[SPARK-3453] Refactor Netty module to use BlockTransferService.
Also includes some partial support for uploading blocks.
  • Loading branch information
rxin committed Sep 9, 2014
commit 5bb88f5fb7b02557d2c5438275b49993d0956e80
19 changes: 17 additions & 2 deletions core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

import com.google.common.io.ByteStreams
import io.netty.buffer.{ByteBufInputStream, ByteBuf}
import io.netty.buffer.{Unpooled, ByteBufInputStream, ByteBuf}
import io.netty.channel.DefaultFileRegion

import org.apache.spark.util.ByteBufferInputStream

Expand All @@ -35,7 +36,7 @@ import org.apache.spark.util.ByteBufferInputStream
* - NioByteBufferManagedBuffer: data backed by a NIO ByteBuffer
* - NettyByteBufManagedBuffer: data backed by a Netty ByteBuf
*/
sealed abstract class ManagedBuffer {
abstract class ManagedBuffer {
// Note that all the methods are defined with parenthesis because their implementations can
// have side effects (io operations).

Expand All @@ -54,6 +55,11 @@ sealed abstract class ManagedBuffer {
* it does not go over the limit.
*/
def inputStream(): InputStream

/**
* Convert the buffer into an Netty object, used to write the data out.
*/
private[network] def convertToNetty(): AnyRef
}


Expand All @@ -75,6 +81,11 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
is.skip(offset)
ByteStreams.limit(is, length)
}

private[network] override def convertToNetty(): AnyRef = {
val fileChannel = new FileInputStream(file).getChannel
new DefaultFileRegion(fileChannel, offset, length)
}
}


Expand All @@ -88,6 +99,8 @@ final class NioByteBufferManagedBuffer(buf: ByteBuffer) extends ManagedBuffer {
override def nioByteBuffer() = buf.duplicate()

override def inputStream() = new ByteBufferInputStream(buf)

private[network] override def convertToNetty(): AnyRef = Unpooled.wrappedBuffer(buf)
}


Expand All @@ -102,6 +115,8 @@ final class NettyByteBufManagedBuffer(buf: ByteBuf) extends ManagedBuffer {

override def inputStream() = new ByteBufInputStream(buf)

private[network] override def convertToNetty(): AnyRef = buf

// TODO(rxin): Promote this to top level ManagedBuffer interface and add documentation for it.
def release(): Unit = buf.release()
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,35 @@
* limitations under the License.
*/

package org.apache.spark.network.netty.client
package org.apache.spark.network.netty

import java.util.concurrent.TimeoutException

import io.netty.bootstrap.Bootstrap
import io.netty.buffer.PooledByteBufAllocator
import io.netty.channel.socket.SocketChannel
import io.netty.channel.{ChannelFutureListener, ChannelFuture, ChannelInitializer, ChannelOption}
import io.netty.handler.codec.LengthFieldBasedFrameDecoder
import io.netty.handler.codec.string.StringEncoder
import io.netty.util.CharsetUtil
import io.netty.channel.{ChannelFuture, ChannelFutureListener, ChannelInitializer, ChannelOption}

import org.apache.spark.Logging
import org.apache.spark.network.BlockFetchingListener


/**
* Client for fetching data blocks from [[org.apache.spark.network.netty.server.BlockServer]].
* Use [[BlockFetchingClientFactory]] to instantiate this client.
* Client for [[NettyBlockTransferService]]. Use [[BlockClientFactory]] to
* instantiate this client.
*
* The constructor blocks until a connection is successfully established.
*
* See [[org.apache.spark.network.netty.server.BlockServer]] for client/server protocol.
*
* Concurrency: thread safe and can be called from multiple threads.
*/
@throws[TimeoutException]
private[spark]
class BlockFetchingClient(factory: BlockFetchingClientFactory, hostname: String, port: Int)
private[netty]
class BlockClient(factory: BlockClientFactory, hostname: String, port: Int)
extends Logging {

private val handler = new BlockFetchingClientHandler
private val handler = new BlockClientHandler
private val encoder = new ClientRequestEncoder
private val decoder = new ServerResponseDecoder

/** Netty Bootstrap for creating the TCP connection. */
private val bootstrap: Bootstrap = {
Expand All @@ -61,9 +60,9 @@ class BlockFetchingClient(factory: BlockFetchingClientFactory, hostname: String,
b.handler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
ch.pipeline
.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8))
// maxFrameLength = 2G, lengthFieldOffset = 0, lengthFieldLength = 4
.addLast("framedLengthDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4))
.addLast("clientRequestEncoder", encoder)
.addLast("frameDecoder", ProtocolUtils.createFrameDecoder())
.addLast("serverResponseDecoder", decoder)
.addLast("handler", handler)
}
})
Expand All @@ -86,12 +85,7 @@ class BlockFetchingClient(factory: BlockFetchingClientFactory, hostname: String,
* @param blockIds sequence of block ids to fetch.
* @param listener callback to fire on fetch success / failure.
*/
def fetchBlocks(blockIds: Seq[String], listener: BlockClientListener): Unit = {
// It's best to limit the number of "write" calls since it needs to traverse the whole pipeline.
// It's also best to limit the number of "flush" calls since it requires system calls.
// Let's concatenate the string and then call writeAndFlush once.
// This is also why this implementation might be more efficient than multiple, separate
// fetch block calls.
def fetchBlocks(blockIds: Seq[String], listener: BlockFetchingListener): Unit = {
var startTime: Long = 0
logTrace {
startTime = System.nanoTime
Expand All @@ -102,8 +96,7 @@ class BlockFetchingClient(factory: BlockFetchingClientFactory, hostname: String,
handler.addRequest(blockId, listener)
}

val writeFuture = cf.channel().writeAndFlush(blockIds.mkString("\n") + "\n")
writeFuture.addListener(new ChannelFutureListener {
cf.channel().writeAndFlush(BlockFetchRequest(blockIds)).addListener(new ChannelFutureListener {
override def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) {
logTrace {
Expand All @@ -116,9 +109,9 @@ class BlockFetchingClient(factory: BlockFetchingClientFactory, hostname: String,
s"Failed to send request $blockIds to $hostname:$port: ${future.cause.getMessage}"
logError(errorMsg, future.cause)
blockIds.foreach { blockId =>
listener.onFetchFailure(blockId, errorMsg)
handler.removeRequest(blockId)
}
listener.onBlockFetchFailure(new RuntimeException(errorMsg))
}
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,34 @@
* limitations under the License.
*/

package org.apache.spark.network.netty.client
package org.apache.spark.network.netty

import io.netty.channel.epoll.{EpollEventLoopGroup, EpollSocketChannel}
import io.netty.channel.epoll.{Epoll, EpollEventLoopGroup, EpollSocketChannel}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.oio.OioEventLoopGroup
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.channel.socket.oio.OioSocketChannel
import io.netty.channel.{EventLoopGroup, Channel}
import io.netty.channel.{Channel, EventLoopGroup}

import org.apache.spark.SparkConf
import org.apache.spark.network.netty.NettyConfig
import org.apache.spark.util.Utils


/**
* Factory for creating [[BlockFetchingClient]] by using createClient. This factory reuses
* Factory for creating [[BlockClient]] by using createClient. This factory reuses
* the worker thread pool for Netty.
*
* Concurrency: createClient is safe to be called from multiple threads concurrently.
*/
private[spark]
class BlockFetchingClientFactory(val conf: NettyConfig) {
private[netty]
class BlockClientFactory(val conf: NettyConfig) {

def this(sparkConf: SparkConf) = this(new NettyConfig(sparkConf))

/** A thread factory so the threads are named (for debugging). */
val threadFactory = Utils.namedThreadFactory("spark-shuffle-client")
private[netty] val threadFactory = Utils.namedThreadFactory("spark-shuffle-client")

/** The following two are instantiated by the [[init]] method, depending ioMode. */
var socketChannelClass: Class[_ <: Channel] = _
var workerGroup: EventLoopGroup = _
private[netty] var socketChannelClass: Class[_ <: Channel] = _
private[netty] var workerGroup: EventLoopGroup = _

init()

Expand All @@ -63,20 +61,12 @@ class BlockFetchingClientFactory(val conf: NettyConfig) {
workerGroup = new EpollEventLoopGroup(0, threadFactory)
}

// For auto mode, first try epoll (only available on Linux), then nio.
conf.ioMode match {
case "nio" => initNio()
case "oio" => initOio()
case "epoll" => initEpoll()
case "auto" =>
// For auto mode, first try epoll (only available on Linux), then nio.
try {
initEpoll()
} catch {
// TODO: Should we log the throwable? But that always happen on non-Linux systems.
// Perhaps the right thing to do is to check whether the system is Linux, and then only
// call initEpoll on Linux.
case e: Throwable => initNio()
}
case "auto" => if (Epoll.isAvailable) initEpoll() else initNio()
}
}

Expand All @@ -87,8 +77,8 @@ class BlockFetchingClientFactory(val conf: NettyConfig) {
*
* Concurrency: This method is safe to call from multiple threads.
*/
def createClient(remoteHost: String, remotePort: Int): BlockFetchingClient = {
new BlockFetchingClient(this, remoteHost, remotePort)
def createClient(remoteHost: String, remotePort: Int): BlockClient = {
new BlockClient(this, remoteHost, remotePort)
}

def stop(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.netty

import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}

import org.apache.spark.Logging
import org.apache.spark.network.BlockFetchingListener


/**
* Handler that processes server responses.
*
* Concurrency: thread safe and can be called from multiple threads.
*/
private[netty]
class BlockClientHandler extends SimpleChannelInboundHandler[ServerResponse] with Logging {

/** Tracks the list of outstanding requests and their listeners on success/failure. */
private val outstandingRequests = java.util.Collections.synchronizedMap {
new java.util.HashMap[String, BlockFetchingListener]
}

def addRequest(blockId: String, listener: BlockFetchingListener): Unit = {
outstandingRequests.put(blockId, listener)
}

def removeRequest(blockId: String): Unit = {
outstandingRequests.remove(blockId)
}

override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
val errorMsg = s"Exception in connection from ${ctx.channel.remoteAddress}: ${cause.getMessage}"
logError(errorMsg, cause)

// Fire the failure callback for all outstanding blocks
outstandingRequests.synchronized {
val iter = outstandingRequests.entrySet().iterator()
while (iter.hasNext) {
val entry = iter.next()
entry.getValue.onBlockFetchFailure(cause)
}
outstandingRequests.clear()
}

ctx.close()
}

override def channelRead0(ctx: ChannelHandlerContext, response: ServerResponse) {
val server = ctx.channel.remoteAddress.toString
response match {
case BlockFetchSuccess(blockId, buf) =>
val listener = outstandingRequests.get(blockId)
if (listener == null) {
logWarning(s"Got a response for block $blockId from $server but it is not outstanding")
} else {
outstandingRequests.remove(blockId)
listener.onBlockFetchSuccess(blockId, buf)
}
case BlockFetchFailure(blockId, errorMsg) =>
val listener = outstandingRequests.get(blockId)
if (listener == null) {
logWarning(
s"Got a response for block $blockId from $server ($errorMsg) but it is not outstanding")
} else {
outstandingRequests.remove(blockId)
listener.onBlockFetchFailure(new RuntimeException(errorMsg))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,33 @@
* limitations under the License.
*/

package org.apache.spark.network.netty.server
package org.apache.spark.network.netty

import java.net.InetSocketAddress

import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.PooledByteBufAllocator
import io.netty.channel.{ChannelFuture, ChannelInitializer, ChannelOption}
import io.netty.channel.epoll.{EpollEventLoopGroup, EpollServerSocketChannel}
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.oio.OioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.socket.oio.OioServerSocketChannel
import io.netty.handler.codec.LineBasedFrameDecoder
import io.netty.handler.codec.string.StringDecoder
import io.netty.util.CharsetUtil
import io.netty.channel.{ChannelInitializer, ChannelFuture, ChannelOption}
import io.netty.handler.codec.LengthFieldBasedFrameDecoder

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.network.netty.NettyConfig
import org.apache.spark.storage.BlockDataProvider
import org.apache.spark.network.BlockDataManager
import org.apache.spark.util.Utils


/**
* Server for serving Spark data blocks.
* This should be used together with [[org.apache.spark.network.netty.client.BlockFetchingClient]].
*
* Protocol for requesting blocks (client to server):
* One block id per line, e.g. to request 3 blocks: "block1\nblock2\nblock3\n"
*
* Protocol for sending blocks (server to client):
* frame-length (4 bytes), block-id-length (4 bytes), block-id, block-data.
*
* frame-length should not include the length of itself.
* If block-id-length is negative, then this is an error message rather than block-data. The real
* length is the absolute value of the frame-length.
*
* Server for the [[NettyBlockTransferService]].
*/
private[spark]
class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends Logging {
private[netty]
class BlockServer(conf: NettyConfig, dataProvider: BlockDataManager) extends Logging {

def this(sparkConf: SparkConf, dataProvider: BlockDataProvider) = {
def this(sparkConf: SparkConf, dataProvider: BlockDataManager) = {
this(new NettyConfig(sparkConf), dataProvider)
}

Expand Down Expand Up @@ -129,10 +114,10 @@ class BlockServer(conf: NettyConfig, dataProvider: BlockDataProvider) extends Lo

bootstrap.childHandler(new ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel): Unit = {
ch.pipeline
.addLast("frameDecoder", new LineBasedFrameDecoder(1024)) // max block id length 1024
.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8))
.addLast("blockHeaderEncoder", new BlockHeaderEncoder)
val p = ch.pipeline
.addLast("frameDecoder", ProtocolUtils.createFrameDecoder())
.addLast("clientRequestDecoder", new ClientRequestDecoder)
.addLast("serverResponseEncoder", new ServerResponseEncoder)
.addLast("handler", new BlockServerHandler(dataProvider))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this handler run on separate EventLoopGroup? since GetBlockData might block.

}
})
Expand Down
Loading