Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fixed a bug .
  • Loading branch information
rxin committed Aug 1, 2015
commit 6269f96c5fa7e5c6f120a348d54fdf9b900df5a7
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
// The only reduction comes from merging the bitset portion of the two rows, saving 1 word.
val sizeReduction = bitset1Words + bitset2Words - outputBitsetWords

// Copy bitset from row1: pretty straightforward
// --------------------- copy bitset from row 1 ----------------------- //
val copyBitset1 = Seq.tabulate(bitset1Words) { i =>
s"""
|PlatformDependent.UNSAFE.putLong(buf, ${offset + i * 8},
| PlatformDependent.UNSAFE.getLong(obj1, ${offset + i * 8}));
""".stripMargin
}.mkString


// --------------------- copy bitset from row 2 ----------------------- //
var copyBitset2 = ""
if (bitset1Remainder == 0) {
copyBitset2 += Seq.tabulate(bitset2Words) { i =>
Expand All @@ -75,8 +77,6 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
""".stripMargin
}.mkString
} else {
// Copy bitset from row2: slightly more complicated here, as we need to use the leftover
// space from the first bitset.
copyBitset2 = Seq.tabulate(bitset2Words) { i =>
s"""
|long bs2w$i = PlatformDependent.UNSAFE.getLong(obj2, ${offset + i * 8});
Expand Down Expand Up @@ -105,7 +105,8 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
}
}.mkString("\n")

if (bitset2Remainder > (64 - bitset1Remainder)) {
if (bitset2Words > 0 &&
(bitset2Remainder == 0 || bitset2Remainder > (64 - bitset1Remainder))) {
val lastWord = bitset2Words - 1
copyBitset2 +=
s"""
Expand All @@ -115,6 +116,38 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR
}
}

// --------------------- copy fixed length portion from row 1 ----------------------- //
val copyFixedLengthRow1 = s"""
|PlatformDependent.UNSAFE.copyMemory(
| obj1, offset1 + ${bitset1Words * 8},
| buf, offset + ${outputBitsetWords * 8},
| ${schema1.size * 8});
""".stripMargin

// --------------------- copy fixed length portion from row 2 ----------------------- //
val copyFixedLengthRow2 = s"""
|PlatformDependent.UNSAFE.copyMemory(
| obj2, offset2 + ${bitset1Words * 8},
| buf, offset + ${(outputBitsetWords + schema1.size) * 8},
| ${schema2.size * 8});
""".stripMargin

// --------------------- copy variable length portion from row 1 ----------------------- //
val copyVariableLengthRow1 = s"""
|PlatformDependent.UNSAFE.copyMemory(
| obj1, offset1 + ${(bitset1Words + schema1.size) * 8},
| buf, offset + ${(outputBitsetWords + schema1.size + schema2.size) * 8},
| row1.getSizeInBytes() - ${(bitset1Words + schema1.size) * 8});
""".stripMargin

// --------------------- copy variable length portion from row 2 ----------------------- //
val copyVariableLengthRow2 = s"""
|PlatformDependent.UNSAFE.copyMemory(
| obj1, offset1 + ${(outputBitsetWords + schema1.size + schema2.size) * 8},
| buf, offset + ${(outputBitsetWords + schema1.size + schema2.size) * 8},
| ${schema1.size * 8});
""".stripMargin


val code = s"""
|public Object generate($exprType[] exprs) {
Expand Down Expand Up @@ -151,6 +184,8 @@ object GenerateRowConcat extends CodeGenerator[(StructType, StructType), UnsafeR

logDebug(s"code for GenerateRowConcat($schema1, $schema2):\n${CodeFormatter.format(code)}")

// println(CodeFormatter.format(code))

val c = compile(code)
c.generate(Array.empty).asInstanceOf[UnsafeRowConcat]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ class GenerateRowConcatSuite extends SparkFunSuite {
}

private def testBitsets(numFields1: Int, numFields2: Int): Unit = {
for (i <- 0 until 5) {
testBitsetsOnce(numFields1, numFields2)
}
}

private def testBitsetsOnce(numFields1: Int, numFields2: Int): Unit = {
val schema1 = StructType(Seq.tabulate(numFields1) { i => StructField(s"a_$i", IntegerType) })
val schema2 = StructType(Seq.tabulate(numFields2) { i => StructField(s"b_$i", IntegerType) })

Expand Down Expand Up @@ -64,8 +70,8 @@ class GenerateRowConcatSuite extends SparkFunSuite {

s"""
|input1: ${set1.mkString}
|input2: ${set2.mkString}
|output: ${out.mkString}
|input2: ${set2.mkString}
|output: ${out.mkString}
""".stripMargin
}

Expand All @@ -78,62 +84,63 @@ class GenerateRowConcatSuite extends SparkFunSuite {
}
}

test("boundary size 0, 0") {
test("bitset concat: boundary size 0, 0") {
testBitsets(0, 0)
}

test("boundary size 0, 64") {
test("bitset concat: boundary size 0, 64") {
testBitsets(0, 64)
}

test("boundary size 64, 0") {
test("bitset concat: boundary size 64, 0") {
testBitsets(64, 0)
}

test("boundary size 64, 64") {
test("bitset concat: boundary size 64, 64") {
testBitsets(64, 64)
}

test("boundary size 0, 128") {
test("bitset concat: boundary size 0, 128") {
testBitsets(0, 128)
}

test("boundary size 128, 0") {
test("bitset concat: boundary size 128, 0") {
testBitsets(128, 0)
}

test("boundary size 128, 128") {
test("bitset concat: boundary size 128, 128") {
testBitsets(128, 128)
}

test("single word bitsets") {
test("bitset concat: single word bitsets") {
testBitsets(10, 5)
}

test("first bitset larger than a word") {
test("bitset concat: first bitset larger than a word") {
testBitsets(67, 5)
}

test("second bitset larger than a word") {
test("bitset concat: second bitset larger than a word") {
testBitsets(6, 67)
}

test("no reduction in bitset size") {
test("bitset concat: no reduction in bitset size") {
testBitsets(33, 34)
}

test("two words") {
test("bitset concat: two words") {
testBitsets(120, 95)
}

test("bitset 595, 960") {
test("bitset concat: bitset 65, 128") {
testBitsets(65, 128)
}

test("randomized tests") {
test("bitset concat: randomized tests") {
for (i <- 1 until 20) {
val numFields1 = Random.nextInt(1000)
val numFields2 = Random.nextInt(1000)
info(s"num fields: $numFields1 and $numFields2")
testBitsets(numFields1, numFields2)
}
}
Expand Down