Closed
Changes from 1 commit
Commits
75 commits
1d07034
Implementation of GenericArrayData specialized for primitive type array
kiszk Jun 18, 2016
7b48a30
fix scala style error
kiszk Jun 18, 2016
b5876a6
Introduce GenericRefArrayData
kiszk Jun 21, 2016
ac5b73b
replace 'new GenericArrayData' with 'GenericArrayData.allocate'
kiszk Jun 21, 2016
dab5a8c
Generate GenericArrayData.allocate in NewInstance()
kiszk Jun 22, 2016
a8fe2d8
intial version of Benchmark without performance
kiszk Jun 22, 2016
870dc84
move project of Benchmark program
kiszk Jun 22, 2016
7cdd558
update benchmark program
kiszk Jun 22, 2016
d28e256
addressed review comments
kiszk Jun 22, 2016
e95e137
fix test failures
kiszk Jun 22, 2016
2b27bb3
Enabled all of benchmark suites with performance data
kiszk Jun 22, 2016
878262b
fix test failures
kiszk Jun 22, 2016
6efddf0
update test suite to resolve test failures
kiszk Jun 23, 2016
4d2b6bc
fix compilation error
kiszk Jun 23, 2016
f55aeea
addressed comments
kiszk Jun 25, 2016
78aaf13
fix test failures
kiszk Jun 25, 2016
8b908e2
fix descriptions
kiszk Jun 25, 2016
e9ec382
Better usage of GenericArrayData
kiszk Jun 25, 2016
ae9591e
revert part of changes
kiszk Jun 26, 2016
79f4c95
undo revert
kiszk Jun 26, 2016
f93fbc2
revert changes at 0800fdc5
kiszk Jun 26, 2016
a452bda
add null check after asInstanceOf
kiszk Jun 26, 2016
685d3e7
generate GenericArrayData.allocate in NewInstance
kiszk Jun 26, 2016
29c8519
fix test failure
kiszk Jun 27, 2016
0e43c4b
fix test failure
kiszk Jun 27, 2016
d15a46c
update
kiszk Jul 11, 2016
3e84fdb
replace new GenericArrayData with GenericArrayData.allocate
kiszk Jul 11, 2016
604293e
rebase
kiszk Jul 25, 2016
1dcd582
rebase
kiszk Sep 10, 2016
e60bf40
reimplement without factory method
kiszk Oct 1, 2016
e541f35
update benchmark programs
kiszk Oct 1, 2016
22d310b
fix scala style error
kiszk Oct 1, 2016
8684ad6
fix test failure in OrcQuerySuite
kiszk Oct 1, 2016
aa3ada8
fix test failure (DatasetPrimitiveSuites)
kiszk Oct 1, 2016
a81ee14
update benchmark results
kiszk Oct 1, 2016
681ae03
Implementation of GenericArrayData specialized for primitive type array
kiszk Jun 18, 2016
556e76e
fix scala style error
kiszk Jun 18, 2016
ff45703
Introduce GenericRefArrayData
kiszk Jun 21, 2016
23f9f65
replace 'new GenericArrayData' with 'GenericArrayData.allocate'
kiszk Jun 21, 2016
839bcb8
Generate GenericArrayData.allocate in NewInstance()
kiszk Jun 22, 2016
97068d4
intial version of Benchmark without performance
kiszk Jun 22, 2016
cf2b216
move project of Benchmark program
kiszk Jun 22, 2016
8df1c34
update benchmark program
kiszk Jun 22, 2016
040d9aa
addressed review comments
kiszk Jun 22, 2016
cabc27a
fix test failures
kiszk Jun 22, 2016
80abdbb
Enabled all of benchmark suites with performance data
kiszk Jun 22, 2016
9311664
fix test failures
kiszk Jun 22, 2016
25105f3
update test suite to resolve test failures
kiszk Jun 23, 2016
f328662
fix compilation error
kiszk Jun 23, 2016
16de76f
addressed comments
kiszk Jun 25, 2016
12d138e
fix test failures
kiszk Jun 25, 2016
19c453b
fix descriptions
kiszk Jun 25, 2016
bf34ec4
Better usage of GenericArrayData
kiszk Jun 25, 2016
9cd7776
revert part of changes
kiszk Jun 26, 2016
d0b3f60
undo revert
kiszk Jun 26, 2016
8b19f75
revert changes at 0800fdc5
kiszk Jun 26, 2016
7b0e769
add null check after asInstanceOf
kiszk Jun 26, 2016
1a0486f
generate GenericArrayData.allocate in NewInstance
kiszk Jun 26, 2016
963876f
fix test failure
kiszk Jun 27, 2016
d50ee04
fix test failure
kiszk Jun 27, 2016
d581439
update
kiszk Jul 11, 2016
8063e63
replace new GenericArrayData with GenericArrayData.allocate
kiszk Jul 11, 2016
090869d
update benchmark program
kiszk Jul 19, 2016
1837792
rebase
kiszk Jul 25, 2016
06b07da
rebase
kiszk Sep 10, 2016
06c22ac
reimplement without factory method
kiszk Oct 1, 2016
4c6f41e
update benchmark programs
kiszk Oct 1, 2016
9031224
fix scala style error
kiszk Oct 1, 2016
ecbc32e
fix test failure in OrcQuerySuite
kiszk Oct 1, 2016
503dbde
update benchmark results
kiszk Oct 1, 2016
c7ed68f
fix compilation error
kiszk Nov 8, 2016
c82fbf3
add another use cases from #13909
kiszk Nov 8, 2016
6bf54ec
update benchmark results
kiszk Nov 8, 2016
8bcba32
Revert "update benchmark results"
kiszk Nov 9, 2016
7697e5f
Revert "add another use cases from #13909"
kiszk Nov 9, 2016
replace 'new GenericArrayData' with 'GenericArrayData.allocate'
kiszk committed Nov 8, 2016
commit ac5b73b81375b63748fca7bed286d8a715c6e165
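This commit replaces direct `new GenericArrayData(...)` constructor calls with a `GenericArrayData.allocate` factory method. A minimal sketch of the factory-method pattern being adopted — class and method names here are illustrative only, not Spark's actual API — showing how overloads can dispatch to a representation specialized for the element type:

```scala
// Hypothetical sketch: a factory picks a specialized array-data class based on
// the (static) element type of the input array. Names are illustrative.
sealed trait SketchArrayData { def numElements(): Int }

// Boxed fallback backed by Object[] (analogous to a "GenericRefArrayData").
final class SketchRefArrayData(val values: Array[Any]) extends SketchArrayData {
  def numElements(): Int = values.length
}

// Primitive-specialized variant backed by an unboxed int[].
final class SketchIntArrayData(val values: Array[Int]) extends SketchArrayData {
  def numElements(): Int = values.length
}

object SketchArrayData {
  // Statically-known primitive array: use the unboxed, specialized variant.
  def allocate(values: Array[Int]): SketchArrayData = new SketchIntArrayData(values)
  // Array[Any] (e.g. results of an Any-typed converter): boxed fallback.
  def allocate(values: Array[Any]): SketchArrayData = new SketchRefArrayData(values)
}
```

The advantage of the factory over `new` is that callers need not know which concrete representation they get; overload resolution (or, in Spark's case, the factory body) chooses it.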
@@ -159,17 +159,17 @@ object CatalystTypeConverters {
override def toCatalystImpl(scalaValue: Any): ArrayData = {
scalaValue match {
case a: Array[_] =>
- new GenericArrayData(a.map(elementConverter.toCatalyst))
+ GenericArrayData.allocate(a.map(elementConverter.toCatalyst))
Contributor:
Will these allocate calls create a GenericRefArrayData object?

kiszk (Member, Author):
Yes, I believe so for now, since the type of toCatalyst seems to be Any. Would it be better to prepare specialized code to make the type concrete here?
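As the reply notes, an `Any`-typed converter result loses the element type statically, so the boxed variant is chosen. One conceivable workaround — a sketch under stated assumptions, not what this PR implements — is to inspect element classes at runtime and unbox when every element turns out to be a boxed primitive:

```scala
// Sketch: given Array[Any], recover a primitive-backed array at runtime when
// every element is a boxed Int; otherwise keep the boxed Object[] form.
// This trades an O(n) scan of the elements for an unboxed representation.
def toConcreteArray(values: Array[Any]): Array[_] =
  if (values.nonEmpty && values.forall(_.isInstanceOf[Int]))
    values.map(_.asInstanceOf[Int]) // yields an unboxed Array[Int] (int[])
  else
    values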

case s: Seq[_] =>
new GenericArrayData(s.map(elementConverter.toCatalyst).toArray)
GenericArrayData.allocate(s.map(elementConverter.toCatalyst).toArray)
case i: JavaIterable[_] =>
val iter = i.iterator
val convertedIterable = scala.collection.mutable.ArrayBuffer.empty[Any]
while (iter.hasNext) {
val item = iter.next()
convertedIterable += elementConverter.toCatalyst(item)
}
- new GenericArrayData(convertedIterable.toArray)
+ GenericArrayData.allocate(convertedIterable.toArray)
}
}

@@ -410,7 +410,7 @@ object CatalystTypeConverters {
case t: Timestamp => TimestampConverter.toCatalyst(t)
case d: BigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
case d: JavaBigDecimal => new DecimalConverter(DecimalType(d.precision, d.scale)).toCatalyst(d)
- case seq: Seq[Any] => new GenericArrayData(seq.map(convertToCatalyst).toArray)
+ case seq: Seq[Any] => GenericArrayData.allocate(seq.map(convertToCatalyst).toArray)
case r: Row => InternalRow(r.toSeq.map(convertToCatalyst): _*)
case arr: Array[Any] => new GenericArrayData(arr.map(convertToCatalyst))
case map: Map[_, _] =>
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst
import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedAttribute, UnresolvedExtractValue}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects._
- import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
+ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericRefArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

@@ -459,7 +459,7 @@ object ScalaReflection extends ScalaReflection {

case dt =>
NewInstance(
- classOf[GenericArrayData],
+ classOf[GenericRefArrayData],
input :: Nil,
dataType = ArrayType(dt, schemaFor(elementType).nullable))
}
@@ -388,7 +388,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
values(i) = elementCast(e)
}
})
- new GenericArrayData(values)
+ GenericArrayData.allocate(values)
})
}

@@ -864,7 +864,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
}
}
}
- $evPrim = new $arrayClass($values);
+ $evPrim = $arrayClass.allocate($values);
"""
}

@@ -131,7 +131,7 @@ case class PivotFirst(
for (i <- 0 until indexSize) {
result(i) = input.get(mutableAggBufferOffset + i, valueDataType)
}
- new GenericArrayData(result)
+ GenericArrayData.allocate(result)
}

override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate =
@@ -78,7 +78,7 @@ abstract class Collect extends ImperativeAggregate {
}

override def eval(input: InternalRow): Any = {
- new GenericArrayData(buffer.toArray)
+ GenericArrayData.allocate(buffer.toArray)
}
}

@@ -96,7 +96,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
$values[$index] = ${elementConverter.value};
}
}
- final ArrayData $output = new $arrayClass($values);
+ final ArrayData $output = $arrayClass.allocate($values);
"""

ExprCode(code, "false", output)
@@ -206,7 +206,7 @@ case class SortArray(base: Expression, ascendingOrder: Expression)
if (elementType != NullType) {
java.util.Arrays.sort(data, if (ascending.asInstanceOf[Boolean]) lt else gt)
}
- new GenericArrayData(data.asInstanceOf[Array[Any]])
+ GenericArrayData.allocate(data.asInstanceOf[Array[Any]])
}

override def prettyName: String = "sort_array"
@@ -52,7 +52,7 @@ case class CreateArray(children: Seq[Expression]) extends Expression {
override def nullable: Boolean = false

override def eval(input: InternalRow): Any = {
- new GenericArrayData(children.map(_.eval(input)).toArray)
+ GenericArrayData.allocate(children.map(_.eval(input)).toArray)
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
@@ -76,7 +76,7 @@ case class CreateArray(children: Seq[Expression]) extends Expression {
"""
}) +
s"""
- final ArrayData ${ev.value} = new $arrayClass($values);
+ final ArrayData ${ev.value} = $arrayClass.allocate($values);
this.$values = null;
""")
}
@@ -130,7 +130,8 @@ case class CreateMap(children: Seq[Expression]) extends Expression {
throw new RuntimeException("Cannot use null as map key!")
}
val valueArray = values.map(_.eval(input)).toArray
- new ArrayBasedMapData(new GenericArrayData(keyArray), new GenericArrayData(valueArray))
+ new ArrayBasedMapData(
+   GenericArrayData.allocate(keyArray), GenericArrayData.allocate(valueArray))
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
@@ -141,8 +142,8 @@
ctx.addMutableState("Object[]", keyArray, s"this.$keyArray = null;")
ctx.addMutableState("Object[]", valueArray, s"this.$valueArray = null;")

- val keyData = s"new $arrayClass($keyArray)"
- val valueData = s"new $arrayClass($valueArray)"
+ val keyData = s"$arrayClass.allocate($keyArray)"
+ val valueData = s"$arrayClass.allocate($valueArray)"
ev.copy(code = s"""
final boolean ${ev.isNull} = false;
$keyArray = new Object[${keys.size}];
@@ -176,7 +176,7 @@ case class GetArrayStructFields(
}
i += 1
}
- new GenericArrayData(result)
+ GenericArrayData.allocate(result)
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
@@ -201,7 +201,7 @@ case class GetArrayStructFields(
}
}
}
- ${ev.value} = new $arrayClass($values);
+ ${ev.value} = $arrayClass.allocate($values);
"""
})
}
@@ -537,7 +537,7 @@ case class MapObjects private(
$loopIndex += 1;
}

- ${ev.value} = new ${classOf[GenericArrayData].getName}($convertedArray);
+ ${ev.value} = ${classOf[GenericArrayData].getName}.allocate($convertedArray);
}
"""
ev.copy(code = code, isNull = genInputData.isNull)
@@ -191,14 +191,14 @@ case class StringSplit(str: Expression, pattern: Expression)

override def nullSafeEval(string: Any, regex: Any): Any = {
val strings = string.asInstanceOf[UTF8String].split(regex.asInstanceOf[UTF8String], -1)
- new GenericArrayData(strings.asInstanceOf[Array[Any]])
+ GenericArrayData.allocate(strings.asInstanceOf[Array[Any]])
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val arrayClass = classOf[GenericArrayData].getName
nullSafeCodeGen(ctx, ev, (str, pattern) =>
// Array in java is covariant, so we don't need to cast UTF8String[] to Object[].
- s"""${ev.value} = new $arrayClass($str.split($pattern, -1));""")
+ s"""${ev.value} = $arrayClass.allocate($str.split($pattern, -1));""")
}

override def prettyName: String = "split"
@@ -404,7 +404,7 @@ class JacksonParser(
values += fieldConverter.apply(parser)
}

- new GenericArrayData(values.toArray)
+ GenericArrayData.allocate(values.toArray)
}

/**
@@ -57,8 +57,8 @@ private[sql] class UngroupableUDT extends UserDefinedType[UngroupableData] {
override def sqlType: DataType = MapType(IntegerType, IntegerType)

override def serialize(ungroupableData: UngroupableData): MapData = {
- val keyArray = new GenericArrayData(ungroupableData.data.keys.toSeq)
- val valueArray = new GenericArrayData(ungroupableData.data.values.toSeq)
+ val keyArray = GenericArrayData.allocate(ungroupableData.data.keys.toSeq)
+ val valueArray = GenericArrayData.allocate(ungroupableData.data.values.toSeq)
new ArrayBasedMapData(keyArray, valueArray)
}

@@ -70,11 +70,11 @@ class EncoderResolutionSuite extends PlanTest {
val bound = encoder.resolveAndBind(attrs)

// If no null values appear, it should works fine
- bound.fromRow(InternalRow(new GenericArrayData(Array(1, 2))))
+ bound.fromRow(InternalRow(GenericArrayData.allocate(Array(1, 2))))

// If there is null value, it should throw runtime exception
val e = intercept[RuntimeException] {
- bound.fromRow(InternalRow(new GenericArrayData(Array(1, null))))
+ bound.fromRow(InternalRow(GenericArrayData.allocate(Array(1, null))))
}
assert(e.getMessage.contains("Null value appeared in non-nullable field"))
}
@@ -51,7 +51,7 @@ class ExamplePointUDT extends UserDefinedType[ExamplePoint] {
val output = new Array[Any](2)
output(0) = p.x
output(1) = p.y
- new GenericArrayData(output)
+ GenericArrayData.allocate(output)
}

override def deserialize(datum: Any): ExamplePoint = {
@@ -291,7 +291,7 @@ class UnsafeRowConverterSuite extends SparkFunSuite with Matchers {
assert(unsafeRow.getSizeInBytes == 8 + 2 * 8 + row1.getSizeInBytes + row2.getSizeInBytes)
}

- private def createArray(values: Any*): ArrayData = new GenericArrayData(values.toArray)
+ private def createArray(values: Any*): ArrayData = GenericArrayData.allocate(values.toArray)

private def createMap(keys: Any*)(values: Any*): MapData = {
assert(keys.length == values.length)
@@ -86,7 +86,7 @@ class GeneratedProjectionSuite extends SparkFunSuite {
test("generated unsafe projection with array of binary") {
val row = InternalRow(
Array[Byte](1, 2),
- new GenericArrayData(Array(Array[Byte](1, 2), null, Array[Byte](3, 4))))
+ GenericArrayData.allocate(Array(Array[Byte](1, 2), null, Array[Byte](3, 4))))
val fields = (BinaryType :: ArrayType(BinaryType) :: Nil).toArray[DataType]

val unsafeProj = UnsafeProjection.create(fields)
@@ -491,7 +491,7 @@ private[parquet] class ParquetRowConverter(

override def getConverter(fieldIndex: Int): Converter = elementConverter

- override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
+ override def end(): Unit = updater.set(GenericArrayData.allocate(currentArray.toArray))

// NOTE: We can't reuse the mutable `ArrayBuffer` here and must instantiate a new buffer for the
// next value. `Row.copy()` only copies row cells, it doesn't do deep copy to objects stored
@@ -590,7 +590,7 @@ private[parquet] class ParquetRowConverter(

protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater {
override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
- override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
+ override def end(): Unit = updater.set(GenericArrayData.allocate(currentArray.toArray))
override def set(value: Any): Unit = currentArray += value
}
}
@@ -119,10 +119,10 @@ object EvaluatePython {
case (c, BinaryType) if c.getClass.isArray && c.getClass.getComponentType.getName == "byte" => c

case (c: java.util.List[_], ArrayType(elementType, _)) =>
- new GenericArrayData(c.asScala.map { e => fromJava(e, elementType)}.toArray)
+ GenericArrayData.allocate(c.asScala.map { e => fromJava(e, elementType)}.toArray)

case (c, ArrayType(elementType, _)) if c.getClass.isArray =>
- new GenericArrayData(c.asInstanceOf[Array[_]].map(e => fromJava(e, elementType)))
+ GenericArrayData.allocate(c.asInstanceOf[Array[_]].map(e => fromJava(e, elementType)))

case (javaMap: java.util.Map[_, _], MapType(keyType, valueType, _)) =>
ArrayBasedMapData(
@@ -49,7 +49,7 @@ private[sql] class ExamplePointUDT extends UserDefinedType[ExamplePoint] {
val output = new Array[Any](2)
output(0) = p.x
output(1) = p.y
- new GenericArrayData(output)
+ GenericArrayData.allocate(output)
}

override def deserialize(datum: Any): ExamplePoint = {
@@ -160,7 +160,7 @@ class UnsafeRowSuite extends SparkFunSuite {
}

test("calling hashCode on unsafe array returned by getArray(ordinal)") {
- val row = InternalRow.apply(new GenericArrayData(Array(1L)))
+ val row = InternalRow.apply(GenericArrayData.allocate(Array(1L)))
val unsafeRow = UnsafeProjection.create(Array[DataType](ArrayType(LongType))).apply(row)
// Makes sure hashCode on unsafe array won't crash
unsafeRow.getArray(0).hashCode()
@@ -50,7 +50,7 @@ object UDT {
override def sqlType: DataType = ArrayType(DoubleType, containsNull = false)

override def serialize(features: MyDenseVector): ArrayData = {
- new GenericArrayData(features.data.map(_.asInstanceOf[Any]))
+ GenericArrayData.allocate(features.data.map(_.asInstanceOf[Any]))
}

override def deserialize(datum: Any): MyDenseVector = {
@@ -56,7 +56,7 @@ object ColumnarTestUtils {
case STRUCT(_) =>
new GenericInternalRow(Array[Any](UTF8String.fromString(Random.nextString(10))))
case ARRAY(_) =>
- new GenericArrayData(Array[Any](Random.nextInt(), Random.nextInt()))
+ GenericArrayData.allocate(Array[Any](Random.nextInt(), Random.nextInt()))
case MAP(_) =>
ArrayBasedMapData(
Map(Random.nextInt() -> UTF8String.fromString(Random.nextString(Random.nextInt(32)))))
@@ -229,7 +229,7 @@ class HiveInspectorSuite extends SparkFunSuite with HiveInspectors {
test("wrap / unwrap Array Type") {
val dt = ArrayType(dataTypes(0))

- val d = new GenericArrayData(Array(row(0), row(0)))
+ val d = GenericArrayData.allocate(Array(row(0), row(0)))
checkValue(d, unwrap(wrap(d, toInspector(dt), dt), toInspector(dt)))
checkValue(null, unwrap(wrap(null, toInspector(dt), dt), toInspector(dt)))
checkValue(d,