
Commit c604cad

Merge remote-tracking branch 'upstream/master' into docSaveLoad
2 parents: 1dd77cc + f7fe9e4


447 files changed: +3017 −1828 lines


R/pkg/R/DataFrame.R

Lines changed: 4 additions & 6 deletions
@@ -1314,9 +1314,8 @@ setMethod("except",
 #' write.df(df, "myfile", "parquet", "overwrite")
 #' }
 setMethod("write.df",
-          signature(df = "DataFrame", path = 'character', source = 'character',
-                    mode = 'character'),
-          function(df, path = NULL, source = NULL, mode = "append", ...){
+          signature(df = "DataFrame", path = 'character'),
+          function(df, path, source = NULL, mode = "append", ...){
            if (is.null(source)) {
              sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
              source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
@@ -1338,9 +1337,8 @@ setMethod("write.df",
 #' @aliases saveDF
 #' @export
 setMethod("saveDF",
-          signature(df = "DataFrame", path = 'character', source = 'character',
-                    mode = 'character'),
-          function(df, path = NULL, source = NULL, mode = "append", ...){
+          signature(df = "DataFrame", path = 'character'),
+          function(df, path, source = NULL, mode = "append", ...){
            write.df(df, path, source, mode, ...)
          })
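A brief sketch of the call patterns the relaxed signature allows (the DataFrame `df` and the output paths below are placeholders, not part of this commit):

# Only the path is required now; source falls back to spark.sql.sources.default
# (parquet unless overridden) and mode defaults to "append".
write.df(df, path = "people.parquet")

# The fully explicit form shown in the roxygen example above still works.
write.df(df, "myfile", "parquet", "overwrite")

# saveDF is an alias that forwards its arguments to write.df.
saveDF(df, path = "people.parquet")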

R/pkg/R/SQLContext.R

Lines changed: 5 additions & 0 deletions
@@ -457,6 +457,11 @@ read.df <- function(sqlContext, path = NULL, source = NULL, ...) {
   if (!is.null(path)) {
     options[['path']] <- path
   }
+  if (is.null(source)) {
+    sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
+    source <- callJMethod(sqlContext, "getConf", "spark.sql.sources.default",
+                          "org.apache.spark.sql.parquet")
+  }
   sdf <- callJMethod(sqlContext, "load", source, options)
   dataFrame(sdf)
 }
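A minimal sketch of what the new default enables (the sqlContext and file paths are placeholders):

# source can now be omitted; it is read from spark.sql.sources.default,
# falling back to org.apache.spark.sql.parquet.
df <- read.df(sqlContext, path = "people.parquet")

# An explicitly supplied source still takes precedence over the default.
dfJson <- read.df(sqlContext, path = "people.json", source = "json")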

R/pkg/R/generics.R

Lines changed: 2 additions & 2 deletions
@@ -482,11 +482,11 @@ setGeneric("saveAsTable", function(df, tableName, source, mode, ...) {

 #' @rdname write.df
 #' @export
-setGeneric("write.df", function(df, path, source, mode, ...) { standardGeneric("write.df") })
+setGeneric("write.df", function(df, path, ...) { standardGeneric("write.df") })

 #' @rdname write.df
 #' @export
-setGeneric("saveDF", function(df, path, source, mode, ...) { standardGeneric("saveDF") })
+setGeneric("saveDF", function(df, path, ...) { standardGeneric("saveDF") })

 #' @rdname schema
 #' @export

R/pkg/R/sparkR.R

Lines changed: 19 additions & 5 deletions
@@ -225,14 +225,21 @@ sparkR.init <- function(
 #' sqlContext <- sparkRSQL.init(sc)
 #'}

-sparkRSQL.init <- function(jsc) {
+sparkRSQL.init <- function(jsc = NULL) {
   if (exists(".sparkRSQLsc", envir = .sparkREnv)) {
     return(get(".sparkRSQLsc", envir = .sparkREnv))
   }

+  # If jsc is NULL, create a Spark Context
+  sc <- if (is.null(jsc)) {
+    sparkR.init()
+  } else {
+    jsc
+  }
+
   sqlContext <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
-                            "createSQLContext",
-                            jsc)
+                            "createSQLContext",
+                            sc)
   assign(".sparkRSQLsc", sqlContext, envir = .sparkREnv)
   sqlContext
 }
@@ -249,12 +256,19 @@ sparkRSQL.init <- function(jsc) {
 #' sqlContext <- sparkRHive.init(sc)
 #'}

-sparkRHive.init <- function(jsc) {
+sparkRHive.init <- function(jsc = NULL) {
   if (exists(".sparkRHivesc", envir = .sparkREnv)) {
     return(get(".sparkRHivesc", envir = .sparkREnv))
   }

-  ssc <- callJMethod(jsc, "sc")
+  # If jsc is NULL, create a Spark Context
+  sc <- if (is.null(jsc)) {
+    sparkR.init()
+  } else {
+    jsc
+  }
+
+  ssc <- callJMethod(sc, "sc")
   hiveCtx <- tryCatch({
     newJObject("org.apache.spark.sql.hive.HiveContext", ssc)
   }, error = function(err) {
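A short sketch of the resulting call patterns (the master and appName values are illustrative only, not part of this change):

# jsc is now optional: if omitted, sparkRSQL.init() calls sparkR.init() itself.
sqlContext <- sparkRSQL.init()

# Passing an existing SparkContext continues to work as before.
sc <- sparkR.init(master = "local[2]", appName = "example")
sqlContext <- sparkRSQL.init(sc)

# sparkRHive.init() gains the same default.
hiveContext <- sparkRHive.init()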

bagel/pom.xml

Lines changed: 7 additions & 0 deletions
@@ -40,6 +40,13 @@
       <artifactId>spark-core_${scala.binary.version}</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>

bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala

Lines changed: 2 additions & 2 deletions
@@ -17,7 +17,7 @@

 package org.apache.spark.bagel

-import org.scalatest.{BeforeAndAfter, FunSuite, Assertions}
+import org.scalatest.{BeforeAndAfter, Assertions}
 import org.scalatest.concurrent.Timeouts
 import org.scalatest.time.SpanSugar._

@@ -27,7 +27,7 @@ import org.apache.spark.storage.StorageLevel
 class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
 class TestMessage(val targetId: String) extends Message[String] with Serializable

-class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {
+class BagelSuite extends SparkFunSuite with Assertions with BeforeAndAfter with Timeouts {

   var sc: SparkContext = _
bin/pyspark

Lines changed: 1 addition & 5 deletions
@@ -90,11 +90,7 @@ if [[ -n "$SPARK_TESTING" ]]; then
   unset YARN_CONF_DIR
   unset HADOOP_CONF_DIR
   export PYTHONHASHSEED=0
-  if [[ -n "$PYSPARK_DOC_TEST" ]]; then
-    exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
-  else
-    exec "$PYSPARK_DRIVER_PYTHON" $1
-  fi
+  exec "$PYSPARK_DRIVER_PYTHON" -m $1
   exit
 fi

core/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -338,6 +338,12 @@
     <dependency>
       <groupId>org.seleniumhq.selenium</groupId>
       <artifactId>selenium-java</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
       <scope>test</scope>
     </dependency>
     <!-- Added for selenium: -->
core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java (new file; path inferred from the package and class declared below)

Lines changed: 184 additions & 0 deletions
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.Iterator;
+
+import com.google.common.io.Closeables;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.SparkConf;
+import org.apache.spark.TaskContext;
+import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.serializer.Serializer;
+import org.apache.spark.serializer.SerializerInstance;
+import org.apache.spark.storage.*;
+import org.apache.spark.util.Utils;
+
+/**
+ * This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
+ * writes incoming records to separate files, one file per reduce partition, then concatenates these
+ * per-partition files to form a single output file, regions of which are served to reducers.
+ * Records are not buffered in memory. This is essentially identical to
+ * {@link org.apache.spark.shuffle.hash.HashShuffleWriter}, except that it writes output in a format
+ * that can be served / consumed via {@link org.apache.spark.shuffle.IndexShuffleBlockResolver}.
+ * <p>
+ * This write path is inefficient for shuffles with large numbers of reduce partitions because it
+ * simultaneously opens separate serializers and file streams for all partitions. As a result,
+ * {@link SortShuffleManager} only selects this write path when
+ * <ul>
+ *   <li>no Ordering is specified,</li>
+ *   <li>no Aggregator is specified, and</li>
+ *   <li>the number of partitions is less than
+ *       <code>spark.shuffle.sort.bypassMergeThreshold</code>.</li>
+ * </ul>
+ *
+ * This code used to be part of {@link org.apache.spark.util.collection.ExternalSorter} but was
+ * refactored into its own class in order to reduce code complexity; see SPARK-7855 for details.
+ * <p>
+ * There have been proposals to completely remove this code path; see SPARK-6026 for details.
+ */
+final class BypassMergeSortShuffleWriter<K, V> implements SortShuffleFileWriter<K, V> {
+
+  private final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);
+
+  private final int fileBufferSize;
+  private final boolean transferToEnabled;
+  private final int numPartitions;
+  private final BlockManager blockManager;
+  private final Partitioner partitioner;
+  private final ShuffleWriteMetrics writeMetrics;
+  private final Serializer serializer;
+
+  /** Array of file writers, one for each partition */
+  private BlockObjectWriter[] partitionWriters;
+
+  public BypassMergeSortShuffleWriter(
+      SparkConf conf,
+      BlockManager blockManager,
+      Partitioner partitioner,
+      ShuffleWriteMetrics writeMetrics,
+      Serializer serializer) {
+    // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
+    this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+    this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true);
+    this.numPartitions = partitioner.numPartitions();
+    this.blockManager = blockManager;
+    this.partitioner = partitioner;
+    this.writeMetrics = writeMetrics;
+    this.serializer = serializer;
+  }
+
+  @Override
+  public void insertAll(Iterator<Product2<K, V>> records) throws IOException {
+    assert (partitionWriters == null);
+    if (!records.hasNext()) {
+      return;
+    }
+    final SerializerInstance serInstance = serializer.newInstance();
+    final long openStartTime = System.nanoTime();
+    partitionWriters = new BlockObjectWriter[numPartitions];
+    for (int i = 0; i < numPartitions; i++) {
+      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
+        blockManager.diskBlockManager().createTempShuffleBlock();
+      final File file = tempShuffleBlockIdPlusFile._2();
+      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
+      partitionWriters[i] =
+        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
+    }
+    // Creating the file to write to and creating a disk writer both involve interacting with
+    // the disk, and can take a long time in aggregate when we open many files, so should be
+    // included in the shuffle write time.
+    writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
+
+    while (records.hasNext()) {
+      final Product2<K, V> record = records.next();
+      final K key = record._1();
+      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
+    }
+
+    for (BlockObjectWriter writer : partitionWriters) {
+      writer.commitAndClose();
+    }
+  }
+
+  @Override
+  public long[] writePartitionedFile(
+      BlockId blockId,
+      TaskContext context,
+      File outputFile) throws IOException {
+    // Track location of the partition starts in the output file
+    final long[] lengths = new long[numPartitions];
+    if (partitionWriters == null) {
+      // We were passed an empty iterator
+      return lengths;
+    }
+
+    final FileOutputStream out = new FileOutputStream(outputFile, true);
+    final long writeStartTime = System.nanoTime();
+    boolean threwException = true;
+    try {
+      for (int i = 0; i < numPartitions; i++) {
+        final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file());
+        boolean copyThrewException = true;
+        try {
+          lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
+          copyThrewException = false;
+        } finally {
+          Closeables.close(in, copyThrewException);
+        }
+        if (!blockManager.diskBlockManager().getFile(partitionWriters[i].blockId()).delete()) {
+          logger.error("Unable to delete file for partition {}", i);
+        }
+      }
+      threwException = false;
+    } finally {
+      Closeables.close(out, threwException);
+      writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+    }
+    partitionWriters = null;
+    return lengths;
+  }
+
+  @Override
+  public void stop() throws IOException {
+    if (partitionWriters != null) {
+      try {
+        final DiskBlockManager diskBlockManager = blockManager.diskBlockManager();
+        for (BlockObjectWriter writer : partitionWriters) {
+          // This method explicitly does _not_ throw exceptions:
+          writer.revertPartialWritesAndClose();
+          if (!diskBlockManager.getFile(writer.blockId()).delete()) {
+            logger.error("Error while deleting file for block {}", writer.blockId());
+          }
+        }
+      } finally {
+        partitionWriters = null;
+      }
+    }
+  }
+}
core/src/main/java/org/apache/spark/shuffle/sort/SortShuffleFileWriter.java (new file; path inferred from the package and interface declared below)

Lines changed: 53 additions & 0 deletions
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import java.io.File;
+import java.io.IOException;
+
+import scala.Product2;
+import scala.collection.Iterator;
+
+import org.apache.spark.annotation.Private;
+import org.apache.spark.TaskContext;
+import org.apache.spark.storage.BlockId;
+
+/**
+ * Interface for objects that {@link SortShuffleWriter} uses to write its output files.
+ */
+@Private
+public interface SortShuffleFileWriter<K, V> {
+
+  void insertAll(Iterator<Product2<K, V>> records) throws IOException;
+
+  /**
+   * Write all the data added into this shuffle sorter into a file in the disk store. This is
+   * called by the SortShuffleWriter and can go through an efficient path of just concatenating
+   * binary files if we decided to avoid merge-sorting.
+   *
+   * @param blockId block ID to write to. The index file will be blockId.name + ".index".
+   * @param context a TaskContext for a running Spark task, for us to update shuffle metrics.
+   * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
+   */
+  long[] writePartitionedFile(
+      BlockId blockId,
+      TaskContext context,
+      File outputFile) throws IOException;
+
+  void stop() throws IOException;
+}
