[SPARK-7076][SPARK-7077][SPARK-7080][SQL] Use managed memory for aggregations #5725
Closed
59 commits
480a74a
Initial import of code from Databricks unsafe utils repo.
JoshRosen ab68e08
Begin merging the UTF8String implementations.
JoshRosen f03e9c1
Play around with Unsafe implementations of more string methods.
JoshRosen 5d55cef
Add skeleton for Row implementation.
JoshRosen 8a8f9df
Add skeleton for GeneratedAggregate integration.
JoshRosen 1ff814d
Add reminder to free memory on iterator completion
JoshRosen 53ba9b7
Start prototyping Java Row -> UnsafeRow converters
JoshRosen fc4c3a8
Sketch how the converters will be used in UnsafeGeneratedAggregate
JoshRosen 1a483c5
First version that passes some aggregation tests:
JoshRosen 079f1bf
Some clarification of the BytesToBytesMap.lookup() / set() contract.
JoshRosen f764d13
Simplify address + length calculation in Location.
JoshRosen c754ae1
Now that the store*() contract has been strengthened, we can remove an…
JoshRosen ae39694
Add finalizer as "cleanup method of last resort"
JoshRosen c7f0b56
Reuse UnsafeRow pointer in UnsafeRowConverter
JoshRosen 62ab054
Optimize for fact that get() is only called on String columns.
JoshRosen c55bf66
Free buffer once iterator has been fully consumed.
JoshRosen 738fa33
Add feature flag to guard UnsafeGeneratedAggregate
JoshRosen c1b3813
Fix bug in UnsafeMemoryAllocator.free():
JoshRosen 7df6008
Optimizations related to zeroing out memory:
JoshRosen 58ac393
Use UNSAFE allocator in GeneratedAggregate (TODO: make this configura…
JoshRosen d2bb986
Update to implement new Row methods added upstream
JoshRosen b3eaccd
Extract aggregation map into its own class.
JoshRosen bade966
Comment update (bumping to refresh GitHub cache...)
JoshRosen d85eeff
Add basic sanity test for UnsafeFixedWidthAggregationMap
JoshRosen 1f4b716
Merge Unsafe code into the regular GeneratedAggregate, guarded by a
JoshRosen 92d5a06
Address a number of minor code review comments.
JoshRosen 628f936
Use ints instead of longs for indexing.
JoshRosen 23a440a
Bump up default hash map size
JoshRosen 765243d
Enable optional performance metrics for hash map.
JoshRosen b26f1d3
Fix bug in murmur hash implementation.
JoshRosen 49aed30
More long -> int conversion.
JoshRosen 29a7575
Remove debug logging
JoshRosen ef6b3d3
Fix a bunch of FindBugs and IntelliJ inspections
JoshRosen 06e929d
More warning cleanup
JoshRosen 854201a
Import and comment cleanup
JoshRosen f3dcbfe
More mod replacement
JoshRosen afe8dca
Some Javadoc cleanup
JoshRosen a95291e
Cleanups to string handling code
JoshRosen 31eaabc
Lots of TODO and doc cleanup.
JoshRosen 6ffdaa1
Null handling improvements in UnsafeRow.
JoshRosen 9c19fc0
Add configuration options for heap vs. offheap
JoshRosen cde4132
Add missing pom.xml
JoshRosen 0925847
Disable MiMa checks for new unsafe module
JoshRosen a8e4a3f
Introduce MemoryManager interface; add to SparkEnv.
JoshRosen b45f070
Don't redundantly store the offset from key to value, since we can co…
JoshRosen 162caf7
Fix test compilation
JoshRosen 3ca84b2
Only zero the used portion of groupingKeyConversionScratchSpace
JoshRosen 529e571
Measure timeSpentResizing in nanoseconds instead of milliseconds.
JoshRosen ce3c565
More comments, formatting, and code cleanup.
JoshRosen 78a5b84
Add logging to MemoryManager
JoshRosen a19e066
Rename unsafe Java test suites to match Scala test naming convention.
JoshRosen de5e001
Fix debug vs. trace in logging message.
JoshRosen 6e4b192
Remove an unused method from ByteArrayMethods.
JoshRosen 70a39e4
Split MemoryManager into ExecutorMemoryManager and TaskMemoryManager:
JoshRosen 50e9671
Throw memory leak warning even in case of error; add warning about co…
JoshRosen 017b2dc
Remove BytesToBytesMap.finalize()
JoshRosen 1bc36cc
Refactor UnsafeRowConverter to avoid unnecessary boxing.
JoshRosen 81f34f8
Follow 'place children last' convention for GeneratedAggregate
JoshRosen eeee512
Add converters for Null, Boolean, Byte, and Short columns.
JoshRosen
Start prototyping Java Row -> UnsafeRow converters
commit 53ba9b79e12a58f5c4ee217e434fbc20195ffc62
168 changes: 168 additions & 0 deletions
...atalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverter.scala
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,168 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.expressions | ||
|
|
||
| import org.apache.spark.sql.types._ | ||
| import org.apache.spark.unsafe.PlatformDependent | ||
| import org.apache.spark.unsafe.array.ByteArrayMethods | ||
|
|
||
| /** Write a column into an UnsafeRow */ | ||
| private abstract class UnsafeColumnWriter[T] { | ||
| /** | ||
| * Write a value into an UnsafeRow. | ||
| * | ||
| * @param value the value to write | ||
| * @param columnNumber what column to write it to | ||
| * @param row a pointer to the unsafe row | ||
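| * @param baseObject the object that backs the row's memory (for example, an on-heap array) | ||
| * @param baseOffset the offset within baseObject at which the row starts | ||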
| * @param appendCursor the offset from the start of the unsafe row to the end of the row; | ||
| * used for calculating where variable-length data should be written | ||
| * @return the number of variable-length bytes written | ||
| */ | ||
| def write( | ||
| value: T, | ||
| columnNumber: Int, | ||
| row: UnsafeRow, | ||
| baseObject: Object, | ||
| baseOffset: Long, | ||
| appendCursor: Int): Int | ||
|
|
||
| /** | ||
| * Return the number of bytes that are needed to write this variable-length value. | ||
| */ | ||
| def getSize(value: T): Int | ||
| } | ||
|
|
||
| private object UnsafeColumnWriter { | ||
| def forType(dataType: DataType): UnsafeColumnWriter[_] = { | ||
| dataType match { | ||
| case IntegerType => IntUnsafeColumnWriter | ||
| case LongType => LongUnsafeColumnWriter | ||
| case StringType => StringUnsafeColumnWriter | ||
| case _ => throw new UnsupportedOperationException() | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private class StringUnsafeColumnWriter private() extends UnsafeColumnWriter[UTF8String] { | ||
| def getSize(value: UTF8String): Int = { | ||
| // 8-byte length prefix plus the string bytes, rounded up to a whole number of words | ||
| val numBytes = value.getBytes.length | ||
| 8 + ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) | ||
| } | ||
|
|
||
| override def write( | ||
| value: UTF8String, | ||
| columnNumber: Int, | ||
| row: UnsafeRow, | ||
| baseObject: Object, | ||
| baseOffset: Long, | ||
| appendCursor: Int): Int = { | ||
| val bytes = value.getBytes | ||
| val numBytes = bytes.length | ||
| // Write the length prefix, then the string bytes, starting at the append cursor. | ||
| PlatformDependent.UNSAFE.putLong(baseObject, baseOffset + appendCursor, numBytes) | ||
| PlatformDependent.copyMemory( | ||
| bytes, | ||
| PlatformDependent.BYTE_ARRAY_OFFSET, | ||
| baseObject, | ||
| baseOffset + appendCursor + 8, | ||
| numBytes | ||
| ) | ||
| // The fixed-width slot for this column stores the offset of the variable-length data. | ||
| row.setLong(columnNumber, appendCursor) | ||
| 8 + ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes) | ||
| } | ||
| } | ||
| private object StringUnsafeColumnWriter extends StringUnsafeColumnWriter | ||
|
|
||
| private abstract class PrimitiveUnsafeColumnWriter[T] extends UnsafeColumnWriter[T] { | ||
| def getSize(value: T): Int = 0 | ||
| } | ||
|
|
||
| private class IntUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter[Int] { | ||
| override def write( | ||
| value: Int, | ||
| columnNumber: Int, | ||
| row: UnsafeRow, | ||
| baseObject: Object, | ||
| baseOffset: Long, | ||
| appendCursor: Int): Int = { | ||
| row.setInt(columnNumber, value) | ||
| 0 | ||
| } | ||
| } | ||
| private object IntUnsafeColumnWriter extends IntUnsafeColumnWriter | ||
|
|
||
| private class LongUnsafeColumnWriter private() extends PrimitiveUnsafeColumnWriter[Long] { | ||
| override def write( | ||
| value: Long, | ||
| columnNumber: Int, | ||
| row: UnsafeRow, | ||
| baseObject: Object, | ||
| baseOffset: Long, | ||
| appendCursor: Int): Int = { | ||
| row.setLong(columnNumber, value) | ||
| 0 | ||
| } | ||
| } | ||
| private object LongUnsafeColumnWriter extends LongUnsafeColumnWriter | ||
|
|
||
|
|
||
| class UnsafeRowConverter(fieldTypes: Array[DataType]) { | ||
|
|
||
| private[this] val writers: Array[UnsafeColumnWriter[Any]] = { | ||
| fieldTypes.map(t => UnsafeColumnWriter.forType(t).asInstanceOf[UnsafeColumnWriter[Any]]) | ||
| } | ||
|
|
||
| def getSizeRequirement(row: Row): Int = { | ||
| var fieldNumber = 0 | ||
| var variableLengthFieldSize: Int = 0 | ||
| while (fieldNumber < writers.length) { | ||
| if (!row.isNullAt(fieldNumber)) { | ||
| variableLengthFieldSize += writers(fieldNumber).getSize(row.get(fieldNumber)) | ||
|
|
||
| } | ||
| fieldNumber += 1 | ||
| } | ||
| (8 * fieldTypes.length) + UnsafeRow.calculateBitSetWidthInBytes(fieldTypes.length) + variableLengthFieldSize | ||
| } | ||
|
|
||
| def writeRow(row: Row, baseObject: Object, baseOffset: Long): Long = { | ||
| val unsafeRow = new UnsafeRow() | ||
| unsafeRow.set(baseObject, baseOffset, writers.length, null) // TODO: schema? | ||
| var fieldNumber = 0 | ||
| var appendCursor: Int = | ||
| (8 * fieldTypes.length) + UnsafeRow.calculateBitSetWidthInBytes(fieldTypes.length) | ||
| while (fieldNumber < writers.length) { | ||
| if (row.isNullAt(fieldNumber)) { | ||
| unsafeRow.setNullAt(fieldNumber) | ||
| // TODO: type-specific null value writing? | ||
| } else { | ||
| appendCursor += writers(fieldNumber).write( | ||
| row.get(fieldNumber), | ||
| fieldNumber, | ||
| unsafeRow, | ||
| baseObject, | ||
| baseOffset, | ||
| appendCursor) | ||
| } | ||
| fieldNumber += 1 | ||
| } | ||
| appendCursor | ||
| } | ||
|
|
||
| } | ||
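The size calculation in `getSizeRequirement` can be sketched without any Spark dependencies. This is a minimal illustration, not the PR's code: `RowLayoutSketch`, `Field`, `FixedWidth`, `Str`, `roundUpToWord`, and `bitSetWidthInBytes` are hypothetical names, and the bitset width formula is an assumption (one bit per field, padded to whole 8-byte words) that agrees with the 8 bytes the test suite expects for three fields.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Illustrative sketch only: every name here is hypothetical, not part of Spark.
object RowLayoutSketch {
  sealed trait Field
  case object FixedWidth extends Field               // stands in for IntegerType, LongType
  final case class Str(value: String) extends Field  // stands in for StringType

  /** Round a byte count up to the next multiple of 8. */
  def roundUpToWord(numBytes: Int): Int = ((numBytes + 7) / 8) * 8

  /** Assumed null bitset width: one bit per field, padded to whole 8-byte words. */
  def bitSetWidthInBytes(numFields: Int): Int = ((numFields + 63) / 64) * 8

  /** Variable-length bytes needed by one field; fixed-width fields need none. */
  def variableLengthSize(field: Field): Int = field match {
    case FixedWidth => 0
    case Str(s)     => 8 + roundUpToWord(s.getBytes(UTF_8).length) // length prefix + payload
  }

  /** Total bytes: null bitset, one 8-byte slot per field, then variable-length data. */
  def sizeRequirement(fields: Seq[Field]): Int =
    bitSetWidthInBytes(fields.length) + 8 * fields.length + fields.map(variableLengthSize).sum
}
```

Under these assumptions, a row of one fixed-width field and the strings "Hello" and "World" needs 8 + 24 + 16 + 16 = 64 bytes, because each 5-byte string occupies an 8-byte length prefix plus one padded 8-byte word.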
67 changes: 67 additions & 0 deletions
...st/src/test/scala/org/apache/spark/sql/catalyst/expressions/UnsafeRowConverterSuite.scala
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,67 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.sql.catalyst.expressions | ||
|
|
||
| import org.apache.spark.sql.types.{StringType, DataType, LongType, IntegerType} | ||
| import org.apache.spark.unsafe.PlatformDependent | ||
| import org.apache.spark.unsafe.array.ByteArrayMethods | ||
| import org.scalatest.{FunSuite, Matchers} | ||
|
|
||
|
|
||
| class UnsafeRowConverterSuite extends FunSuite with Matchers { | ||
|
|
||
| test("basic conversion with only primitive types") { | ||
| val fieldTypes: Array[DataType] = Array(LongType, LongType, IntegerType) | ||
| val row = new SpecificMutableRow(fieldTypes) | ||
| row.setLong(0, 0) | ||
| row.setLong(1, 1) | ||
| row.setInt(2, 2) | ||
| val converter = new UnsafeRowConverter(fieldTypes) | ||
| val sizeRequired: Int = converter.getSizeRequirement(row) | ||
| sizeRequired should be (8 + (3 * 8)) | ||
| val buffer: Array[Long] = new Array[Long](sizeRequired / 8) | ||
| val numBytesWritten = converter.writeRow(row, buffer, PlatformDependent.LONG_ARRAY_OFFSET) | ||
| numBytesWritten should be (sizeRequired) | ||
| val unsafeRow = new UnsafeRow() | ||
| unsafeRow.set(buffer, PlatformDependent.LONG_ARRAY_OFFSET, fieldTypes.length, null) | ||
| unsafeRow.getLong(0) should be (0) | ||
| unsafeRow.getLong(1) should be (1) | ||
| unsafeRow.getInt(2) should be (2) | ||
| } | ||
|
|
||
| test("basic conversion with primitive and string types") { | ||
| val fieldTypes: Array[DataType] = Array(LongType, StringType, StringType) | ||
| val row = new SpecificMutableRow(fieldTypes) | ||
| row.setLong(0, 0) | ||
| row.setString(1, "Hello") | ||
| row.setString(2, "World") | ||
| val converter = new UnsafeRowConverter(fieldTypes) | ||
| val sizeRequired: Int = converter.getSizeRequirement(row) | ||
| sizeRequired should be (8 + (8 * 3) + | ||
| ByteArrayMethods.roundNumberOfBytesToNearestWord("Hello".getBytes.length + 8) + | ||
| ByteArrayMethods.roundNumberOfBytesToNearestWord("World".getBytes.length + 8)) | ||
| val buffer: Array[Long] = new Array[Long](sizeRequired / 8) | ||
| val numBytesWritten = converter.writeRow(row, buffer, PlatformDependent.LONG_ARRAY_OFFSET) | ||
| numBytesWritten should be (sizeRequired) | ||
| val unsafeRow = new UnsafeRow() | ||
| unsafeRow.set(buffer, PlatformDependent.LONG_ARRAY_OFFSET, fieldTypes.length, null) | ||
| unsafeRow.getLong(0) should be (0) | ||
| unsafeRow.getString(1) should be ("Hello") | ||
| unsafeRow.getString(2) should be ("World") | ||
| } | ||
| } |
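The write-then-read round trip that the second test exercises can be approximated with a heap `java.nio.ByteBuffer` in place of `sun.misc.Unsafe`. This is a rough sketch under stated assumptions, not Spark's `UnsafeRow`: `RoundTripSketch` and its methods are hypothetical, the layout assumes a single 8-byte bitset word, nulls are not handled, and the way `getString` follows the stored offset is inferred from `StringUnsafeColumnWriter.write` rather than taken from the real reader.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Illustrative sketch only: a hypothetical stand-in that uses a heap ByteBuffer
// instead of sun.misc.Unsafe. It is not Spark's UnsafeRow and ignores nulls.
object RoundTripSketch {
  private def roundUpToWord(n: Int): Int = ((n + 7) / 8) * 8

  // Assumed layout: one 8-byte null bitset word (enough for up to 64 fields),
  // then one 8-byte slot per field, then the variable-length region.
  private val BitSetBytes = 8
  private def slot(ordinal: Int): Int = BitSetBytes + 8 * ordinal

  /** Writes Long and String values; returns the total number of bytes used. */
  def writeRow(values: Seq[Any], buf: ByteBuffer): Int = {
    var appendCursor = BitSetBytes + 8 * values.length
    values.zipWithIndex.foreach {
      case (v: Long, i) =>
        buf.putLong(slot(i), v)                         // stored inline in the slot
      case (s: String, i) =>
        val bytes = s.getBytes(UTF_8)
        buf.putLong(appendCursor, bytes.length.toLong)  // length prefix
        var j = 0
        while (j < bytes.length) { buf.put(appendCursor + 8 + j, bytes(j)); j += 1 }
        buf.putLong(slot(i), appendCursor.toLong)       // slot holds the data's offset
        appendCursor += 8 + roundUpToWord(bytes.length) // advance past prefix and padding
      case (other, _) =>
        throw new UnsupportedOperationException(s"unsupported value: $other")
    }
    appendCursor
  }

  def getLong(buf: ByteBuffer, ordinal: Int): Long = buf.getLong(slot(ordinal))

  /** Follows the offset in the slot, reads the length prefix, then decodes the bytes. */
  def getString(buf: ByteBuffer, ordinal: Int): String = {
    val offset = buf.getLong(slot(ordinal)).toInt
    val length = buf.getLong(offset).toInt
    val bytes = new Array[Byte](length)
    var j = 0
    while (j < length) { bytes(j) = buf.get(offset + 8 + j); j += 1 }
    new String(bytes, UTF_8)
  }
}
```

For example, `RoundTripSketch.writeRow(Seq[Any](0L, "Hello", "World"), ByteBuffer.allocate(64))` fills the buffer exactly, and `getString(buf, 1)` then reads "Hello" back from the offset stored in the second slot.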
Can you define the return value?
The return value is the number of bytes written, which we use to increment the position cursor. I think that I commented on this in the base UnsafeColumnWriter class.
More specifically, this is the number of variable-length bytes written. Primitive types, like int, don't write to the variable-length portion and thus return 0.
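To make the contract concrete, here is a small trace of how the append cursor would move across a row. It is only a sketch: `WriteContractSketch`, `variableLengthBytesWritten`, and `cursorTrace` are hypothetical names, and no memory is written. Each step just adds what the corresponding `write` call is described as returning.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Illustrative sketch only: hypothetical names, and nothing is written to memory.
object WriteContractSketch {
  /** Mirrors the contract discussed above: the result counts only variable-length bytes. */
  def variableLengthBytesWritten(value: Any): Int = value match {
    case _: Int | _: Long => 0       // fixed-width values live entirely in their 8-byte slot
    case s: String =>
      val numBytes = s.getBytes(UTF_8).length
      8 + ((numBytes + 7) / 8) * 8   // length prefix plus word-aligned payload
    case other =>
      throw new UnsupportedOperationException(s"unsupported value: $other")
  }

  /** Returns the append cursor after each field, given where the fixed-width region ends. */
  def cursorTrace(values: Seq[Any], startCursor: Int): Seq[Int] =
    values.scanLeft(startCursor)((cursor, v) => cursor + variableLengthBytesWritten(v)).tail
}
```

With a starting cursor of 40 (an 8-byte bitset plus four 8-byte slots), `cursorTrace(Seq[Any](0L, "Hello", 7, "World!!!!"), 40)` yields 40, 56, 56, 80: the Long and Int fields leave the cursor unchanged, the 5-byte string advances it by 16, and the 9-byte string advances it by 24.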