From ab67d33787e568245c9e2ab30e51b471f21fa2ed Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sun, 27 Mar 2016 13:15:06 +0900
Subject: [PATCH 01/13] make code size of hasNext() smaller by preparing
 get*Accessor() methods

group a lot of calls into a method
---
 .../columnar/GenerateColumnAccessor.scala     | 99 +++++++++++++++----
 1 file changed, 79 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index eaafc96e4d2e..a3a296ad4a85 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.columnar
 
+import scala.collection.mutable
+
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -68,6 +70,8 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
   protected def create(columnTypes: Seq[DataType]): ColumnarIterator = {
     val ctx = newCodeGenContext()
     val numFields = columnTypes.size
+    val accessorClasses = new mutable.HashMap[String, String]
+    val accessorStructClasses = new mutable.HashMap[(String, DataType), (String, String)]
     val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) =>
       val accessorName = ctx.freshName("accessor")
       val accessorCls = dt match {
@@ -88,16 +92,20 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
         case array: ArrayType => classOf[ArrayColumnAccessor].getName
         case t: MapType => classOf[MapColumnAccessor].getName
       }
-      ctx.addMutableState(accessorCls, accessorName, s"$accessorName = null;")
-
-      val createCode = dt match {
-        case t if ctx.isPrimitiveType(dt) =>
-          s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
-        case NullType | StringType | BinaryType =>
-          s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
-        case other =>
-          s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder),
-            (${dt.getClass.getName}) columnTypes[$index]);"""
+      ctx.addMutableState(accessorCls, accessorName, "")
+
+      val createCode = {
+        val shortAccCls = accessorCls.substring(accessorCls.lastIndexOf(".") + 1)
+        dt match {
+          case t if ctx.isPrimitiveType(dt) =>
+            s"$accessorName = get${accessorClasses.getOrElseUpdate(accessorCls, shortAccCls)}($index);"
+          case NullType | StringType | BinaryType =>
+            s"$accessorName = get${accessorClasses.getOrElseUpdate(accessorCls, shortAccCls)}($index);"
+          case other =>
+            val shortDTCls = dt.getClass.getName.substring(dt.getClass.getName.lastIndexOf(".") + 1)
+            accessorStructClasses.getOrElseUpdate((accessorCls, dt), (shortAccCls, shortDTCls))
+            s"$accessorName = get${shortAccCls}_${shortDTCls}($index);"
+        }
       }
 
       val extract = s"$accessorName.extractTo(mutableRow, $index);"
@@ -114,6 +122,57 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       (createCode, extract + patch)
     }.unzip
 
+    val accessorCode = accessorClasses.map { case (accessorCls, shortAccCls) =>
+      s"""
+        private $accessorCls get${shortAccCls}(int idx) {
+          byte[] buffer = batch.buffers()[columnIndexes[idx]];
+          return new $accessorCls(ByteBuffer.wrap(buffer).order(nativeOrder));
+        }
+      """
+    }
+    val accessorStructCode = accessorStructClasses.map {
+      case ((accessorCls, dt), (shortAccCls, shortDTCls)) =>
+        s"""
+          private $accessorCls get${shortAccCls}_${shortDTCls}(int idx) {
+            byte[] buffer = batch.buffers()[columnIndexes[idx]];
+            return new $accessorCls(ByteBuffer.wrap(buffer).order(nativeOrder),
+              (${dt.getClass.getName}) columnTypes[idx]);
+          }
+        """
+    }
+
+    /* 4000 = 64000 bytes / 16 (up to 16 bytes per one call)) */
+    val numberOfStatementsThreshold = 4000
+    val (initializerAccessorFuncs, initializerAccessorCalls, extractorFuncs, extractorCalls) =
+      if (initializeAccessors.length < numberOfStatementsThreshold) {
+        ("", initializeAccessors.mkString("\n"), "", extractors.mkString("\n"))
+      } else {
+        val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold)
+        var groupedAccessorsLength = 0
+        val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold)
+        var groupedExtractorsLength = 0
+        (
+          groupedAccessorsItr.zipWithIndex.map { case (body, i) =>
+            groupedAccessorsLength += 1
+            s"""
+              |private void accessors$i() {
+              |  ${body.mkString("\n")}
+              |}
+            """.stripMargin
+          }.mkString(""),
+            (0 to groupedAccessorsLength - 1).map { i => s"accessors$i();" }.mkString("\n"),
+            groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
+              groupedExtractorsLength += 1
+              s"""
+              |private void extractors$i() {
+              |  ${body.mkString("\n")}
+              |}
+            """.stripMargin
+            }.mkString(""),
+            (0 to groupedExtractorsLength - 1).map { i => s"extractors$i();" }.mkString("\n")
+          )
+      }
+
     val code = s"""
       import java.nio.ByteBuffer;
       import java.nio.ByteOrder;
