Commit 9873d57
[SPARK-17490][SQL] Optimize SerializeFromObject() for a primitive array
Waiting for merging #13680
This PR optimizes `SerializeFromObject()` for a primitive array. This is derived from #13758 to address one of the problems in #13758 by using a simple approach.
The current implementation always generates `GenericArrayData` from `SerializeFromObject()` for any type of array in a logical plan. This involves boxing in the constructor of `GenericArrayData` when `SerializeFromObject()` has a primitive array.
This PR enables to generate `UnsafeArrayData` from `SerializeFromObject()` for a primitive array. It can avoid boxing to create an instance of `ArrayData` in the generated code by Catalyst.
This PR also generates `UnsafeArrayData` in the cases of `RowEncoder.serializeFor` and `CatalystTypeConverters.createToCatalystConverter`.
Performance improvement of `SerializeFromObject()` is up to 2.0x
```
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)
Without this PR
Write an array in Dataset: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Int 556 / 608 15.1 66.3 1.0X
Double 1668 / 1746 5.0 198.8 0.3X
with this PR
Write an array in Dataset: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------
Int 352 / 401 23.8 42.0 1.0X
Double 821 / 885 10.2 97.9 0.4X
```
Here is an example program, similar to a pattern that occurs in mllib, as described in [SPARK-16070](https://issues.apache.org/jira/browse/SPARK-16070).
```
sparkContext.parallelize(Seq(Array(1, 2)), 1).toDS.map(e => e).show
```
Generated code before applying this PR
``` java
/* 039 */ protected void processNext() throws java.io.IOException {
/* 040 */ while (inputadapter_input.hasNext()) {
/* 041 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 042 */ int[] inputadapter_value = (int[])inputadapter_row.get(0, null);
/* 043 */
/* 044 */ Object mapelements_obj = ((Expression) references[0]).eval(null);
/* 045 */ scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;
/* 046 */
/* 047 */ boolean mapelements_isNull = false || false;
/* 048 */ int[] mapelements_value = null;
/* 049 */ if (!mapelements_isNull) {
/* 050 */ Object mapelements_funcResult = null;
/* 051 */ mapelements_funcResult = mapelements_value1.apply(inputadapter_value);
/* 052 */ if (mapelements_funcResult == null) {
/* 053 */ mapelements_isNull = true;
/* 054 */ } else {
/* 055 */ mapelements_value = (int[]) mapelements_funcResult;
/* 056 */ }
/* 057 */
/* 058 */ }
/* 059 */ mapelements_isNull = mapelements_value == null;
/* 060 */
/* 061 */ serializefromobject_argIsNulls[0] = mapelements_isNull;
/* 062 */ serializefromobject_argValue = mapelements_value;
/* 063 */
/* 064 */ boolean serializefromobject_isNull = false;
/* 065 */ for (int idx = 0; idx < 1; idx++) {
/* 066 */ if (serializefromobject_argIsNulls[idx]) { serializefromobject_isNull = true; break; }
/* 067 */ }
/* 068 */
/* 069 */ final ArrayData serializefromobject_value = serializefromobject_isNull ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue);
/* 070 */ serializefromobject_holder.reset();
/* 071 */
/* 072 */ serializefromobject_rowWriter.zeroOutNullBytes();
/* 073 */
/* 074 */ if (serializefromobject_isNull) {
/* 075 */ serializefromobject_rowWriter.setNullAt(0);
/* 076 */ } else {
/* 077 */ // Remember the current cursor so that we can calculate how many bytes are
/* 078 */ // written later.
/* 079 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 080 */
/* 081 */ if (serializefromobject_value instanceof UnsafeArrayData) {
/* 082 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 083 */ // grow the global buffer before writing data.
/* 084 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 085 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 086 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 087 */
/* 088 */ } else {
/* 089 */ final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 090 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
/* 091 */
/* 092 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 093 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 094 */ serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
/* 095 */ } else {
/* 096 */ final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
/* 097 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 098 */ }
/* 099 */ }
/* 100 */ }
/* 101 */
/* 102 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 103 */ }
/* 104 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 105 */ append(serializefromobject_result);
/* 106 */ if (shouldStop()) return;
/* 107 */ }
/* 108 */ }
/* 109 */ }
```
Generated code after applying this PR
``` java
/* 035 */ protected void processNext() throws java.io.IOException {
/* 036 */ while (inputadapter_input.hasNext()) {
/* 037 */ InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 038 */ int[] inputadapter_value = (int[])inputadapter_row.get(0, null);
/* 039 */
/* 040 */ Object mapelements_obj = ((Expression) references[0]).eval(null);
/* 041 */ scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;
/* 042 */
/* 043 */ boolean mapelements_isNull = false || false;
/* 044 */ int[] mapelements_value = null;
/* 045 */ if (!mapelements_isNull) {
/* 046 */ Object mapelements_funcResult = null;
/* 047 */ mapelements_funcResult = mapelements_value1.apply(inputadapter_value);
/* 048 */ if (mapelements_funcResult == null) {
/* 049 */ mapelements_isNull = true;
/* 050 */ } else {
/* 051 */ mapelements_value = (int[]) mapelements_funcResult;
/* 052 */ }
/* 053 */
/* 054 */ }
/* 055 */ mapelements_isNull = mapelements_value == null;
/* 056 */
/* 057 */ boolean serializefromobject_isNull = mapelements_isNull;
/* 058 */ final ArrayData serializefromobject_value = serializefromobject_isNull ? null : org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(mapelements_value);
/* 059 */ serializefromobject_isNull = serializefromobject_value == null;
/* 060 */ serializefromobject_holder.reset();
/* 061 */
/* 062 */ serializefromobject_rowWriter.zeroOutNullBytes();
/* 063 */
/* 064 */ if (serializefromobject_isNull) {
/* 065 */ serializefromobject_rowWriter.setNullAt(0);
/* 066 */ } else {
/* 067 */ // Remember the current cursor so that we can calculate how many bytes are
/* 068 */ // written later.
/* 069 */ final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 070 */
/* 071 */ if (serializefromobject_value instanceof UnsafeArrayData) {
/* 072 */ final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
/* 073 */ // grow the global buffer before writing data.
/* 074 */ serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 075 */ ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 076 */ serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 077 */
/* 078 */ } else {
/* 079 */ final int serializefromobject_numElements = serializefromobject_value.numElements();
/* 080 */ serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
/* 081 */
/* 082 */ for (int serializefromobject_index = 0; serializefromobject_index < serializefromobject_numElements; serializefromobject_index++) {
/* 083 */ if (serializefromobject_value.isNullAt(serializefromobject_index)) {
/* 084 */ serializefromobject_arrayWriter.setNullInt(serializefromobject_index);
/* 085 */ } else {
/* 086 */ final int serializefromobject_element = serializefromobject_value.getInt(serializefromobject_index);
/* 087 */ serializefromobject_arrayWriter.write(serializefromobject_index, serializefromobject_element);
/* 088 */ }
/* 089 */ }
/* 090 */ }
/* 091 */
/* 092 */ serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 093 */ }
/* 094 */ serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 095 */ append(serializefromobject_result);
/* 096 */ if (shouldStop()) return;
/* 097 */ }
/* 098 */ }
/* 099 */ }
```
Added a test in `DatasetSuite`, `RowEncoderSuite`, and `CatalystTypeConvertersSuite`
Author: Kazuaki Ishizaki <[email protected]>
Closes #15044 from kiszk/SPARK-17490.
(cherry picked from commit 19cf208)
Signed-off-by: Herman van Hovell <[email protected]>
1 parent d1eac3e
commit 9873d57
File tree
7 files changed
+203
-14
lines changed
- sql
- catalyst/src
- main/scala/org/apache/spark/sql/catalyst
- encoders
- util
- test/scala/org/apache/spark/sql/catalyst
- encoders
- core/src/test/scala/org/apache/spark/sql
- execution/benchmark
7 files changed
+203
-14
lines changed
Lines changed: 16 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
441 | 441 | | |
442 | 442 | | |
443 | 443 | | |
| 444 | + | |
| 445 | + | |
| 446 | + | |
| 447 | + | |
| 448 | + | |
| 449 | + | |
| 450 | + | |
| 451 | + | |
| 452 | + | |
| 453 | + | |
| 454 | + | |
| 455 | + | |
| 456 | + | |
| 457 | + | |
| 458 | + | |
| 459 | + | |
444 | 460 | | |
445 | 461 | | |
446 | 462 | | |
| |||
Lines changed: 14 additions & 13 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
23 | 23 | | |
24 | 24 | | |
25 | 25 | | |
26 | | - | |
| 26 | + | |
27 | 27 | | |
28 | 28 | | |
29 | 29 | | |
| |||
119 | 119 | | |
120 | 120 | | |
121 | 121 | | |
122 | | - | |
123 | | - | |
124 | | - | |
125 | | - | |
126 | | - | |
127 | | - | |
128 | | - | |
129 | | - | |
130 | | - | |
131 | | - | |
132 | | - | |
133 | | - | |
| 122 | + | |
| 123 | + | |
| 124 | + | |
| 125 | + | |
| 126 | + | |
| 127 | + | |
| 128 | + | |
| 129 | + | |
| 130 | + | |
| 131 | + | |
| 132 | + | |
| 133 | + | |
| 134 | + | |
134 | 135 | | |
135 | 136 | | |
136 | 137 | | |
| |||
Lines changed: 14 additions & 1 deletion
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
19 | 19 | | |
20 | 20 | | |
21 | 21 | | |
22 | | - | |
| 22 | + | |
23 | 23 | | |
24 | 24 | | |
| 25 | + | |
| 26 | + | |
| 27 | + | |
| 28 | + | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
| 34 | + | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
25 | 38 | | |
26 | 39 | | |
27 | 40 | | |
| |||
Lines changed: 33 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
19 | 19 | | |
20 | 20 | | |
21 | 21 | | |
| 22 | + | |
| 23 | + | |
22 | 24 | | |
23 | 25 | | |
24 | 26 | | |
| |||
61 | 63 | | |
62 | 64 | | |
63 | 65 | | |
| 66 | + | |
| 67 | + | |
| 68 | + | |
| 69 | + | |
| 70 | + | |
| 71 | + | |
| 72 | + | |
| 73 | + | |
| 74 | + | |
| 75 | + | |
| 76 | + | |
| 77 | + | |
| 78 | + | |
| 79 | + | |
| 80 | + | |
| 81 | + | |
| 82 | + | |
| 83 | + | |
| 84 | + | |
| 85 | + | |
| 86 | + | |
| 87 | + | |
| 88 | + | |
| 89 | + | |
| 90 | + | |
| 91 | + | |
| 92 | + | |
| 93 | + | |
| 94 | + | |
| 95 | + | |
| 96 | + | |
64 | 97 | | |
Lines changed: 26 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
191 | 191 | | |
192 | 192 | | |
193 | 193 | | |
| 194 | + | |
| 195 | + | |
| 196 | + | |
| 197 | + | |
| 198 | + | |
| 199 | + | |
| 200 | + | |
| 201 | + | |
| 202 | + | |
| 203 | + | |
| 204 | + | |
| 205 | + | |
| 206 | + | |
| 207 | + | |
| 208 | + | |
| 209 | + | |
| 210 | + | |
| 211 | + | |
| 212 | + | |
| 213 | + | |
| 214 | + | |
| 215 | + | |
| 216 | + | |
| 217 | + | |
| 218 | + | |
| 219 | + | |
194 | 220 | | |
195 | 221 | | |
196 | 222 | | |
| |||
Lines changed: 18 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
1033 | 1033 | | |
1034 | 1034 | | |
1035 | 1035 | | |
| 1036 | + | |
| 1037 | + | |
| 1038 | + | |
| 1039 | + | |
| 1040 | + | |
| 1041 | + | |
| 1042 | + | |
| 1043 | + | |
| 1044 | + | |
| 1045 | + | |
| 1046 | + | |
| 1047 | + | |
| 1048 | + | |
| 1049 | + | |
| 1050 | + | |
| 1051 | + | |
| 1052 | + | |
| 1053 | + | |
1036 | 1054 | | |
1037 | 1055 | | |
1038 | 1056 | | |
| |||
Lines changed: 82 additions & 0 deletions
| Original file line number | Diff line number | Diff line change | |
|---|---|---|---|
| |||
| 1 | + | |
| 2 | + | |
| 3 | + | |
| 4 | + | |
| 5 | + | |
| 6 | + | |
| 7 | + | |
| 8 | + | |
| 9 | + | |
| 10 | + | |
| 11 | + | |
| 12 | + | |
| 13 | + | |
| 14 | + | |
| 15 | + | |
| 16 | + | |
| 17 | + | |
| 18 | + | |
| 19 | + | |
| 20 | + | |
| 21 | + | |
| 22 | + | |
| 23 | + | |
| 24 | + | |
| 25 | + | |
| 26 | + | |
| 27 | + | |
| 28 | + | |
| 29 | + | |
| 30 | + | |
| 31 | + | |
| 32 | + | |
| 33 | + | |
| 34 | + | |
| 35 | + | |
| 36 | + | |
| 37 | + | |
| 38 | + | |
| 39 | + | |
| 40 | + | |
| 41 | + | |
| 42 | + | |
| 43 | + | |
| 44 | + | |
| 45 | + | |
| 46 | + | |
| 47 | + | |
| 48 | + | |
| 49 | + | |
| 50 | + | |
| 51 | + | |
| 52 | + | |
| 53 | + | |
| 54 | + | |
| 55 | + | |
| 56 | + | |
| 57 | + | |
| 58 | + | |
| 59 | + | |
| 60 | + | |
| 61 | + | |
| 62 | + | |
| 63 | + | |
| 64 | + | |
| 65 | + | |
| 66 | + | |
| 67 | + | |
| 68 | + | |
| 69 | + | |
| 70 | + | |
| 71 | + | |
| 72 | + | |
| 73 | + | |
| 74 | + | |
| 75 | + | |
| 76 | + | |
| 77 | + | |
| 78 | + | |
| 79 | + | |
| 80 | + | |
| 81 | + | |
| 82 | + | |
0 commit comments