18 commits:
a367840  [SPARK-10859] [SQL] fix stats of StringType in columnar cache (Sep 28, 2015)
9b3014b  [SPARK-10833] [BUILD] Inline, organize BSD/MIT licenses in LICENSE (srowen, Sep 29, 2015)
d544932  [SPARK-10825] [CORE] [TESTS] Fix race conditions in StandaloneDynamic… (zsxwing, Sep 29, 2015)
3b23873  [SPARK-10871] include number of executor failures in error msg (ryan-williams, Sep 29, 2015)
cbc6aec  [SPARK-10058] [CORE] [TESTS] Fix the flaky tests in HeartbeatReceiver… (zsxwing, Oct 1, 2015)
8836ac3  [SPARK-10904] [SPARKR] Fix to support `select(df, c("col1", "col2"))` (felixcheung, Oct 4, 2015)
d323e5e  [SPARK-10889] [STREAMING] Bump KCL to add MillisBehindLatest metric (akatz, Oct 4, 2015)
c8392cd  [SPARK-10934] [SQL] handle hashCode of unsafe array correctly (cloud-fan, Oct 6, 2015)
6847be6  [SPARK-10901] [YARN] spark.yarn.user.classpath.first doesn't work (Oct 6, 2015)
84f510c  [SPARK-10885] [STREAMING] Display the failed output op in Streaming UI (zsxwing, Oct 6, 2015)
b6a0933  [SPARK-10952] Only add hive to classpath if HIVE_HOME is set. (kevincox, Oct 7, 2015)
57978ae  [SPARK-10980] [SQL] fix bug in create Decimal (Oct 7, 2015)
ba601b1  [SPARK-10914] UnsafeRow serialization breaks when two machines have d… (rxin, Oct 9, 2015)
3df7500  [SPARK-10955] [STREAMING] Add a warning if dynamic allocation for Str… (harishreedharan, Oct 9, 2015)
a3b4b93  Merge branch 'branch-1.5' of github.com:apache/spark into csd-1.5 (markhamstra, Oct 9, 2015)
f95129c  [SPARK-10959] [PYSPARK] StreamingLogisticRegressionWithSGD does not t… (BryanCutler, Oct 9, 2015)
9a625f3  Merge branch 'branch-1.5' of github.com:apache/spark into csd-1.5 (markhamstra, Oct 9, 2015)
5a10e10  [SPARK-10389] [SQL] support order by non-attribute grouping expressio… (cloud-fan, Sep 2, 2015)

[SPARK-10914] UnsafeRow serialization breaks when two machines have different Oops size.

UnsafeRow points to data in memory using three pieces of information: a base object, a base offset, and a length. When a row is serialized with Java or Kryo and deserialized on another machine, that base offset may no longer be valid, because the in-memory object layout differs between JVMs with different pointer widths (ordinary object pointers, or Oops). The fix below serializes only the row's backing bytes plus its field count, and on deserialization rebuilds the row on a fresh byte array using the local BYTE_ARRAY_OFFSET.
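
The base offset of a byte[] is itself layout-dependent, which is the crux of the bug. A minimal sketch of checking it (assuming a 64-bit HotSpot JVM where sun.misc.Unsafe is reachable via reflection; the printed values are typical, not guaranteed):

// Obtain sun.misc.Unsafe reflectively, the same trick Spark's Platform class uses.
val field = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe")
field.setAccessible(true)
val unsafe = field.get(null).asInstanceOf[sun.misc.Unsafe]

// Commonly prints 16 with -XX:+UseCompressedOops (the default) and 24 with
// -XX:-UseCompressedOops: wider object pointers enlarge the array header,
// so the first element of the array starts at a different offset.
println(unsafe.arrayBaseOffset(classOf[Array[Byte]]))

A base offset captured on one JVM can therefore point into, or past, the array header on another, which is exactly what a serialized UnsafeRow carried across machines before this patch.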

To reproduce, launch Spark using:

MASTER=local-cluster[2,1,1024] bin/spark-shell --conf "spark.executor.extraJavaOptions=-XX:-UseCompressedOops"

Then run the following in the shell:

scala> sql("select 1 xx").collect()

Author: Reynold Xin <[email protected]>

Closes apache#9030 from rxin/SPARK-10914.

(cherry picked from commit 84ea287)
Signed-off-by: Reynold Xin <[email protected]>
rxin committed Oct 9, 2015
commit ba601b1ac46e2e3c9438c2658f9d95f2365709b6
UnsafeRow.java:
@@ -17,15 +17,19 @@
 
 package org.apache.spark.sql.catalyst.expressions;
 
-import java.io.IOException;
-import java.io.OutputStream;
+import java.io.*;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoSerializable;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.array.ByteArrayMethods;
@@ -35,6 +39,7 @@
 import org.apache.spark.unsafe.types.UTF8String;
 
 import static org.apache.spark.sql.types.DataTypes.*;
+import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
 
 /**
  * An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
@@ -52,7 +57,7 @@
  *
  * Instances of `UnsafeRow` act as pointers to row data stored in this format.
  */
-public final class UnsafeRow extends MutableRow {
+public final class UnsafeRow extends MutableRow implements Externalizable, KryoSerializable {
 
   //////////////////////////////////////////////////////////////////////////////
   // Static methods
@@ -596,4 +601,40 @@ public boolean anyNull() {
   public void writeToMemory(Object target, long targetOffset) {
     Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
   }
+
+  @Override
+  public void writeExternal(ObjectOutput out) throws IOException {
+    byte[] bytes = getBytes();
+    out.writeInt(bytes.length);
+    out.writeInt(this.numFields);
+    out.write(bytes);
+  }
+
+  @Override
+  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+    this.baseOffset = BYTE_ARRAY_OFFSET;
+    this.sizeInBytes = in.readInt();
+    this.numFields = in.readInt();
+    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+    this.baseObject = new byte[sizeInBytes];
+    in.readFully((byte[]) baseObject);
+  }
+
+  @Override
+  public void write(Kryo kryo, Output out) {
+    byte[] bytes = getBytes();
+    out.writeInt(bytes.length);
+    out.writeInt(this.numFields);
+    out.write(bytes);
+  }
+
+  @Override
+  public void read(Kryo kryo, Input in) {
+    this.baseOffset = BYTE_ARRAY_OFFSET;
+    this.sizeInBytes = in.readInt();
+    this.numFields = in.readInt();
+    this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
+    this.baseObject = new byte[sizeInBytes];
+    in.read((byte[]) baseObject);
+  }
 }
UnsafeRowSuite.scala:
@@ -19,7 +19,8 @@ package org.apache.spark.sql
 
 import java.io.ByteArrayOutputStream
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.serializer.{KryoSerializer, JavaSerializer}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{UnsafeRow, UnsafeProjection}
 import org.apache.spark.sql.types._
@@ -29,6 +30,32 @@ import org.apache.spark.unsafe.types.UTF8String
 
 class UnsafeRowSuite extends SparkFunSuite {
 
+  test("UnsafeRow Java serialization") {
+    // serializing an UnsafeRow pointing to a large buffer should only serialize the relevant data
+    val data = new Array[Byte](1024)
+    val row = new UnsafeRow
+    row.pointTo(data, 1, 16)
+    row.setLong(0, 19285)
+
+    val ser = new JavaSerializer(new SparkConf).newInstance()
+    val row1 = ser.deserialize[UnsafeRow](ser.serialize(row))
+    assert(row1.getLong(0) == 19285)
+    assert(row1.getBaseObject().asInstanceOf[Array[Byte]].length == 16)
+  }
+
+  test("UnsafeRow Kryo serialization") {
+    // serializing an UnsafeRow pointing to a large buffer should only serialize the relevant data
+    val data = new Array[Byte](1024)
+    val row = new UnsafeRow
+    row.pointTo(data, 1, 16)
+    row.setLong(0, 19285)
+
+    val ser = new KryoSerializer(new SparkConf).newInstance()
+    val row1 = ser.deserialize[UnsafeRow](ser.serialize(row))
+    assert(row1.getLong(0) == 19285)
+    assert(row1.getBaseObject().asInstanceOf[Array[Byte]].length == 16)
+  }
+
   test("bitset width calculation") {
     assert(UnsafeRow.calculateBitSetWidthInBytes(0) === 0)
     assert(UnsafeRow.calculateBitSetWidthInBytes(1) === 8)
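
As a usage note, plain Kryo dispatches to these methods too, since UnsafeRow now implements KryoSerializable; Spark's serializer wrapper is not required. A standalone round-trip sketch (illustrative only; the supported path is through Spark's KryoSerializer, as in the suite above):

import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// A small row backed by a larger buffer, as in the tests above.
val data = new Array[Byte](1024)
val row = new UnsafeRow
row.pointTo(data, 1, 16)
row.setLong(0, 19285)

// Raw Kryo round trip: writeObject/readObject invoke the write/read
// methods added to UnsafeRow in this patch.
val kryo = new Kryo()
val baos = new ByteArrayOutputStream()
val out = new Output(baos)
kryo.writeObject(out, row)
out.close()

val copy = kryo.readObject(new Input(baos.toByteArray), classOf[UnsafeRow])
assert(copy.getLong(0) == 19285)  // only the row's 16 relevant bytes (plus two ints) were written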