@@ -130,7 +189,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
 
       class SpecificColumnarIterator extends ${classOf[ColumnarIterator].getName} {
         private ByteOrder nativeOrder = null;
-        private byte[][] buffers = null;
         private UnsafeRow unsafeRow = new UnsafeRow();
         private BufferHolder bufferHolder = new BufferHolder();
         private UnsafeRowWriter rowWriter = new UnsafeRowWriter();
@@ -142,15 +200,13 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
         private scala.collection.Iterator input = null;
         private DataType[] columnTypes = null;
         private int[] columnIndexes = null;
+        ${classOf[CachedBatch].getName} batch = null;
 
         ${declareMutableStates(ctx)}
 
         public SpecificColumnarIterator() {
           this.nativeOrder = ByteOrder.nativeOrder();
-          this.buffers = new byte[${columnTypes.length}][];
           this.mutableRow = new MutableUnsafeRow(rowWriter);
-
-          ${initMutableStates(ctx)}
         }
 
         public void initialize(Iterator input, DataType[] columnTypes, int[] columnIndexes) {
@@ -159,6 +215,12 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           this.columnIndexes = columnIndexes;
         }
 
+        ${accessorCode.mkString("\n")}
+        ${accessorStructCode.mkString("\n")}
+
+        ${initializerAccessorFuncs}
+        ${extractorFuncs}
+
         public boolean hasNext() {
           if (currentRow < numRowsInBatch) {
             return true;
@@ -167,13 +229,10 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
             return false;
           }
 
-          ${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next();
+          batch = (${classOf[CachedBatch].getName}) input.next();
          currentRow = 0;
          numRowsInBatch = batch.numRows();
-          for (int i = 0; i < columnIndexes.length; i ++) {
-            buffers[i] = batch.buffers()[columnIndexes[i]];
-          }
-          ${initializeAccessors.mkString("\n")}
+          ${initializerAccessorCalls}
 
          return hasNext();
         }
@@ -182,7 +241,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
          currentRow += 1;
          bufferHolder.reset();
          rowWriter.initialize(bufferHolder, $numFields);
-          ${extractors.mkString("\n")}
+          ${extractorCalls}
          unsafeRow.pointTo(bufferHolder.buffer, $numFields, bufferHolder.totalSize());
          return unsafeRow;
         }

From fea2a524bbd5b1d0d285e02e6eda590d1f7d67e3 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sun, 27 Mar 2016 13:15:38 +0900
Subject: [PATCH 02/13] add test case

---
 .../sql/execution/columnar/InMemoryColumnarQuerySuite.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 25afed25c897..eddbab9b696f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -219,4 +219,10 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     assert(data.count() === 10)
     assert(data.filter($"s" === "3").count() === 1)
   }
+
+  test("SPARK-14138: Generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames") {
+    val length = 10000
+    val columnTypes = List.fill(length)(IntegerType)
+    val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
+  }
 }

From e56406ed5173ae0c196705abcb8f7f28f0be0387 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sun, 27 Mar 2016 13:51:32 +0900
Subject: [PATCH 03/13] fix scalastyle errors

