@@ -154,6 +154,7 @@ private void throwOom(final MemoryBlock page, final long required) {
       taskMemoryManager.freePage(page, this);
     }
     taskMemoryManager.showMemoryUsage();
-    throw new SparkOutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " + got);
+    throw new SparkOutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " +
+      got);
   }
 }
@@ -67,9 +67,10 @@ public Date getTimestamp() {


   /**
-   * Returns instance of [[KinesisInitialPosition]] based on the passed [[InitialPositionInStream]].
-   * This method is used in KinesisUtils for translating the InitialPositionInStream
-   * to InitialPosition. This function would be removed when we deprecate the KinesisUtils.
+   * Returns instance of [[KinesisInitialPosition]] based on the passed
+   * [[InitialPositionInStream]]. This method is used in KinesisUtils for translating the
+   * InitialPositionInStream to InitialPosition. This function would be removed when we deprecate
+   * the KinesisUtils.
    *
    * @return [[InitialPosition]]
    */
@@ -83,9 +84,10 @@ public static KinesisInitialPosition fromKinesisInitialPosition(
       // InitialPositionInStream.AT_TIMESTAMP is not supported.
       // Use InitialPosition.atTimestamp(timestamp) instead.
       throw new UnsupportedOperationException(
-          "Only InitialPositionInStream.LATEST and InitialPositionInStream.TRIM_HORIZON " +
-          "supported in initialPositionInStream(). Please use the initialPosition() from " +
-          "builder API in KinesisInputDStream for using InitialPositionInStream.AT_TIMESTAMP");
+          "Only InitialPositionInStream.LATEST and InitialPositionInStream." +
+          "TRIM_HORIZON supported in initialPositionInStream(). Please use " +
+          "the initialPosition() from builder API in KinesisInputDStream for " +
+          "using InitialPositionInStream.AT_TIMESTAMP");
     }
   }
 }
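
For context, a minimal sketch of the translation this method's javadoc describes. The LATEST and TRIM_HORIZON branch bodies are not shown in this diff; the Latest and TrimHorizon implementation class names are assumptions for illustration only.

public static KinesisInitialPosition fromKinesisInitialPosition(
    InitialPositionInStream initialPositionInStream) {
  if (initialPositionInStream == InitialPositionInStream.LATEST) {
    // Start reading from the most recent record in the shard.
    return new Latest();  // hypothetical KinesisInitialPosition implementation
  } else if (initialPositionInStream == InitialPositionInStream.TRIM_HORIZON) {
    // Start reading from the oldest record still retained by the stream.
    return new TrimHorizon();  // hypothetical KinesisInitialPosition implementation
  } else {
    // AT_TIMESTAMP has no equivalent here; callers must use initialPosition()
    // on the KinesisInputDStream builder instead, as the message below says.
    throw new UnsupportedOperationException(
        "Only InitialPositionInStream.LATEST and InitialPositionInStream." +
        "TRIM_HORIZON supported in initialPositionInStream(). Please use " +
        "the initialPosition() from builder API in KinesisInputDStream for " +
        "using InitialPositionInStream.AT_TIMESTAMP");
  }
}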
@@ -31,7 +31,6 @@
 import org.apache.parquet.schema.PrimitiveType;

 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
@@ -96,7 +95,7 @@ public class VectorizedColumnReader {
   private final OriginalType originalType;
   // The timezone conversion to apply to int96 timestamps. Null if no conversion.
   private final TimeZone convertTz;
-  private final static TimeZone UTC = DateTimeUtils.TimeZoneUTC();
+  private static final TimeZone UTC = DateTimeUtils.TimeZoneUTC();

   public VectorizedColumnReader(
       ColumnDescriptor descriptor,
@@ -79,8 +79,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
   private boolean[] missingColumns;

   /**
-   * The timezone that timestamp INT96 values should be converted to. Null if no conversion. Here to workaround
-   * incompatibilities between different engines when writing timestamp values.
+   * The timezone that timestamp INT96 values should be converted to. Null if no conversion. Here to
+   * workaround incompatibilities between different engines when writing timestamp values.
    */
   private TimeZone convertTz = null;

@@ -28,7 +28,8 @@
  * to be reused, callers should copy the data out if it needs to be stored.
  */
 public final class ColumnarRow extends InternalRow {
-  // The data for this row. E.g. the value of 3rd int field is `data.getChildColumn(3).getInt(rowId)`.
+  // The data for this row.
+  // E.g. the value of 3rd int field is `data.getChildColumn(3).getInt(rowId)`.
   private final ColumnVector data;
   private final int rowId;
   private final int numFields;
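
A minimal sketch of the access pattern the rewrapped comment describes; this accessor is illustrative and not part of the diff.

// Field `ordinal` of this row lives at position `rowId` of the ordinal-th
// child column, so a typed getter simply delegates:
@Override
public int getInt(int ordinal) {
  return data.getChildColumn(ordinal).getInt(rowId);
}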
@@ -19,9 +19,6 @@

 import org.apache.spark.annotation.InterfaceStability;

-import java.util.List;
-import java.util.Map;
-
 /**
  * A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
  * propagate session configs with the specified key-prefix to all data source operations in this
@@ -39,5 +39,8 @@ public interface ContinuousReadSupport extends DataSourceV2 {
    * @param options the options for the returned data source reader, which is an immutable
    *                case-insensitive string-to-string map.
    */
-  ContinuousReader createContinuousReader(Optional<StructType> schema, String checkpointLocation, DataSourceV2Options options);
+  ContinuousReader createContinuousReader(
+      Optional<StructType> schema,
+      String checkpointLocation,
+      DataSourceV2Options options);
 }
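
For reference, the rewrapped signature as an implementor would use it. MyContinuousSource and MyContinuousReader are hypothetical names, and imports are omitted since the v2 package locations are not shown in this diff; this is a sketch, not a confirmed implementation.

public class MyContinuousSource implements DataSourceV2, ContinuousReadSupport {
  @Override
  public ContinuousReader createContinuousReader(
      Optional<StructType> schema,
      String checkpointLocation,
      DataSourceV2Options options) {
    // Honor a user-supplied schema when present; otherwise the reader infers one.
    return new MyContinuousReader(schema, checkpointLocation, options);
  }
}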
@@ -39,9 +39,9 @@ public interface ContinuousWriteSupport extends BaseStreamingSink {
    * Creates an optional {@link ContinuousWriter} to save the data to this data source. Data
    * sources can return None if there is no writing needed to be done.
    *
-   * @param queryId A unique string for the writing query. It's possible that there are many writing
-   *                queries running at the same time, and the returned {@link DataSourceV2Writer}
-   *                can use this id to distinguish itself from others.
+   * @param queryId A unique string for the writing query. It's possible that there are many
+   *                writing queries running at the same time, and the returned
+   *                {@link DataSourceV2Writer} can use this id to distinguish itself from others.
    * @param schema the schema of the data to be written.
    * @param mode the output mode which determines what successive epoch output means to this
    *             sink, please refer to {@link OutputMode} for more details.
@@ -42,7 +42,8 @@ public abstract class Offset extends org.apache.spark.sql.execution.streaming.Of
   @Override
   public boolean equals(Object obj) {
     if (obj instanceof org.apache.spark.sql.execution.streaming.Offset) {
-      return this.json().equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
+      return this.json()
+        .equals(((org.apache.spark.sql.execution.streaming.Offset) obj).json());
     } else {
       return false;
     }
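
The equality contract above compares offsets by their JSON form, so two offsets of different concrete classes are equal whenever their json() output matches. A hedged illustration; the MyLongOffset and OtherOffset class names are assumptions, not taken from this diff.

// Both hypothetical offsets serialize to "5", so they compare equal even
// though their concrete classes differ, because equals() delegates to json().
Offset a = new MyLongOffset(5);   // json() returns "5"
Offset b = new OtherOffset("5");  // json() also returns "5"
System.out.println(a.equals(b));  // prints: true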
@@ -26,5 +26,4 @@
  * These offsets must be serializable.
  */
 public interface PartitionOffset extends Serializable {
-
 }
@@ -42,7 +42,6 @@
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.VariableSubstitution;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDe;