Skip to content

Commit 1be4e8e

Browse files
rxin and aarondav
authored and committed
Shorten NioManagedBuffer and NettyManagedBuffer class names.
1 parent 108c9ed commit 1be4e8e

File tree

7 files changed

+17
-17
lines changed

7 files changed

+17
-17
lines changed

core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ import org.apache.spark.util.{ByteBufferInputStream, Utils}
3636
* should specify how the data is provided:
3737
*
3838
* - [[FileSegmentManagedBuffer]]: data backed by part of a file
39-
* - [[NioByteBufferManagedBuffer]]: data backed by a NIO ByteBuffer
40-
* - [[NettyByteBufManagedBuffer]]: data backed by a Netty ByteBuf
39+
* - [[NioManagedBuffer]]: data backed by a NIO ByteBuffer
40+
* - [[NettyManagedBuffer]]: data backed by a Netty ByteBuf
4141
*
4242
* The concrete buffer implementation might be managed outside the JVM garbage collector.
43-
* For example, in the case of [[NettyByteBufManagedBuffer]], the buffers are reference counted.
43+
* For example, in the case of [[NettyManagedBuffer]], the buffers are reference counted.
4444
* In that case, if the buffer is going to be passed around to a different thread, retain/release
4545
* should be called.
4646
*/
@@ -149,7 +149,7 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt
149149
/**
150150
* A [[ManagedBuffer]] backed by [[java.nio.ByteBuffer]].
151151
*/
152-
final class NioByteBufferManagedBuffer(buf: ByteBuffer) extends ManagedBuffer {
152+
final class NioManagedBuffer(buf: ByteBuffer) extends ManagedBuffer {
153153

154154
override def size: Long = buf.remaining()
155155

@@ -168,7 +168,7 @@ final class NioByteBufferManagedBuffer(buf: ByteBuffer) extends ManagedBuffer {
168168
/**
169169
* A [[ManagedBuffer]] backed by a Netty [[ByteBuf]].
170170
*/
171-
final class NettyByteBufManagedBuffer(buf: ByteBuf) extends ManagedBuffer {
171+
final class NettyManagedBuffer(buf: ByteBuf) extends ManagedBuffer {
172172

173173
override def size: Long = buf.readableBytes()
174174

core/src/main/scala/org/apache/spark/network/netty/protocol.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import io.netty.channel.ChannelHandler.Sharable
2525
import io.netty.handler.codec._
2626

2727
import org.apache.spark.Logging
28-
import org.apache.spark.network.{NettyByteBufManagedBuffer, ManagedBuffer}
28+
import org.apache.spark.network.{NettyManagedBuffer, ManagedBuffer}
2929

3030

3131
/** Messages from the client to the server. */
@@ -141,7 +141,7 @@ final class ClientRequestDecoder extends MessageToMessageDecoder[ByteBuf] {
141141
case 1 => // BlockUploadRequest
142142
val blockId = ProtocolUtils.readBlockId(in)
143143
in.retain() // retain the bytebuf so we don't recycle it immediately.
144-
BlockUploadRequest(blockId, new NettyByteBufManagedBuffer(in))
144+
BlockUploadRequest(blockId, new NettyManagedBuffer(in))
145145
}
146146

147147
assert(decoded.id == msgTypeId)
@@ -218,7 +218,7 @@ final class ServerResponseDecoder extends MessageToMessageDecoder[ByteBuf] {
218218
case 0 => // BlockFetchSuccess
219219
val blockId = ProtocolUtils.readBlockId(in)
220220
in.retain()
221-
new BlockFetchSuccess(blockId, new NettyByteBufManagedBuffer(in))
221+
new BlockFetchSuccess(blockId, new NettyManagedBuffer(in))
222222

223223
case 1 => // BlockFetchFailure
224224
val blockId = ProtocolUtils.readBlockId(in)

core/src/main/scala/org/apache/spark/network/nio/NioBlockTransferService.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
104104
val blockId = blockMessage.getId
105105
val networkSize = blockMessage.getData.limit()
106106
listener.onBlockFetchSuccess(
107-
blockId.toString, new NioByteBufferManagedBuffer(blockMessage.getData))
107+
blockId.toString, new NioManagedBuffer(blockMessage.getData))
108108
}
109109
}
110110
}(cm.futureExecContext)
@@ -189,7 +189,7 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
189189
private def putBlock(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
190190
val startTimeMs = System.currentTimeMillis()
191191
logDebug("PutBlock " + blockId + " started from " + startTimeMs + " with data: " + bytes)
192-
blockDataManager.putBlockData(blockId, new NioByteBufferManagedBuffer(bytes), level)
192+
blockDataManager.putBlockData(blockId, new NioManagedBuffer(bytes), level)
193193
logDebug("PutBlock " + blockId + " used " + Utils.getUsedTimeMs(startTimeMs)
194194
+ " with data size: " + bytes.limit)
195195
}

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ private[spark] class BlockManager(
223223
val blockBytesOpt = doGetLocal(bid, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
224224
if (blockBytesOpt.isDefined) {
225225
val buffer = blockBytesOpt.get
226-
new NioByteBufferManagedBuffer(buffer)
226+
new NioManagedBuffer(buffer)
227227
} else {
228228
throw new BlockNotFoundException(blockId)
229229
}
@@ -868,7 +868,7 @@ private[spark] class BlockManager(
868868
data.rewind()
869869
logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
870870
blockTransferService.uploadBlockSync(
871-
peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
871+
peer.host, peer.port, blockId.toString, new NioManagedBuffer(data), tLevel)
872872
logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
873873
.format((System.currentTimeMillis - onePeerStartTime)))
874874
peersReplicatedTo += peer

core/src/test/scala/org/apache/spark/network/netty/BlockClientHandlerSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class BlockClientHandlerSuite extends FunSuite with PrivateMethodTester {
6464
buf.put(blockData.getBytes)
6565
buf.flip()
6666

67-
channel.writeInbound(BlockFetchSuccess(blockId, new NioByteBufferManagedBuffer(buf)))
67+
channel.writeInbound(BlockFetchSuccess(blockId, new NioManagedBuffer(buf)))
6868

6969
assert(parsedBlockId === blockId)
7070
assert(parsedBlockData === blockData)
@@ -119,7 +119,7 @@ class BlockClientHandlerSuite extends FunSuite with PrivateMethodTester {
119119
assert(sizeOfOutstandingRequests(handler) === 3)
120120

121121
val channel = new EmbeddedChannel(handler)
122-
channel.writeInbound(BlockFetchSuccess("b1", new NettyByteBufManagedBuffer(Unpooled.buffer())))
122+
channel.writeInbound(BlockFetchSuccess("b1", new NettyManagedBuffer(Unpooled.buffer())))
123123
// Need to figure out a way to generate an exception
124124
assert(successCount.get() === 1)
125125
assert(errorCount.get() === 2)

core/src/test/scala/org/apache/spark/network/netty/ServerClientIntegrationSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ class ServerClientIntegrationSuite extends FunSuite with BeforeAndAfterAll {
6565
server = new BlockServer(new NettyConfig(new SparkConf), new BlockDataManager {
6666
override def getBlockData(blockId: String): ManagedBuffer = {
6767
if (blockId == bufferBlockId) {
68-
new NioByteBufferManagedBuffer(buf)
68+
new NioManagedBuffer(buf)
6969
} else if (blockId == fileBlockId) {
7070
new FileSegmentManagedBuffer(testFile, 10, testFile.length - 25)
7171
} else {

core/src/test/scala/org/apache/spark/network/netty/TestManagedBuffer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
2222

2323
import io.netty.buffer.Unpooled
2424

25-
import org.apache.spark.network.{NettyByteBufManagedBuffer, ManagedBuffer}
25+
import org.apache.spark.network.{NettyManagedBuffer, ManagedBuffer}
2626

2727

2828
/**
@@ -36,7 +36,7 @@ class TestManagedBuffer(len: Int) extends ManagedBuffer {
3636

3737
private val byteArray: Array[Byte] = Array.tabulate[Byte](len)(_.toByte)
3838

39-
private val underlying = new NettyByteBufManagedBuffer(Unpooled.wrappedBuffer(byteArray))
39+
private val underlying = new NettyManagedBuffer(Unpooled.wrappedBuffer(byteArray))
4040

4141
override def size: Long = underlying.size
4242

0 commit comments

Comments (0)