---
 .../columnar/GenerateColumnAccessor.scala     | 24 +++++++++----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index a3a296ad4a85..7276fe120b97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -95,16 +95,16 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       ctx.addMutableState(accessorCls, accessorName, "")
 
       val createCode = {
-        val shortAccCls = accessorCls.substring(accessorCls.lastIndexOf(".") + 1)
+        val shortCls = accessorCls.substring(accessorCls.lastIndexOf(".") + 1)
         dt match {
           case t if ctx.isPrimitiveType(dt) =>
-            s"$accessorName = get${accessorClasses.getOrElseUpdate(accessorCls, shortAccCls)}($index);"
+            s"$accessorName = get${accessorClasses.getOrElseUpdate(accessorCls, shortCls)}($index);"
           case NullType | StringType | BinaryType =>
-            s"$accessorName = get${accessorClasses.getOrElseUpdate(accessorCls, shortAccCls)}($index);"
+            s"$accessorName = get${accessorClasses.getOrElseUpdate(accessorCls, shortCls)}($index);"
           case other =>
             val shortDTCls = dt.getClass.getName.substring(dt.getClass.getName.lastIndexOf(".") + 1)
-            accessorStructClasses.getOrElseUpdate((accessorCls, dt), (shortAccCls, shortDTCls))
-            s"$accessorName = get${shortAccCls}_${shortDTCls}($index);"
+            accessorStructClasses.getOrElseUpdate((accessorCls, dt), (shortCls, shortDTCls))
+            s"$accessorName = get${shortCls}_${shortDTCls}($index);"
         }
       }
 
@@ -160,17 +160,17 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
              |}
            """.stripMargin
          }.mkString(""),
-            (0 to groupedAccessorsLength - 1).map { i => s"accessors$i();" }.mkString("\n"),
-            groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
-              groupedExtractorsLength += 1
-              s"""
+          (0 to groupedAccessorsLength - 1).map { i => s"accessors$i();" }.mkString("\n"),
+          groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
+            groupedExtractorsLength += 1
+            s"""
              |private void extractors$i() {
              |  ${body.mkString("\n")}
              |}
            """.stripMargin
-            }.mkString(""),
-            (0 to groupedExtractorsLength - 1).map { i => s"extractors$i();" }.mkString("\n")
-          )
+          }.mkString(""),
+          (0 to groupedExtractorsLength - 1).map { i => s"extractors$i();" }.mkString("\n")
+        )
       }
 
     val code = s"""

From 60f6719dbbaf772436b9576b401fda6f8e673100 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sun, 27 Mar 2016 14:07:44 +0900
Subject: [PATCH 04/13] fix scalastyle error

---
 .../sql/execution/columnar/InMemoryColumnarQuerySuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index eddbab9b696f..ecee7c69a4aa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -220,7 +220,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     assert(data.filter($"s" === "3").count() === 1)
   }
 
-  test("SPARK-14138: Generated SpecificColumnarIterator code can exceed JVM size limit for cached DataFrames") {
+  test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") {
     val length = 10000
     val columnTypes = List.fill(length)(IntegerType)
     val columnarIterator = GenerateColumnAccessor.generate(columnTypes)

From 226bad5ee0c0253cad958fa812fd5b274b8c6be0 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sun, 27 Mar 2016 20:59:34 +0900
Subject: [PATCH 05/13] use String as a key instead of DataType object

---
 .../execution/columnar/GenerateColumnAccessor.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 7276fe120b97..7a7ba22376a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -71,7 +71,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
     val ctx = newCodeGenContext()
     val numFields = columnTypes.size
     val accessorClasses = new mutable.HashMap[String, String]
-    val accessorStructClasses = new mutable.HashMap[(String, DataType), (String, String)]
+    val accessorStructClasses = new mutable.HashMap[(String, String), (String, String)]
     val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) =>
       val accessorName = ctx.freshName("accessor")
       val accessorCls = dt match {
@@ -102,8 +102,9 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           case NullType | StringType | BinaryType =>
             s"$accessorName = get${accessorClasses.getOrElseUpdate(accessorCls, shortCls)}($index);"
           case other =>
-            val shortDTCls = dt.getClass.getName.substring(dt.getClass.getName.lastIndexOf(".") + 1)
-            accessorStructClasses.getOrElseUpdate((accessorCls, dt), (shortCls, shortDTCls))
+            val dtCls = dt.getClass.getName
+            val shortDTCls = dt.getClass.getName.substring(dtCls.lastIndexOf(".") + 1)
+            accessorStructClasses.getOrElseUpdate((accessorCls, dtCls), (shortCls, shortDTCls))
             s"$accessorName = get${shortCls}_${shortDTCls}($index);"
         }
       }
@@ -131,12 +132,12 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       """
     }
     val accessorStructCode = accessorStructClasses.map {
-      case ((accessorCls, dt), (shortAccCls, shortDTCls)) =>
+      case ((accessorCls, dtCls), (shortAccCls, shortDTCls)) =>
         s"""
           private $accessorCls get${shortAccCls}_${shortDTCls}(int idx) {
             byte[] buffer = batch.buffers()[columnIndexes[idx]];
             return new $accessorCls(ByteBuffer.wrap(buffer).order(nativeOrder),
-              (${dt.getClass.getName}) columnTypes[idx]);
+              (${dtCls}) columnTypes[idx]);
           }
         """
     }

