Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
b7c9c23
Move Unsafe mem. mgrs. to spark-core subproject.
JoshRosen Oct 14, 2015
25ba4b5
Merge ExecutorMemoryManager into MemoryManager.
JoshRosen Oct 14, 2015
3d997ce
Naming and formatting fixes.
JoshRosen Oct 16, 2015
d9e6b84
Move Tungsten-related methods to end of MemoryManager file.
JoshRosen Oct 16, 2015
98ef86b
Add taskAttemptId to TaskMemoryManager constructor.
JoshRosen Oct 16, 2015
8f93e94
Move ShuffleMemoryManager into memory package.
JoshRosen Oct 16, 2015
3bbc54d
Merge remote-tracking branch 'origin/master' into SPARK-10984
JoshRosen Oct 16, 2015
88a7970
Fix bug in AbstractBytesToBytesMapSuite.
JoshRosen Oct 16, 2015
ec48ff9
Refactor the existing Tungsten TaskMemoryManager interactions so Tung…
JoshRosen Oct 16, 2015
6f98bc4
Move TaskMemoryManager from unsafe to memory.
JoshRosen Oct 16, 2015
6459397
Further minimization of ShuffleMemoryManager usage.
JoshRosen Oct 16, 2015
60c66b2
Merge ShuffleMemoryManager into MemoryManager.
JoshRosen Oct 17, 2015
7d6a37f
Clean up interaction between TaskMemoryManager and MemoryManager.
JoshRosen Oct 17, 2015
0dc21dc
Merge remote-tracking branch 'origin/master' into SPARK-10984
JoshRosen Oct 22, 2015
f21b767
Fix compilation.
JoshRosen Oct 22, 2015
46ad693
Fix Scalastyle
JoshRosen Oct 22, 2015
c33e330
Fix import ordering in Executor.scala
JoshRosen Oct 22, 2015
ef45d91
Fix import ordering in Task.scala
JoshRosen Oct 22, 2015
c7eac69
Fix import ordering in TaskContextImpl
JoshRosen Oct 22, 2015
d86f435
Fix spillable collection tests
JoshRosen Oct 22, 2015
bba5550
Integrate TaskMemoryManager acquire/releasePage with MemoryManager bo…
JoshRosen Oct 22, 2015
66ae259
Move pooling logic into allocators themselves.
JoshRosen Oct 22, 2015
b1d5151
Scaladoc updates.
JoshRosen Oct 22, 2015
d0c0dd9
Update Spillable to properly integrate with TaskMemoryManager.
JoshRosen Oct 22, 2015
48149fc
Move pageSizeBytes to Tungsten section
JoshRosen Oct 23, 2015
c8ba196
Cleanup after merging of ShuffleMemoryManager into MemoryManager.
JoshRosen Oct 23, 2015
63a6cbc
Rename getMemoryConsumptionForThisTask to getExecutionMemoryUsageForTask
JoshRosen Oct 23, 2015
6ec9c30
Properly thread numCores to memory manager.
JoshRosen Oct 23, 2015
1593fad
Explain why MemoryBlock.pageNumber is public
JoshRosen Oct 23, 2015
64bec0b
Fix TaskMemoryManagerSuite tests.
JoshRosen Oct 23, 2015
f9240e9
Fix compilation
JoshRosen Oct 23, 2015
a95bc08
Fix a memory leak in UnsafeShuffleWriter's sorter
JoshRosen Oct 23, 2015
b3ad761
Remove println
JoshRosen Oct 23, 2015
a7e8320
Fix Scalastyle.
JoshRosen Oct 23, 2015
e874a45
Fix remaining TODOs in UnsafeShuffleWriterSuite.
JoshRosen Oct 23, 2015
2ba6e51
Fix DeveloperAPI change
JoshRosen Oct 23, 2015
0c13723
Address comments in MemoryManager
JoshRosen Oct 23, 2015
04ec429
Release memory acquired after unsuccessful allocatePage() call
JoshRosen Oct 23, 2015
e56d039
Fix EAOM compilation.
JoshRosen Oct 23, 2015
aa14113
Port tests from ShuffleMemoryManagerSuite
JoshRosen Oct 23, 2015
7addf8b
Remove unused non-page-memory allocation methods.
JoshRosen Oct 23, 2015
5af0b17
Update Tungsten tests
JoshRosen Oct 23, 2015
a264703
Fix execution memory leaks in Spillable collections
JoshRosen Oct 24, 2015
f2ab708
Fix NPE in UnsafeRowSerializerSuite
JoshRosen Oct 24, 2015
0b5c72f
Update EAOM tests to reflect fact that iterator() is destructive.
JoshRosen Oct 24, 2015
f68fdb1
Fix streaming test compilation
JoshRosen Oct 26, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Move TaskMemoryManager from unsafe to memory.
  • Loading branch information
JoshRosen committed Oct 16, 2015
commit 6f98bc4fe7e1291fe2fa45e78c4ee257b7d57c2e
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.unsafe.memory;
package org.apache.spark.memory;

import java.util.*;

Expand All @@ -24,8 +24,7 @@
import org.slf4j.LoggerFactory;

import org.apache.spark.SparkEnv$;
import org.apache.spark.memory.MemoryManager;
import org.apache.spark.memory.ShuffleMemoryManager;
import org.apache.spark.unsafe.memory.MemoryBlock;

