Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
miao
  • Loading branch information
CodingCat committed Jun 3, 2014
commit 7930f83f7ec448b58980f2b9fc73b4dc015ab9e4
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance

if (conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
Expand Down Expand Up @@ -756,7 +756,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")

if (conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey the fact that self.conf and conf have the same name is pretty bad and it could easily lead to issues down the road. I realize it's not part of your patch, but would you mind changing the input to be called hadoopConf so that there is no overloading?

def saveAsNewAPIHadoopDataset(hadoopConf: Configuration) {

hadoopConf: JobConf = new JobConf(self.context.hadoopConfiguration),

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually maybe that can be in a separate patch. I can merge this and we can add it later.

outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
// FileOutputFormat ignores the filesystem parameter
val ignoredFs = FileSystem.get(conf)
Expand Down