From f3307a7843d61d3a5fe4ec5781071e57f43fa28b Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sun, 27 Mar 2016 22:59:17 +0900
Subject: [PATCH 06/13] fix boundary condition

---
 .../spark/sql/execution/columnar/GenerateColumnAccessor.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 7a7ba22376a7..b030890b8305 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -145,7 +145,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
     /* 4000 = 64000 bytes / 16 (up to 16 bytes per one call)) */
     val numberOfStatementsThreshold = 4000
     val (initializerAccessorFuncs, initializerAccessorCalls, extractorFuncs, extractorCalls) =
-      if (initializeAccessors.length < numberOfStatementsThreshold) {
+      if (initializeAccessors.length <= numberOfStatementsThreshold) {
         ("", initializeAccessors.mkString("\n"), "", extractors.mkString("\n"))
       } else {
         val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold)
         var groupedAccessorsLength = 0
         val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold)
         var groupedExtractorsLength = 0

From 9346793cd2e0f0f58db503382aa51bd4ee2b39e4 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Sun, 27 Mar 2016 23:00:12 +0900
Subject: [PATCH 07/13] add a case that does not generate a new method, but
 the number of columns is large enough

---
 .../columnar/InMemoryColumnarQuerySuite.scala | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index ecee7c69a4aa..557415b801d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -221,8 +221,12 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-14138: Generated SpecificColumnarIterator can exceed JVM size limit for cached DF") {
-    val length = 10000
-    val columnTypes = List.fill(length)(IntegerType)
-    val columnarIterator = GenerateColumnAccessor.generate(columnTypes)
+    val length1 = 3999
+    val columnTypes1 = List.fill(length1)(IntegerType)
+    val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1)
+
+    val length2 = 10000
+    val columnTypes2 = List.fill(length2)(IntegerType)
+    val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
   }
 }

From beb98401d8435d300f1eb7dd475defb2a860f679 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 31 Mar 2016 01:55:26 +0900
Subject: [PATCH 08/13] reduce threshold of # of calls in a method to allow
 HotSpot to compile the method

