-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-10342] [SPARK-10309] [SPARK-10474] [SPARK-10929] [SQL] Cooperative memory management #9241
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
d0ada7b
ee6b9a4
49b8135
7087f2f
ce24f03
8470fc9
827d4f0
a3e01d0
7bf76e5
51278f8
27ff4fc
4491013
c044afe
afc8c7c
cda4b2a
4ee1f42
e943e74
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.memory; | ||
|
|
||
|
|
||
| import java.io.IOException; | ||
|
|
||
| import org.apache.spark.unsafe.memory.MemoryBlock; | ||
|
|
||
|
|
||
| /** | ||
| * A memory consumer of TaskMemoryManager, which supports spilling. | ||
| */ | ||
| public class MemoryConsumer { | ||
|
|
||
| private TaskMemoryManager memoryManager; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mind renaming this variable to |
||
| private long pageSize; | ||
|
|
||
| protected MemoryConsumer(TaskMemoryManager memoryManager, long pageSize) { | ||
| this.memoryManager = memoryManager; | ||
| this.pageSize = pageSize; | ||
| } | ||
|
|
||
| protected MemoryConsumer(TaskMemoryManager memoryManager) { | ||
| this(memoryManager, memoryManager.pageSizeBytes()); | ||
| } | ||
|
|
||
| /** | ||
| * Spill some data to disk to release memory, which will be called by TaskMemoryManager | ||
| * when there is not enough memory for the task. | ||
| * | ||
| * @param size the amount of memory that should be released, in bytes | ||
| * @return the amount of released memory in bytes | ||
| * @throws IOException | ||
| */ | ||
| public long spill(long size) throws IOException { | ||
| return 0L; | ||
| } | ||
|
|
||
| /** | ||
| * Acquire `size` bytes of execution memory on behalf of this consumer. | ||
| * | ||
| * If fewer than `size` bytes are granted, the partial grant is handed back to the | ||
| * TaskMemoryManager before the IOException is thrown, so a failed call holds no memory. | ||
| * | ||
| * @param size the number of bytes to acquire | ||
| * @throws IOException if fewer than `size` bytes could be acquired | ||
| */ | ||
| protected void acquireMemory(long size) throws IOException { | ||
| long got = memoryManager.acquireExecutionMemory(size, this); | ||
| if (got < size) { | ||
| // Return the partial grant; the caller only sees the exception and cannot free it. | ||
| if (got > 0) { | ||
| memoryManager.releaseExecutionMemory(got, this); | ||
| } | ||
| throw new IOException("Could not acquire " + size + " bytes of memory, got " + got); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Release `size` bytes of memory. | ||
| */ | ||
| protected void releaseMemory(long size) { | ||
| memoryManager.releaseExecutionMemory(size, this); | ||
| } | ||
|
|
||
| /** | ||
| * Allocate a memory block with at least `required` bytes. | ||
| * | ||
| * Throws IOException if there is not enough memory. | ||
| * | ||
| * @throws IOException | ||
| */ | ||
| protected MemoryBlock allocatePage(long required) throws IOException { | ||
| MemoryBlock page = memoryManager.allocatePage(Math.max(pageSize, required), this); | ||
| if (page == null || page.size() < required) { | ||
| if (page != null) { | ||
| freePage(page); | ||
| } | ||
| throw new IOException("Unable to acquire " + required + " bytes of memory"); | ||
| } | ||
| return page; | ||
| } | ||
|
|
||
| /** | ||
| * Free a memory block. | ||
| */ | ||
| protected void freePage(MemoryBlock page) { | ||
| memoryManager.freePage(page, this); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,9 @@ | |
|
|
||
| package org.apache.spark.memory; | ||
|
|
||
| import java.util.*; | ||
| import java.io.IOException; | ||
| import java.util.BitSet; | ||
| import java.util.HashMap; | ||
|
|
||
| import com.google.common.annotations.VisibleForTesting; | ||
| import org.slf4j.Logger; | ||
|
|
@@ -100,13 +102,19 @@ public class TaskMemoryManager { | |
| */ | ||
| private final boolean inHeap; | ||
|
|
||
| /** | ||
| * The size of memory granted to each consumer. | ||
| */ | ||
| private HashMap<MemoryConsumer, Long> consumers; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could be final. |
||
|
|
||
| /** | ||
| * Construct a new TaskMemoryManager. | ||
| */ | ||
| public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) { | ||
| this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap(); | ||
| this.memoryManager = memoryManager; | ||
| this.taskAttemptId = taskAttemptId; | ||
| this.consumers = new HashMap<>(); | ||
| } | ||
|
|
||
| /** | ||
|
|
@@ -117,13 +125,75 @@ public long acquireExecutionMemory(long size) { | |
| return memoryManager.acquireExecutionMemory(size, taskAttemptId); | ||
| } | ||
|
|
||
| /** | ||
| * Acquire up to `size` bytes of execution memory for `consumer`. If the first request is not | ||
| * fully granted and `consumer` is non-null, `consumer` is asked to spill first and then the | ||
| * other tracked consumers are asked in turn; the request is retried after each step. | ||
| * | ||
| * @param size number of bytes requested | ||
| * @param consumer the consumer the grant is recorded against; may be null, in which case | ||
| * nothing is spilled and the grant is not tracked per consumer | ||
| * @return number of bytes successfully granted (<= size). | ||
| * @throws IOException if a consumer's spill() fails | ||
| */ | ||
| public long acquireExecutionMemory(long size, MemoryConsumer consumer) throws IOException { | ||
| synchronized (this) { | ||
| long got = acquireExecutionMemory(size); | ||
| if (got < size && consumer != null) { | ||
| // call spill() on itself to release some memory | ||
| consumer.spill(size - got); | ||
| got += acquireExecutionMemory(size - got); | ||
| if (got < size) { | ||
| long needed = size - got; | ||
| // Iterate over a snapshot: spill() may release memory through this manager, which | ||
| // removes entries from `consumers` and would invalidate a live keySet() iterator. | ||
| MemoryConsumer[] candidates = consumers.keySet().toArray(new MemoryConsumer[0]); | ||
| for (MemoryConsumer c : candidates) { | ||
| if (c == consumer) { | ||
| continue; | ||
| } | ||
| // Ask only for what is still missing, and stop as soon as the deficit is covered. | ||
| needed -= c.spill(needed); | ||
| if (needed <= 0) { | ||
| break; | ||
| } | ||
| } | ||
| got += acquireExecutionMemory(size - got); | ||
| } | ||
| } | ||
| // Only track real consumers; a null key would later be dereferenced by the spill loop. | ||
| if (consumer != null) { | ||
| Long old = consumers.get(consumer); | ||
| consumers.put(consumer, old == null ? got : old + got); | ||
| } | ||
| return got; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Release N bytes of execution memory. | ||
| */ | ||
| public void releaseExecutionMemory(long size) { | ||
| memoryManager.releaseExecutionMemory(size, taskAttemptId); | ||
| } | ||
|
|
||
| /** | ||
| * Release N bytes of execution memory for a MemoryConsumer. | ||
| */ | ||
| public void releaseExecutionMemory(long size, MemoryConsumer consumer) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add an assert to make sure
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
| synchronized (this) { | ||
| if (consumer != null && consumers.containsKey(consumer)) { | ||
| long old = consumers.get(consumer); | ||
| if (old > size) { | ||
| consumers.put(consumer, old - size); | ||
| } else { | ||
| if (old < size) { | ||
| // TODO | ||
| } | ||
| consumers.remove(consumer); | ||
| } | ||
| } else { | ||
| // TODO | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the |
||
| memoryManager.releaseExecutionMemory(size, taskAttemptId); | ||
| } | ||
| } | ||
|
|
||
| public long pageSizeBytes() { | ||
| return memoryManager.pageSizeBytes(); | ||
| } | ||
|
|
@@ -134,12 +204,27 @@ public long pageSizeBytes() { | |
| * | ||
| * Returns `null` if there was not enough memory to allocate the page. | ||
| */ | ||
| public MemoryBlock allocatePage(long size) { | ||
| public MemoryBlock allocatePage(long size) throws IOException { | ||
| return allocatePage(size, null); | ||
| } | ||
|
|
||
| /** | ||
| * Allocate a block of memory that will be tracked in the MemoryManager's page table; this is | ||
| * intended for allocating large blocks of Tungsten memory that will be shared between operators. | ||
| * | ||
| * Returns `null` if there was not enough memory to allocate the page. | ||
| */ | ||
| public MemoryBlock allocatePage(long size, MemoryConsumer consumer) throws IOException { | ||
| if (size > MAXIMUM_PAGE_SIZE_BYTES) { | ||
| throw new IllegalArgumentException( | ||
| "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes"); | ||
| } | ||
|
|
||
| long acquired = acquireExecutionMemory(size, consumer); | ||
| if (acquired <= 0) { | ||
| return null; | ||
| } | ||
|
|
||
| final int pageNumber; | ||
| synchronized (this) { | ||
| pageNumber = allocatedPages.nextClearBit(0); | ||
|
|
@@ -149,14 +234,6 @@ public MemoryBlock allocatePage(long size) { | |
| } | ||
| allocatedPages.set(pageNumber); | ||
| } | ||
| final long acquiredExecutionMemory = acquireExecutionMemory(size); | ||
| if (acquiredExecutionMemory != size) { | ||
| releaseExecutionMemory(acquiredExecutionMemory); | ||
| synchronized (this) { | ||
| allocatedPages.clear(pageNumber); | ||
| } | ||
| return null; | ||
| } | ||
| final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(size); | ||
| page.pageNumber = pageNumber; | ||
| pageTable[pageNumber] = page; | ||
|
|
@@ -170,6 +247,13 @@ public MemoryBlock allocatePage(long size) { | |
| * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage(long)}. | ||
| */ | ||
| public void freePage(MemoryBlock page) { | ||
| freePage(page, null); | ||
| } | ||
|
|
||
| /** | ||
| * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage(long)}. | ||
| */ | ||
| public void freePage(MemoryBlock page, MemoryConsumer consumer) { | ||
| assert (page.pageNumber != -1) : | ||
| "Called freePage() on memory that wasn't allocated with allocatePage()"; | ||
| assert(allocatedPages.get(page.pageNumber)); | ||
|
|
@@ -182,7 +266,7 @@ public void freePage(MemoryBlock page) { | |
| } | ||
| long pageSize = page.size(); | ||
| memoryManager.tungstenMemoryAllocator().free(page); | ||
| releaseExecutionMemory(pageSize); | ||
| releaseExecutionMemory(pageSize, consumer); | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the idea that each operator will have its own subclass of MemoryConsumer which implements
spill()?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, we could make it abstract