/**
* Manages the memory allocated by an individual task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.shuffle.unsafe;

import org.apache.spark.memory.TaskMemoryManager;

/**
* Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
* <p>
Expand All @@ -26,7 +28,7 @@
* </pre>
* This implies that the maximum addressable page size is 2^27 bits = 128 megabytes, assuming that
* our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based off the
* 13-bit page numbers assigned by {@link org.apache.spark.unsafe.memory.TaskMemoryManager}), this
* 13-bit page numbers assigned by {@link TaskMemoryManager}), this
* implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
* <p>
* Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.TimeTrackingOutputStream;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;

@Private
public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.MemoryLocation;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;

/**
* An append-only hash map where keys and values are contiguous regions of bytes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.spark.util.collection.unsafe.sort;

import org.apache.spark.memory.TaskMemoryManager;

final class RecordPointerAndKeyPrefix {
/**
* A pointer to a record; see {@link org.apache.spark.unsafe.memory.TaskMemoryManager} for a
* A pointer to a record; see {@link TaskMemoryManager} for a
* description of how these addresses are encoded.
*/
public long recordPointer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.collection.Sorter;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;

/**
* Sorts records using an AlphaSort-style key-prefix sort. This sort stores pointers to records
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import java.io.Serializable

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.source.Source
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.TaskCompletionListener


Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskContextImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@

package org.apache.spark

import org.apache.spark.memory.TaskMemoryManager

import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.executor.TaskMetrics
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.metrics.source.Source
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.{TaskCompletionListener, TaskCompletionListenerException}

private[spark] class TaskContextImpl(
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import java.net.URL
import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

import org.apache.spark.memory.TaskMemoryManager

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal
Expand All @@ -32,7 +34,6 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util._

/**
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ package org.apache.spark.scheduler
import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer

import org.apache.spark.memory.TaskMemoryManager

import scala.collection.mutable.HashMap

import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.{Accumulator, SparkEnv, TaskContextImpl, TaskContext}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.serializer.SerializerInstance
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.util.ByteBufferInputStream
import org.apache.spark.util.Utils

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
* limitations under the License.
*/

package org.apache.spark.unsafe.memory;
package org.apache.spark.memory;

import org.junit.Assert;
import org.junit.Test;

import org.apache.spark.SparkConf;
import org.apache.spark.memory.GrantEverythingMemoryManager;
import org.apache.spark.unsafe.memory.MemoryBlock;

public class TaskMemoryManagerSuite {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.spark.SparkConf;
import org.apache.spark.memory.GrantEverythingMemoryManager;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;
import static org.apache.spark.shuffle.unsafe.PackedRecordPointer.*;

public class PackedRecordPointerSuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.spark.unsafe.Platform;
import org.apache.spark.memory.GrantEverythingMemoryManager;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;

public class UnsafeShuffleInMemorySorterSuite {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.apache.spark.memory.ShuffleMemoryManager;
import org.apache.spark.storage.*;
import org.apache.spark.memory.GrantEverythingMemoryManager;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;

public class UnsafeShuffleWriterSuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.nio.ByteBuffer;
import java.util.*;

import org.apache.spark.memory.TaskMemoryManager;
import org.junit.*;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
import org.apache.spark.memory.ShuffleMemoryManager;
import org.apache.spark.storage.*;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;

public class UnsafeExternalSorterSuite {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.spark.unsafe.Platform;
import org.apache.spark.memory.GrantEverythingMemoryManager;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;

public class UnsafeInMemorySorterSuite {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.unsafe.memory.MemoryLocation;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;

/**
* Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.collection.unsafe.sort.*;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
import java.nio.ByteOrder
import java.util.{HashMap => JavaHashMap}

import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.memory.{TaskMemoryManager, StaticMemoryManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.execution.local.LocalNode
import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetrics}
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.unsafe.memory.{MemoryLocation, TaskMemoryManager}
import org.apache.spark.unsafe.memory.MemoryLocation
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.{SparkConf, SparkEnv}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import scala.util.{Try, Random}
import org.scalatest.Matchers

import org.apache.spark.{SparkConf, TaskContextImpl, TaskContext, SparkFunSuite}
import org.apache.spark.memory.{ShuffleMemoryManager, GrantEverythingMemoryManager, TestShuffleMemoryManager}
import org.apache.spark.memory.{TaskMemoryManager, ShuffleMemoryManager, GrantEverythingMemoryManager, TestShuffleMemoryManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.memory.TaskMemoryManager
import org.apache.spark.unsafe.types.UTF8String

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ package org.apache.spark.sql.execution
import scala.util.Random

import org.apache.spark._
import org.apache.spark.memory.{GrantEverythingMemoryManager, TestShuffleMemoryManager}
import org.apache.spark.memory.{TaskMemoryManager, GrantEverythingMemoryManager, TestShuffleMemoryManager}
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{InterpretedOrdering, UnsafeRow, UnsafeProjection}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.memory.TaskMemoryManager

/**
* Test suite for [[UnsafeKVExternalSorter]], with randomly generated test data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
package org.apache.spark.sql.execution.aggregate

import org.apache.spark._
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.unsafe.memory.TaskMemoryManager

class TungstenAggregationIteratorSuite extends SparkFunSuite with SharedSQLContext {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ public class MemoryBlock extends MemoryLocation {

private final long length;

// TODO(josh)
/**
* Optional page number; used when this MemoryBlock represents a page allocated by a
* MemoryManager. This is package-private and is modified by MemoryManager.
*/
int pageNumber = -1;
public int pageNumber = -1;

public MemoryBlock(@Nullable Object obj, long offset, long length) {
super(obj, offset);
Expand Down