Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
145 commits
Select commit Hold shift + click to select a range
48eab1d
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
b348901
[SPARK-23808][SQL] Set default Spark session in test-only spark sessi…
jose-torres Mar 30, 2018
df05fb6
[SPARK-23743][SQL] Changed a comparison logic from containing 'slf4j'…
jongyoul Mar 30, 2018
b02e76c
[SPARK-23727][SQL] Support for pushing down filters for DateType in p…
yucai Mar 30, 2018
5b5a36e
Roll forward "[SPARK-23096][SS] Migrate rate source to V2"
jose-torres Mar 30, 2018
bc8d093
[SPARK-23500][SQL][FOLLOWUP] Fix complex type simplification rules to…
gatorsmile Mar 30, 2018
ae91720
[SPARK-23640][CORE] Fix hadoop config may override spark config
wangyum Mar 30, 2018
15298b9
[SPARK-23827][SS] StreamingJoinExec should ensure that input data is …
tdas Mar 30, 2018
529f847
[SPARK-23040][CORE][FOLLOW-UP] Avoid double wrap result Iterator.
jiangxb1987 Mar 31, 2018
5e24fc6
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
44a9f8e
[SPARK-15009][PYTHON][FOLLOWUP] Add default param checks for CountVec…
BryanCutler Apr 2, 2018
6151f29
[SPARK-23825][K8S] Requesting memory + memory overhead for pod memory
dvogelbacher Apr 2, 2018
fe2b7a4
[SPARK-23285][K8S] Add a config property for specifying physical exec…
liyinan926 Apr 2, 2018
a7c19d9
[SPARK-23713][SQL] Cleanup UnsafeWriter and BufferHolder classes
kiszk Apr 2, 2018
28ea4e3
[SPARK-23834][TEST] Wait for connection before disconnect in Launcher…
Apr 2, 2018
a135182
[SPARK-23690][ML] Add handleinvalid to VectorAssembler
Apr 2, 2018
441d0d0
[SPARK-19964][CORE] Avoid reading from remote repos in SparkSubmitSuite.
Apr 3, 2018
8020f66
[MINOR][DOC] Fix a few markdown typos
Apr 3, 2018
7cf9fab
[MINOR][CORE] Show block manager id when remove RDD/Broadcast fails.
jiangxb1987 Apr 3, 2018
66a3a5a
[SPARK-23099][SS] Migrate foreach sink to DataSourceV2
jose-torres Apr 3, 2018
1035aaa
[SPARK-23587][SQL] Add interpreted execution for MapObjects expression
viirya Apr 3, 2018
359375e
[SPARK-23809][SQL] Active SparkSession should be set by getOrCreate
ericl Apr 4, 2018
5cfd5fa
[SPARK-23802][SQL] PropagateEmptyRelation can leave query plan in unr…
Apr 4, 2018
16ef6ba
[SPARK-23826][TEST] TestHiveSparkSession should set default session
gatorsmile Apr 4, 2018
5197562
[SPARK-21351][SQL] Update nullability based on children's output
maropu Apr 4, 2018
a355236
[SPARK-23583][SQL] Invoke should support interpreted execution
kiszk Apr 4, 2018
cccaaa1
[SPARK-23668][K8S] Add config option for passing through k8s Pod.spec…
Apr 4, 2018
d8379e5
[SPARK-23838][WEBUI] Running SQL query is displayed as "completed" in…
gengliangwang Apr 4, 2018
d3bd043
[SPARK-23637][YARN] Yarn might allocate more resource if a same execu…
Apr 4, 2018
c5c8b54
[SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean e…
viirya Apr 5, 2018
1822ecd
[SPARK-23582][SQL] StaticInvoke should support interpreted execution
kiszk Apr 5, 2018
b2329fb
Revert "[SPARK-23593][SQL] Add interpreted execution for InitializeJa…
hvanhovell Apr 5, 2018
d9ca1c9
[SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean e…
viirya Apr 5, 2018
4807d38
[SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks to choose se…
kiszk Apr 6, 2018
f2ac087
[SPARK-23870][ML] Forward RFormula handleInvalid Param to VectorAssem…
Apr 6, 2018
d65e531
[SPARK-23823][SQL] Keep origin in transformExpression
Apr 6, 2018
249007e
[SPARK-19724][SQL] create a managed table with an existed default tab…
gengliangwang Apr 6, 2018
6ade5cb
[MINOR][DOC] Fix some typos and grammar issues
dsakuma Apr 6, 2018
9452401
[SPARK-23822][SQL] Improve error message for Parquet schema mismatches
yuchenhuo Apr 6, 2018
d766ea2
[SPARK-23861][SQL][DOC] Clarify default window frame with and without…
icexelloss Apr 6, 2018
c926acf
[SPARK-23882][CORE] UTF8StringSuite.writeToOutputStreamUnderflow() is…
kiszk Apr 6, 2018
d23a805
[SPARK-23859][ML] Initial PR for Instrumentation improvements: UUID a…
MrBago Apr 6, 2018
b6935ff
[SPARK-10399][SPARK-23879][HOTFIX] Fix Java lint errors
kiszk Apr 6, 2018
e998250
[SPARK-23828][ML][PYTHON] PySpark StringIndexerModel should have cons…
huaxingao Apr 6, 2018
6ab134c
[SPARK-21898][ML][FOLLOWUP] Fix Scala 2.12 build.
ueshin Apr 6, 2018
2c1fe64
[SPARK-23847][PYTHON][SQL] Add asc_nulls_first, asc_nulls_last to PyS…
huaxingao Apr 8, 2018
6a73457
[SPARK-23849][SQL] Tests for the samplingRatio option of JSON datasource
MaxGekk Apr 8, 2018
710a68c
[SPARK-23892][TEST] Improve converge and fix lint error in UTF8String…
kiszk Apr 8, 2018
8d40a79
[SPARK-23893][CORE][SQL] Avoid possible integer overflow in multiplic…
kiszk Apr 8, 2018
32471ba
Fix typo in Python docstring kinesis example
Apr 9, 2018
d81f29e
[SPARK-23881][CORE][TEST] Fix flaky test JobCancellationSuite."interr…
jiangxb1987 Apr 9, 2018
10f45bb
[SPARK-23816][CORE] Killed tasks should ignore FetchFailures.
squito Apr 9, 2018
7c1654e
[SPARK-22856][SQL] Add wrappers for codegen output and nullability
viirya Apr 9, 2018
252468a
[SPARK-14681][ML] Provide label/impurity stats for spark.ml decision …
WeichenXu123 Apr 9, 2018
61b7247
[INFRA] Close stale PRs.
Apr 9, 2018
f94f362
[SPARK-23947][SQL] Add hashUTF8String convenience method to hasher cl…
rednaxelafx Apr 10, 2018
6498884
[SPARK-23898][SQL] Simplify add & subtract code generation
hvanhovell Apr 10, 2018
95034af
[SPARK-23841][ML] NodeIdCache should unpersist the last cached nodeId…
zhengruifeng Apr 10, 2018
3323b15
[SPARK-23864][SQL] Add unsafe object writing to UnsafeWriter
hvanhovell Apr 10, 2018
e179658
[SPARK-19724][SQL][FOLLOW-UP] Check location of managed table when ig…
gengliangwang Apr 10, 2018
adb222b
[SPARK-23751][ML][PYSPARK] Kolmogorov-Smirnoff test Python API in pys…
WeichenXu123 Apr 10, 2018
4f1e8b9
[SPARK-23871][ML][PYTHON] add python api for VectorAssembler handleIn…
huaxingao Apr 10, 2018
7c7570d
[SPARK-23944][ML] Add the set method for the two LSHModel
lu-wang-dl Apr 11, 2018
c7622be
[SPARK-23847][FOLLOWUP][PYTHON][SQL] Actually test [desc|acs]_nulls_[…
HyukjinKwon Apr 11, 2018
87611bb
[MINOR][DOCS] Fix R documentation generation instruction for roxygen2
HyukjinKwon Apr 11, 2018
c604d65
[SPARK-23951][SQL] Use actual java class instead of string representa…
hvanhovell Apr 11, 2018
271c891
[SPARK-23960][SQL][MINOR] Mark HashAggregateExec.bufVars as transient
rednaxelafx Apr 11, 2018
653fe02
[SPARK-6951][CORE] Speed up parsing of event logs during listing.
Apr 11, 2018
3cb8204
[SPARK-22941][CORE] Do not exit JVM when submit fails with in-process…
Apr 11, 2018
75a1830
[SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M
jkbradley Apr 11, 2018
9d960de
typo rawPredicition changed to rawPrediction
JBauerKogentix Apr 11, 2018
e904dfa
Revert "[SPARK-23960][SQL][MINOR] Mark HashAggregateExec.bufVars as t…
gatorsmile Apr 12, 2018
6a2289e
[SPARK-23962][SQL][TEST] Fix race in currentExecutionIds().
squito Apr 12, 2018
0b19122
[SPARK-23762][SQL] UTF8StringBuffer uses MemoryBlock
kiszk Apr 12, 2018
0f93b91
[SPARK-23751][FOLLOW-UP] fix build for scala-2.12
WeichenXu123 Apr 12, 2018
682002b
[SPARK-23867][SCHEDULER] use droppedCount in logWarning
Apr 13, 2018
14291b0
[SPARK-23748][SS] Fix SS continuous process doesn't support SubqueryA…
jerryshao Apr 13, 2018
ab7b961
[SPARK-23942][PYTHON][SQL] Makes collect in PySpark as action for a q…
HyukjinKwon Apr 13, 2018
1018be4
[SPARK-23971] Should not leak Spark sessions across test suites
ericl Apr 13, 2018
4b07036
[SPARK-23815][CORE] Spark writer dynamic partition overwrite mode may…
Apr 13, 2018
0323e61
[SPARK-23905][SQL] Add UDF weekday
yucai Apr 13, 2018
a83ae0d
[SPARK-22839][K8S] Refactor to unify driver and executor pod builder …
mccheah Apr 13, 2018
4dfd746
[SPARK-23896][SQL] Improve PartitioningAwareFileIndex
gengliangwang Apr 13, 2018
25892f3
[SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer
mgaido91 Apr 13, 2018
558f31b
[SPARK-23963][SQL] Properly handle large number of columns in query o…
bersprockets Apr 13, 2018
cbb41a0
[SPARK-23966][SS] Refactoring all checkpoint file writing logic in a …
tdas Apr 13, 2018
73f2853
[SPARK-23979][SQL] MultiAlias should not be a CodegenFallback
viirya Apr 14, 2018
c096493
[SPARK-23956][YARN] Use effective RPC port in AM registration
gerashegalov Apr 16, 2018
6931022
[SPARK-23917][SQL] Add array_max function
mgaido91 Apr 16, 2018
083cf22
[SPARK-21033][CORE][FOLLOW-UP] Update Spillable
wangyum Apr 16, 2018
5003736
[SPARK-9312][ML] Add RawPrediction, numClasses, and numFeatures for O…
lu-wang-dl Apr 16, 2018
0461482
[SPARK-21088][ML] CrossValidator, TrainValidationSplit support collec…
WeichenXu123 Apr 16, 2018
fd990a9
[SPARK-23873][SQL] Use accessors in interpreted LambdaVariable
viirya Apr 16, 2018
14844a6
[SPARK-23918][SQL] Add array_min function
mgaido91 Apr 17, 2018
1cc66a0
[SPARK-23687][SS] Add a memory source for continuous processing.
jose-torres Apr 17, 2018
05ae747
[SPARK-23747][STRUCTURED STREAMING] Add EpochCoordinator unit tests
Apr 17, 2018
30ffb53
[SPARK-23875][SQL] Add IndexedSeq wrapper for ArrayData
viirya Apr 17, 2018
0a9172a
[SPARK-23835][SQL] Add not-null check to Tuples' arguments deserializ…
mgaido91 Apr 17, 2018
ed4101d
[SPARK-22676] Avoid iterating all partition paths when spark.sql.hive…
Apr 17, 2018
3990daa
[SPARK-23948] Trigger mapstage's job listener in submitMissingTasks
Apr 17, 2018
f39e82c
[SPARK-23986][SQL] freshName can generate non-unique names
mgaido91 Apr 17, 2018
1ca3c50
[SPARK-21741][ML][PYSPARK] Python API for DataFrame-based multivariat…
WeichenXu123 Apr 17, 2018
5fccdae
[SPARK-22968][DSTREAM] Throw an exception on partition revoking issue
jerryshao Apr 18, 2018
1e3b876
[SPARK-21479][SQL] Outer join filter pushdown in null supplying table…
maryannxue Apr 18, 2018
310a8cd
[SPARK-23341][SQL] define some standard options for data source v2
cloud-fan Apr 18, 2018
cce4694
[SPARK-24002][SQL] Task not serializable caused by org.apache.parquet…
gatorsmile Apr 18, 2018
f81fa47
[SPARK-23926][SQL] Extending reverse function to support ArrayType ar…
Apr 18, 2018
f09a9e9
[SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might g…
ueshin Apr 18, 2018
a906647
[SPARK-23875][SQL][FOLLOWUP] Add IndexedSeq wrapper for ArrayData
viirya Apr 18, 2018
0c94e48
[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
gaborgsomogyi Apr 18, 2018
8bb0df2
[SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingList…
viirya Apr 19, 2018
d5bec48
[SPARK-23919][SQL] Add array_position function
kiszk Apr 19, 2018
46bb2b5
[SPARK-23924][SQL] Add element_at function
kiszk Apr 19, 2018
1b08c43
[SPARK-23584][SQL] NewInstance should support interpreted execution
maropu Apr 19, 2018
e134165
[SPARK-23588][SQL] CatalystToExternalMap should support interpreted e…
maropu Apr 19, 2018
9e10f69
[SPARK-22676][FOLLOW-UP] fix code style for test.
Apr 19, 2018
d96c3e3
[SPARK-21811][SQL] Fix the inconsistency behavior when finding the wi…
jiangxb1987 Apr 19, 2018
0deaa52
[SPARK-24021][CORE] fix bug in BlacklistTracker's updateBlacklistForF…
Ngone51 Apr 19, 2018
6e19f76
[SPARK-23989][SQL] exchange should copy data before non-serialized sh…
cloud-fan Apr 19, 2018
a471880
[SPARK-24026][ML] Add Power Iteration Clustering to spark.ml
wangmiao1981 Apr 19, 2018
9ea8d3d
[SPARK-22362][SQL] Add unit test for Window Aggregate Functions
attilapiros Apr 19, 2018
e55953b
[SPARK-24022][TEST] Make SparkContextSuite not flaky
gaborgsomogyi Apr 19, 2018
b3fde5a
[SPARK-23877][SQL] Use filter predicates to prune partitions in metad…
rdblue Apr 20, 2018
e6b4660
[SPARK-23736][SQL] Extending the concat function to support array col…
Apr 20, 2018
074a7f9
[SPARK-23588][SQL][FOLLOW-UP] Resolve a map builder method per execut…
maropu Apr 20, 2018
0dd97f6
[SPARK-23595][SQL] ValidateExternalType should support interpreted ex…
maropu Apr 20, 2018
1d758dc
Revert "[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky"
Apr 20, 2018
32b4bcd
[SPARK-24029][CORE] Set SO_REUSEADDR on listen sockets.
Apr 21, 2018
7bc853d
[SPARK-24033][SQL] Fix Mismatched of Window Frame specifiedwindowfram…
gatorsmile Apr 21, 2018
ae8a388
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
c48085a
[SPARK-23799][SQL] FilterEstimation.evaluateInSet produces devision b…
Apr 22, 2018
c3a86fa
[SPARK-10399][SPARK-23879][FOLLOWUP][CORE] Free unused off-heap memor…
kiszk Apr 23, 2018
f70f46d
[SPARK-23877][SQL][FOLLOWUP] use PhysicalOperation to simplify the ha…
cloud-fan Apr 23, 2018
d87d30e
[SPARK-23564][SQL] infer additional filters from constraints for join…
cloud-fan Apr 23, 2018
afbdf42
[SPARK-23589][SQL] ExternalMapToCatalyst should support interpreted e…
maropu Apr 23, 2018
293a0f2
[Spark-24024][ML] Fix poisson deviance calculations in GLM to handle …
tengpeng Apr 23, 2018
448d248
[SPARK-21168] KafkaRDD should always set kafka clientId.
liu-zhaokun Apr 23, 2018
770add8
[SPARK-23004][SS] Ensure StateStore.commit is called only once in a s…
tdas Apr 23, 2018
e82cb68
[SPARK-11237][ML] Add pmml export for k-means in Spark ML
holdenk Apr 23, 2018
c8f3ac6
[SPARK-23888][CORE] correct the comment of hasAttemptOnHost()
Ngone51 Apr 23, 2018
efcfc64
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
b24f041
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
9d9c248
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
bbe1a82
Merge branch 'SPARK-23429' of https://github.com/edwinalu/spark into …
edwinalu Apr 23, 2018
8ae0126
fix MimaExcludes.scala
edwinalu Apr 23, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-23713][SQL] Cleanup UnsafeWriter and BufferHolder classes
## What changes were proposed in this pull request?

This PR implemented the following cleanups related to  `UnsafeWriter` class:
- Remove code duplication between `UnsafeRowWriter` and `UnsafeArrayWriter`
- Make `BufferHolder` class internal by delegating its accessor methods to `UnsafeWriter`
- Replace `UnsafeRow.setTotalSize(...)` with `UnsafeRowWriter.setTotalSize()`

## How was this patch tested?

Tested by existing UTs

Author: Kazuaki Ishizaki <[email protected]>

Closes #20850 from kiszk/SPARK-23713.
  • Loading branch information
kiszk authored and hvanhovell committed Apr 2, 2018
commit a7c19d9c21d59fd0109a7078c80b33d3da03fafd
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,10 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

/**
* A [[ContinuousReader]] for data from kafka.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.clients.consumer.ConsumerRecord

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

/** A simple class for converting Kafka ConsumerRecord to UnsafeRow */
private[kafka010] class KafkaRecordToUnsafeRowConverter {
private val sharedRow = new UnsafeRow(7)
private val bufferHolder = new BufferHolder(sharedRow)
private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)
private val rowWriter = new UnsafeRowWriter(7)

def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
bufferHolder.reset()
rowWriter.reset()

if (record.key == null) {
rowWriter.setNullAt(0)
Expand All @@ -46,7 +44,6 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
5,
DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(record.timestamp)))
rowWriter.write(6, record.timestampType.id)
sharedRow.setTotalSize(bufferHolder.totalSize)
sharedRow
rowWriter.getRow()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,21 @@
* this class per writing program, so that the memory segment/data buffer can be reused. Note that
* for each incoming record, we should call `reset` of BufferHolder instance before write the record
* and reuse the data buffer.
*
* Generally we should call `UnsafeRow.setTotalSize` and pass in `BufferHolder.totalSize` to update
* the size of the result row, after writing a record to the buffer. However, we can skip this step
* if the fields of row are all fixed-length, as the size of result row is also fixed.
*/
public class BufferHolder {
final class BufferHolder {

private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;

public byte[] buffer;
public int cursor = Platform.BYTE_ARRAY_OFFSET;
private byte[] buffer;
private int cursor = Platform.BYTE_ARRAY_OFFSET;
private final UnsafeRow row;
private final int fixedSize;

public BufferHolder(UnsafeRow row) {
BufferHolder(UnsafeRow row) {
this(row, 64);
}

public BufferHolder(UnsafeRow row, int initialSize) {
BufferHolder(UnsafeRow row, int initialSize) {
int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) {
throw new UnsupportedOperationException(
Expand All @@ -64,7 +60,7 @@ public BufferHolder(UnsafeRow row, int initialSize) {
/**
* Grows the buffer by at least neededSize and points the row to the buffer.
*/
public void grow(int neededSize) {
void grow(int neededSize) {
if (neededSize > ARRAY_MAX - totalSize()) {
throw new UnsupportedOperationException(
"Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +
Expand All @@ -86,11 +82,23 @@ public void grow(int neededSize) {
}
}

public void reset() {
byte[] getBuffer() {
return buffer;
}

int getCursor() {
return cursor;
}

void increaseCursor(int val) {
cursor += val;
}

void reset() {
cursor = Platform.BYTE_ARRAY_OFFSET + fixedSize;
}

public int totalSize() {
int totalSize() {
return cursor - Platform.BYTE_ARRAY_OFFSET;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.bitset.BitSetMethods;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.UTF8String;

import static org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.calculateHeaderPortionInBytes;

Expand All @@ -32,141 +30,123 @@
*/
public final class UnsafeArrayWriter extends UnsafeWriter {

private BufferHolder holder;

// The offset of the global buffer where we start to write this array.
private int startingOffset;

// The number of elements in this array
private int numElements;

// The element size in this array
private int elementSize;

private int headerInBytes;

private void assertIndexIsValid(int index) {
assert index >= 0 : "index (" + index + ") should >= 0";
assert index < numElements : "index (" + index + ") should < " + numElements;
}

public void initialize(BufferHolder holder, int numElements, int elementSize) {
public UnsafeArrayWriter(UnsafeWriter writer, int elementSize) {
super(writer.getBufferHolder());
this.elementSize = elementSize;
}

public void initialize(int numElements) {
// We need 8 bytes to store numElements in header
this.numElements = numElements;
this.headerInBytes = calculateHeaderPortionInBytes(numElements);

this.holder = holder;
this.startingOffset = holder.cursor;
this.startingOffset = cursor();

// Grows the global buffer ahead for header and fixed size data.
int fixedPartInBytes =
ByteArrayMethods.roundNumberOfBytesToNearestWord(elementSize * numElements);
holder.grow(headerInBytes + fixedPartInBytes);

// Write numElements and clear out null bits to header
Platform.putLong(holder.buffer, startingOffset, numElements);
Platform.putLong(getBuffer(), startingOffset, numElements);
for (int i = 8; i < headerInBytes; i += 8) {
Platform.putLong(holder.buffer, startingOffset + i, 0L);
Platform.putLong(getBuffer(), startingOffset + i, 0L);
}

// fill 0 into reminder part of 8-bytes alignment in unsafe array
for (int i = elementSize * numElements; i < fixedPartInBytes; i++) {
Platform.putByte(holder.buffer, startingOffset + headerInBytes + i, (byte) 0);
Platform.putByte(getBuffer(), startingOffset + headerInBytes + i, (byte) 0);
}
holder.cursor += (headerInBytes + fixedPartInBytes);
increaseCursor(headerInBytes + fixedPartInBytes);
}

private void zeroOutPaddingBytes(int numBytes) {
if ((numBytes & 0x07) > 0) {
Platform.putLong(holder.buffer, holder.cursor + ((numBytes >> 3) << 3), 0L);
}
}

private long getElementOffset(int ordinal, int elementSize) {
private long getElementOffset(int ordinal) {
return startingOffset + headerInBytes + ordinal * elementSize;
}

public void setOffsetAndSize(int ordinal, int currentCursor, int size) {
assertIndexIsValid(ordinal);
final long relativeOffset = currentCursor - startingOffset;
final long offsetAndSize = (relativeOffset << 32) | (long)size;

write(ordinal, offsetAndSize);
}

private void setNullBit(int ordinal) {
assertIndexIsValid(ordinal);
BitSetMethods.set(holder.buffer, startingOffset + 8, ordinal);
BitSetMethods.set(getBuffer(), startingOffset + 8, ordinal);
}

public void setNull1Bytes(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), (byte)0);
writeByte(getElementOffset(ordinal), (byte)0);
}

public void setNull2Bytes(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), (short)0);
writeShort(getElementOffset(ordinal), (short)0);
}

public void setNull4Bytes(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), 0);
writeInt(getElementOffset(ordinal), 0);
}

public void setNull8Bytes(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
Platform.putLong(holder.buffer, getElementOffset(ordinal, 8), (long)0);
writeLong(getElementOffset(ordinal), 0);
}

public void setNull(int ordinal) { setNull8Bytes(ordinal); }

public void write(int ordinal, boolean value) {
assertIndexIsValid(ordinal);
Platform.putBoolean(holder.buffer, getElementOffset(ordinal, 1), value);
writeBoolean(getElementOffset(ordinal), value);
}

public void write(int ordinal, byte value) {
assertIndexIsValid(ordinal);
Platform.putByte(holder.buffer, getElementOffset(ordinal, 1), value);
writeByte(getElementOffset(ordinal), value);
}

public void write(int ordinal, short value) {
assertIndexIsValid(ordinal);
Platform.putShort(holder.buffer, getElementOffset(ordinal, 2), value);
writeShort(getElementOffset(ordinal), value);
}

public void write(int ordinal, int value) {
assertIndexIsValid(ordinal);
Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), value);
writeInt(getElementOffset(ordinal), value);
}

public void write(int ordinal, long value) {
assertIndexIsValid(ordinal);
Platform.putLong(holder.buffer, getElementOffset(ordinal, 8), value);
writeLong(getElementOffset(ordinal), value);
}

public void write(int ordinal, float value) {
if (Float.isNaN(value)) {
value = Float.NaN;
}
assertIndexIsValid(ordinal);
Platform.putFloat(holder.buffer, getElementOffset(ordinal, 4), value);
writeFloat(getElementOffset(ordinal), value);
}

public void write(int ordinal, double value) {
if (Double.isNaN(value)) {
value = Double.NaN;
}
assertIndexIsValid(ordinal);
Platform.putDouble(holder.buffer, getElementOffset(ordinal, 8), value);
writeDouble(getElementOffset(ordinal), value);
}

public void write(int ordinal, Decimal input, int precision, int scale) {
// make sure Decimal object has the same scale as DecimalType
assertIndexIsValid(ordinal);
if (input.changePrecision(precision, scale)) {
if (input != null && input.changePrecision(precision, scale)) {
if (precision <= Decimal.MAX_LONG_DIGITS()) {
write(ordinal, input.toUnscaledLong());
} else {
Expand All @@ -180,65 +160,14 @@ public void write(int ordinal, Decimal input, int precision, int scale) {

// Write the bytes to the variable length portion.
Platform.copyMemory(
bytes, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, numBytes);
setOffsetAndSize(ordinal, holder.cursor, numBytes);
bytes, Platform.BYTE_ARRAY_OFFSET, getBuffer(), cursor(), numBytes);
setOffsetAndSize(ordinal, numBytes);

// move the cursor forward with 8-bytes boundary
holder.cursor += roundedSize;
increaseCursor(roundedSize);
}
} else {
setNull(ordinal);
}
}

public void write(int ordinal, UTF8String input) {
final int numBytes = input.numBytes();
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);

// grow the global buffer before writing data.
holder.grow(roundedSize);

zeroOutPaddingBytes(numBytes);

// Write the bytes to the variable length portion.
input.writeToMemory(holder.buffer, holder.cursor);

setOffsetAndSize(ordinal, holder.cursor, numBytes);

// move the cursor forward.
holder.cursor += roundedSize;
}

public void write(int ordinal, byte[] input) {
final int numBytes = input.length;
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(input.length);

// grow the global buffer before writing data.
holder.grow(roundedSize);

zeroOutPaddingBytes(numBytes);

// Write the bytes to the variable length portion.
Platform.copyMemory(
input, Platform.BYTE_ARRAY_OFFSET, holder.buffer, holder.cursor, numBytes);

setOffsetAndSize(ordinal, holder.cursor, numBytes);

// move the cursor forward.
holder.cursor += roundedSize;
}

public void write(int ordinal, CalendarInterval input) {
// grow the global buffer before writing data.
holder.grow(16);

// Write the months and microseconds fields of Interval to the variable length portion.
Platform.putLong(holder.buffer, holder.cursor, input.months);
Platform.putLong(holder.buffer, holder.cursor + 8, input.microseconds);

setOffsetAndSize(ordinal, holder.cursor, 16);

// move the cursor forward.
holder.cursor += 16;
}
}
Loading