---
 .../sql/execution/columnar/GenerateColumnAccessor.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index b030890b8305..f7a1d99ab03e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -142,8 +142,12 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       """
     }
 
-    /* 4000 = 64000 bytes / 16 (up to 16 bytes per one call)) */
-    val numberOfStatementsThreshold = 4000
+    /*
+     * 500 = 7500 bytes / 15 (up to 15 bytes per one call))
+     * the maximum byte code size to be compiled for HotSpot is 8000.
+     * We should keep less than 8000
+     */
+    val numberOfStatementsThreshold = 500
     val (initializerAccessorFuncs, initializerAccessorCalls, extractorFuncs, extractorCalls) =
       if (initializeAccessors.length <= numberOfStatementsThreshold) {
         ("", initializeAccessors.mkString("\n"), "", extractors.mkString("\n"))

From 16cf602b506c37c9e5cd9cbb48d577b7e66f14d4 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 31 Mar 2016 14:55:32 +0900
Subject: [PATCH 09/13] addressed Davies's comments

---
 .../columnar/GenerateColumnAccessor.scala     | 60 ++++++-------------
 1 file changed, 19 insertions(+), 41 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index f7a1d99ab03e..37b6a94453ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -70,8 +70,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
   protected def create(columnTypes: Seq[DataType]): ColumnarIterator = {
     val ctx = newCodeGenContext()
     val numFields = columnTypes.size
-    val accessorClasses = new mutable.HashMap[String, String]
-    val accessorStructClasses = new mutable.HashMap[(String, String), (String, String)]
     val (initializeAccessors, extractors) = columnTypes.zipWithIndex.map { case (dt, index) =>
       val accessorName = ctx.freshName("accessor")
       val accessorCls = dt match {
@@ -94,21 +92,17 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       }
       ctx.addMutableState(accessorCls, accessorName, "")
 
-      val createCode = {
-        val shortCls = accessorCls.substring(accessorCls.lastIndexOf(".") + 1)
-        dt match {
-          case t if ctx.isPrimitiveType(dt) =>
-            s"$accessorName = get${accessorClasses.getOrElseUpdate(accessorCls, shortCls)}($index);"
-          case NullType | StringType | BinaryType =>
-            s"$accessorName = get${accessorClasses.getOrElseUpdate(accessorCls, shortCls)}($index);"
-          case other =>
-            val dtCls = dt.getClass.getName
-            val shortDTCls = dt.getClass.getName.substring(dtCls.lastIndexOf(".") + 1)
-            accessorStructClasses.getOrElseUpdate((accessorCls, dtCls), (shortCls, shortDTCls))
-            s"$accessorName = get${shortCls}_${shortDTCls}($index);"
-        }
-      }
+      val createCode = dt match {
+        case t if ctx.isPrimitiveType(dt) =>
+        s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
+      case NullType | StringType | BinaryType =>
+        s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
+      case other =>
+        s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder),
+            (${dt.getClass.getName}) columnTypes[$index]);"""
+      }
+      
       val extract = s"$accessorName.extractTo(mutableRow, $index);"
@@ -123,34 +117,12 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       (createCode, extract + patch)
     }.unzip
 
-    val accessorCode = accessorClasses.map { case (accessorCls, shortAccCls) =>
-      s"""
-        private $accessorCls get${shortAccCls}(int idx) {
-          byte[] buffer = batch.buffers()[columnIndexes[idx]];
-          return new $accessorCls(ByteBuffer.wrap(buffer).order(nativeOrder));
-        }
-      """
-    }
-    val accessorStructCode = accessorStructClasses.map {
-      case ((accessorCls, dtCls), (shortAccCls, shortDTCls)) =>
-        s"""
-          private $accessorCls get${shortAccCls}_${shortDTCls}(int idx) {
-            byte[] buffer = batch.buffers()[columnIndexes[idx]];
-            return new $accessorCls(ByteBuffer.wrap(buffer).order(nativeOrder),
-              (${dtCls}) columnTypes[idx]);
-          }
-        """
-    }
-
     /*
-     * 500 = 7500 bytes / 15 (up to 15 bytes per one call))
+     * 200 = 6000 bytes / 30 (up to 25 bytes per one call))
      * the maximum byte code size to be compiled for HotSpot is 8000.
      * We should keep less than 8000
      */
