Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
more fixups to get things compiling
  • Loading branch information
jsoltren committed Apr 4, 2017
commit 543cd88f18210311691c86db30eb6ad8618386b6
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ private static class ClientPool {
private EventLoopGroup workerGroup;
private PooledByteBufAllocator pooledAllocator;

public PooledByteBufAllocator getPooledAllocator() { return pooledAllocator; }

public TransportClientFactory(
TransportContext context,
List<TransportClientBootstrap> clientBootstraps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class TransportServer implements Closeable {

private ServerBootstrap bootstrap;
private ChannelFuture channelFuture;
private PooledByteBufAllocator allocator;
private int port = -1;

/**
Expand All @@ -78,6 +79,8 @@ public TransportServer(
}
}

public PooledByteBufAllocator getAllocator() { return allocator; }

public int getPort() {
if (port == -1) {
throw new IllegalStateException("Server not initialized");
Expand All @@ -92,7 +95,7 @@ private void init(String hostToBind, int portToBind) {
NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
EventLoopGroup workerGroup = bossGroup;

PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
allocator = NettyUtils.createPooledByteBufAllocator(
conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

bootstrap = new ServerBootstrap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
// totally there are 16 lines, including SparkListenerLogStart event and 15 other events
assert(lines.size === 16)

val listenerBus = new LiveListenerBus
val listenerBus = new LiveListenerBus(sc)
val memoryListener = new MemoryListener
listenerBus.addListener(memoryListener)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.scheduler

import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.AccumulatorV2
Expand Down Expand Up @@ -87,6 +88,7 @@ private class DummyTaskScheduler extends TaskScheduler {
override def applicationAttemptId(): Option[String] = None
def executorHeartbeatReceived(
execId: String,
executorMetrics: ExecutorMetrics,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId): Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.scalatest.concurrent.Timeouts._

import org.apache.spark._
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.executor.{DataReadMethod, ExecutorMetrics}
import org.apache.spark.memory.UnifiedMemoryManager
import org.apache.spark.network.{BlockDataManager, BlockTransferService}
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
Expand Down Expand Up @@ -1257,6 +1257,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
listener.onBlockFetchSuccess("mockBlockId", new NioManagedBuffer(ByteBuffer.allocate(1)))
}

override def getMemMetrics(executorMetrics: ExecutorMetrics): Unit = {}

override def close(): Unit = {}

override def hostName: String = { "MockBlockTransferServiceHost" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,7 @@ public ClientPool(int size) {
private EventLoopGroup workerGroup;
private PooledByteBufAllocator pooledAllocator;

public PooledByteBufAllocator getPooledAllocator() {
return pooledAllocator;
}
public PooledByteBufAllocator getPooledAllocator() { return pooledAllocator; }

public TransportClientFactory(
TransportContext context,
Expand Down