Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix bug.
  • Loading branch information
JoshRosen committed Jul 29, 2016
commit c412f991c145cf02affd7c2d38de016a9b7f548d
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.rpc.netty
import java.io.File
import java.util.concurrent.ConcurrentHashMap

import org.apache.spark.internal.Logging
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.server.StreamManager
import org.apache.spark.rpc.RpcEnvFileServer
Expand All @@ -37,7 +38,7 @@ import org.apache.spark.util.Utils
* Only streaming (openStream) is supported.
*/
private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
extends StreamManager with RpcEnvFileServer {
extends StreamManager with RpcEnvFileServer with Logging {

private val files = new ConcurrentHashMap[String, File]()
private val jars = new ConcurrentHashMap[String, File]()
Expand Down Expand Up @@ -66,15 +67,21 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
}

override def addFile(file: File): String = {
require(files.putIfAbsent(file.getName(), file) == null,
s"File ${file.getName()} already registered.")
val oldFile = files.put(file.getName, file)
if (oldFile != file) {
logInfo(
s"File ${file.getName} will now be served from $file (was previously served from $oldFile)")
}
s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
}

override def addJar(file: File): String = {
require(jars.putIfAbsent(file.getName(), file) == null,
s"JAR ${file.getName()} already registered.")
s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(file.getName())}"
override def addJar(jar: File): String = {
val oldJar = jars.put(jar.getName, jar)
if (oldJar != jar) {
logInfo(
s"JAR ${jar.getName} will now be served from $jar (was previously served from $oldJar)")
}
s"${rpcEnv.address.toSparkURL}/jars/${Utils.encodeFileNameToURIRawPath(jar.getName())}"
}

override def addDirectory(baseUri: String, path: File): String = {
Expand Down