@@ -1444,6 +1444,17 @@ object SQLConf {
.intConf
.createWithDefault(20)

+val FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT =
+buildConf("spark.sql.codegen.aggregate.fastHashMap.capacityBit")
+.internal()
+.doc("Capacity for the max number of rows to be held in memory " +
+"by the fast hash aggregate operator. The value is the power-of-two exponent " +
+"of the capacity, not the row count itself; the actual numBuckets is derived " +
+"from it and the loadFactor (e.g. with the default bit value 16, numBuckets " +
+"is (1 << 16) / 0.5 = 131072).")
+.intConf
+.checkValue(bit => bit >= 10 && bit <= 30, "The bit value must be in [10, 30].")
+.createWithDefault(16)

val AVRO_COMPRESSION_CODEC = buildConf("spark.sql.avro.compression.codec")
.doc("Compression codec used in writing of AVRO files. Supported codecs: " +
"uncompressed, deflate, snappy, bzip2 and xz. Default codec is snappy.")
@@ -1655,6 +1666,8 @@ class SQLConf extends Serializable with Logging {

def topKSortFallbackThreshold: Int = getConf(TOP_K_SORT_FALLBACK_THRESHOLD)

+def fastHashAggregateRowMaxCapacityBit: Int = getConf(FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT)

/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
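A usage sketch, assuming an existing SparkSession named spark (hypothetical session code; the config is internal, so this is mainly for tuning experiments and tests):

import org.apache.spark.sql.functions.col

// Raise the fast hash map capacity to 1 << 20 rows before an aggregation-heavy query.
spark.conf.set("spark.sql.codegen.aggregate.fastHashMap.capacityBit", "20")
spark.range(10000000L).groupBy(col("id") % 100).count().show()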

@@ -579,6 +579,7 @@ case class HashAggregateExec(
case _ =>
}
}
+val bitMaxCapacity = sqlContext.conf.fastHashAggregateRowMaxCapacityBit

val thisPlan = ctx.addReferenceObj("plan", this)

@@ -588,7 +589,7 @@
val fastHashMapClassName = ctx.freshName("FastHashMap")
if (isVectorizedHashMapEnabled) {
val generatedMap = new VectorizedHashMapGenerator(ctx, aggregateExpressions,
-fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
+fastHashMapClassName, groupingKeySchema, bufferSchema, bitMaxCapacity).generate()
ctx.addInnerClass(generatedMap)

// Inline mutable state since not many aggregation operations in a task
@@ -598,7 +599,7 @@
forceInline = true)
} else {
val generatedMap = new RowBasedHashMapGenerator(ctx, aggregateExpressions,
-fastHashMapClassName, groupingKeySchema, bufferSchema).generate()
+fastHashMapClassName, groupingKeySchema, bufferSchema, bitMaxCapacity).generate()
ctx.addInnerClass(generatedMap)

// Inline mutable state since not many aggregation operations in a task

@@ -39,7 +39,8 @@ class RowBasedHashMapGenerator(
aggregateExpressions: Seq[AggregateExpression],
generatedClassName: String,
groupingKeySchema: StructType,
-bufferSchema: StructType)
+bufferSchema: StructType,
+bitMaxCapacity: Int)
extends HashMapGenerator (ctx, aggregateExpressions, generatedClassName,
groupingKeySchema, bufferSchema) {

@@ -73,7 +74,7 @@
s"""
| private org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
| private int[] buckets;
-| private int capacity = 1 << 16;
+| private int capacity = 1 << $bitMaxCapacity;
| private double loadFactor = 0.5;
| private int numBuckets = (int) (capacity / loadFactor);
| private int maxSteps = 2;
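For intuition, numBuckets and maxSteps above bound an open-addressing lookup: a key hashes to one of numBuckets slots and is probed at most maxSteps times before the operator gives up and falls back to the regular hash aggregate path. A simplified Scala sketch of that scheme (an assumption-level illustration, not the generated Java; findBucket and isMatchOrEmpty are hypothetical names, and numBuckets is a power of two when loadFactor is 0.5):

// Bounded probing: return the matching or empty bucket index, or None to
// signal that the caller should fall back to the slow aggregation path.
def findBucket(hash: Int, numBuckets: Int, maxSteps: Int)
              (isMatchOrEmpty: Int => Boolean): Option[Int] = {
  var idx = hash & (numBuckets - 1)  // power-of-two masking instead of modulo
  var step = 0
  while (step < maxSteps) {
    if (isMatchOrEmpty(idx)) return Some(idx)
    step += 1
    idx = (idx + step) & (numBuckets - 1)  // probe the next candidate bucket
  }
  None
}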

@@ -47,7 +47,8 @@ class VectorizedHashMapGenerator(
aggregateExpressions: Seq[AggregateExpression],
generatedClassName: String,
groupingKeySchema: StructType,
-bufferSchema: StructType)
+bufferSchema: StructType,
+bitMaxCapacity: Int)
extends HashMapGenerator (ctx, aggregateExpressions, generatedClassName,
groupingKeySchema, bufferSchema) {

@@ -83,7 +84,7 @@
| private ${classOf[ColumnarBatch].getName} batch;
| private ${classOf[MutableColumnarRow].getName} aggBufferRow;
| private int[] buckets;
-| private int capacity = 1 << 16;
+| private int capacity = 1 << $bitMaxCapacity;
| private double loadFactor = 0.5;
| private int numBuckets = (int) (capacity / loadFactor);
| private int maxSteps = 2;