-    val numberOfStatementsThreshold = 500
+    val numberOfStatementsThreshold = 200
     val (initializerAccessorFuncs, initializerAccessorCalls, extractorFuncs, extractorCalls) =
       if (initializeAccessors.length <= numberOfStatementsThreshold) {
         ("", initializeAccessors.mkString("\n"), "", extractors.mkString("\n"))
@@ -194,6 +169,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
 
       class SpecificColumnarIterator extends ${classOf[ColumnarIterator].getName} {
         private ByteOrder nativeOrder = null;
+        private byte[][] buffers = null;
         private UnsafeRow unsafeRow = new UnsafeRow();
         private BufferHolder bufferHolder = new BufferHolder();
         private UnsafeRowWriter rowWriter = new UnsafeRowWriter();
@@ -205,12 +181,12 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
         private scala.collection.Iterator input = null;
         private DataType[] columnTypes = null;
         private int[] columnIndexes = null;
-        ${classOf[CachedBatch].getName} batch = null;
 
         ${declareMutableStates(ctx)}
 
         public SpecificColumnarIterator() {
           this.nativeOrder = ByteOrder.nativeOrder();
+          this.buffers = new byte[${columnTypes.length}][];
           this.mutableRow = new MutableUnsafeRow(rowWriter);
         }
 
@@ -220,8 +196,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
           this.columnIndexes = columnIndexes;
         }
 
-        ${accessorCode.mkString("\n")}
-        ${accessorStructCode.mkString("\n")}
+        ${declareAddedFunctions(ctx)}
 
        ${initializerAccessorFuncs}
        ${extractorFuncs}
@@ -234,9 +209,12 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
             return false;
           }
 
-          batch = (${classOf[CachedBatch].getName}) input.next();
+          ${classOf[CachedBatch].getName} batch = (${classOf[CachedBatch].getName}) input.next();
          currentRow = 0;
          numRowsInBatch = batch.numRows();
+          for (int i = 0; i < columnIndexes.length; i ++) {
+            buffers[i] = batch.buffers()[columnIndexes[i]];
+          }
          ${initializerAccessorCalls}
 
          return hasNext();
         }

From a310bfc960f34f0ac6a61937a5519ac05ca52f94 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 31 Mar 2016 15:57:15 +0900
Subject: [PATCH 10/13] fix scala style errors

---
 .../execution/columnar/GenerateColumnAccessor.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 37b6a94453ef..90b5274ce0ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -95,14 +95,14 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
 
       val createCode = dt match {
         case t if ctx.isPrimitiveType(dt) =>
-        s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
-      case NullType | StringType | BinaryType =>
-        s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
-      case other =>
-        s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder),
+          s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
+        case NullType | StringType | BinaryType =>
+          s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
+        case other =>
+          s"""$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder),
             (${dt.getClass.getName}) columnTypes[$index]);"""
       }
-      
+
       val extract = s"$accessorName.extractTo(mutableRow, $index);"

From c1acf825cc0dfdefc1074ad4ce3309743a3240b5 Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Thu, 31 Mar 2016 16:17:19 +0900
Subject: [PATCH 11/13] nit: fix number in a comment

---
 .../spark/sql/execution/columnar/GenerateColumnAccessor.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 90b5274ce0ac..4c4c9324bd26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -118,7 +118,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
     }.unzip
 
     /*
-     * 200 = 6000 bytes / 30 (up to 25 bytes per one call))
+     * 200 = 6000 bytes / 30 (up to 30 bytes per one call))
     * the maximum byte code size to be compiled for HotSpot is 8000.
     * We should keep less than 8000
     */
    val numberOfStatementsThreshold = 200

From 60cebd5ad0913699d0a683912e93d5334ce211ab Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Fri, 1 Apr 2016 04:11:11 +0900
Subject: [PATCH 12/13] addressed Davies's comments

---
 .../columnar/GenerateColumnAccessor.scala     | 55 +++++++++----------
 1 file changed, 27 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index 4c4c9324bd26..b9a15c25906c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -92,7 +92,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
       }
       ctx.addMutableState(accessorCls, accessorName, "")
-
       val createCode = dt match {
         case t if ctx.isPrimitiveType(dt) =>
           s"$accessorName = new $accessorCls(ByteBuffer.wrap(buffers[$index]).order(nativeOrder));"
@@ -123,34 +122,37 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
      * We should keep less than 8000
      */
     val numberOfStatementsThreshold = 200
-    val (initializerAccessorFuncs, initializerAccessorCalls, extractorFuncs, extractorCalls) =
+    val (initializerAccessorCalls, extractorCalls) =
       if (initializeAccessors.length <= numberOfStatementsThreshold) {
-        ("", initializeAccessors.mkString("\n"), "", extractors.mkString("\n"))
+        (initializeAccessors.mkString("\n"), extractors.mkString("\n"))
       } else {
         val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold)
-        var groupedAccessorsLength = 0
         val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold)
-        var groupedExtractorsLength = 0
-        (
-          groupedAccessorsItr.zipWithIndex.map { case (body, i) =>
-            groupedAccessorsLength += 1
-            s"""
-              |private void accessors$i() {
-              |  ${body.mkString("\n")}
-              |}
-            """.stripMargin
-          }.mkString(""),
-          (0 to groupedAccessorsLength - 1).map { i => s"accessors$i();" }.mkString("\n"),
-          groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
-            groupedExtractorsLength += 1
-            s"""
-              |private void extractors$i() {
-              |  ${body.mkString("\n")}
-              |}
-            """.stripMargin
-          }.mkString(""),
-          (0 to groupedExtractorsLength - 1).map { i => s"extractors$i();" }.mkString("\n")
-        )
+        var groupedAccessorsLength = 0
+        groupedAccessorsItr.zipWithIndex.map { case (body, i) =>
+          groupedAccessorsLength += 1
+          val funcName = s"accessors$i"
+          val funcCode = s"""
+             |private void $funcName() {
+             |  ${body.mkString("\n")}
+             |}
+           """.stripMargin
+          ctx.addNewFunction(funcName, funcCode)
+        }
+        groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
+          val funcName = s"extractors$i"
+          val funcCode = s"""
+             |private void $funcName() {
+             |  ${body.mkString("\n")}
+             |}
+           """.stripMargin
+          ctx.addNewFunction(funcName, funcCode)
+        }
+        ((0 to groupedAccessorsLength - 1).map { i => s"accessors$i();" }.mkString("\n"),
+          (0 to groupedAccessorsLength - 1).map { i => s"extractors$i();" }.mkString("\n"))
+        //(0 to groupedAccessorsLength - 1).map { i =>
+        //  (s"accessors$i();", s"extractors$i();")
+        //}.unzip
       }
 
     val code = s"""
@@ -198,9 +200,6 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
 
       ${declareAddedFunctions(ctx)}
 
-      ${initializerAccessorFuncs}
-      ${extractorFuncs}
-
      public boolean hasNext() {
        if (currentRow < numRowsInBatch) {
          return true;

From 3a05ddf3919b267e5ba593d82949b9b409060f5a Mon Sep 17 00:00:00 2001
From: Kazuaki Ishizaki
Date: Fri, 1 Apr 2016 04:43:31 +0900
Subject: [PATCH 13/13] fix scala style errors

---
 .../sql/execution/columnar/GenerateColumnAccessor.scala | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
index b9a15c25906c..4d01b78c3c10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala
@@ -137,7 +137,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
             |  ${body.mkString("\n")}
             |}
           """.stripMargin
-          ctx.addNewFunction(funcName, funcCode)
+        ctx.addNewFunction(funcName, funcCode)
        }
        groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
          val funcName = s"extractors$i"
          val funcCode = s"""
@@ -146,13 +146,10 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
            |  ${body.mkString("\n")}
            |}
          """.stripMargin
-          ctx.addNewFunction(funcName, funcCode)
+        ctx.addNewFunction(funcName, funcCode)
        }
        ((0 to groupedAccessorsLength - 1).map { i => s"accessors$i();" }.mkString("\n"),
          (0 to groupedAccessorsLength - 1).map { i => s"extractors$i();" }.mkString("\n"))
-        //(0 to groupedAccessorsLength - 1).map { i =>
-        //  (s"accessors$i();", s"extractors$i();")
-        //}.unzip
      }
 
    val code = s"""