@@ -107,6 +107,14 @@ class SparkHadoopUtil extends Logging {
if (key.startsWith("spark.hadoop.")) {
Member: The comment above no longer applies to the whole loop and should be moved to this if statement.

Contributor Author: Done.

hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
// Copy any "fs.swift2d.foo=bar" properties into conf as "fs.swift2d.foo=bar"
Member: Why the filtering? What if we made it a little more generic and passed all properties through as-is when they are not spark.hadoop.*?

Contributor Author: My intention in filtering out the Swift- and S3-specific properties and adding them to the new Hadoop conf is to make sure these changes do not affect existing Spark behavior for Hadoop interaction, while adding the new functionality for S3 and Swift. SparkHadoopUtil.get.conf is used in many other places for reading data from Hadoop; if a user updated some Hadoop config, they could unintentionally cause disruptive behavior. I wanted to keep the impact of the changes as small as possible.
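For reference, a minimal sketch of the copy loop being discussed, assuming a SparkConf named conf and a Hadoop Configuration named hadoopConf as in SparkHadoopUtil (a sketch only, not the exact PR code):

```scala
// spark.hadoop.* keys are copied with the prefix stripped, while the
// whitelisted filesystem keys are copied through unchanged.
conf.getAll.foreach { case (key, value) =>
  if (key.startsWith("spark.hadoop.")) {
    // existing behavior: "spark.hadoop.fs.s3a.access.key" -> "fs.s3a.access.key"
    hadoopConf.set(key.substring("spark.hadoop.".length), value)
  } else if (key.startsWith("fs.swift2d") || key.startsWith("fs.s3")) {
    // behavior added by this PR: pass the object-store property through as-is
    hadoopConf.set(key, value)
  }
}
```

The filter keeps the blast radius small: only keys that unambiguously target an object-store connector reach the shared Hadoop configuration.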

Contributor: You may want to add fs.wasb for Azure on Hadoop 2.7+.

Contributor Author: That's a nice suggestion; we could add configs for Azure as well. I am not familiar with Azure, though. Do you have sample code I could use to understand, run, and test it?

else if (key.startsWith("fs.swift2d")){
hadoopConf.set(key, value)
Contributor: What's swift2d? It's not the Swift client in hadoop-openstack, which is fs.swift.

Contributor Author (@agsachin, Aug 21, 2016): swift2d is the scheme used by Stocator --> https://github.com/SparkTC/stocator. I have now updated the code for hadoop-openstack (fs.swift) as well.

}
// Copy any "fs.s3x.foo=bar" properties into conf as "fs.s3x.foo=bar"
else if (key.startsWith("fs.s3")){
hadoopConf.set(key, value)
}
Contributor (@steveloughran, Sep 13, 2016): s3 is the AWS EMR filesystem, but an obsolete one on ASF Hadoop. I would recommend this list:

s3, s3n, s3a, swift, adl, wasb, oss, gs

(Edited Oct 10 to bring the list in sync with what I believe is the current set.)
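A sketch of how that broader whitelist could look. The prefix list is derived mechanically from the scheme names above, so some entries may not match the real Hadoop property namespaces (Azure settings, for instance, live under fs.azure rather than fs.wasb):

```scala
// Hypothetical generalisation of the filter to the schemes listed above.
val cloudFsPrefixes =
  Seq("fs.s3.", "fs.s3n.", "fs.s3a.", "fs.swift.", "fs.swift2d.",
      "fs.adl.", "fs.wasb.", "fs.oss.", "fs.gs.")

conf.getAll.foreach { case (key, value) =>
  if (cloudFsPrefixes.exists(key.startsWith)) {
    hadoopConf.set(key, value)  // copy the object-store property as-is
  }
}
```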

Contributor: Actually, I would copy everything under fs.s3a, fs.s3, etc. Why? There is a lot more than passwords: for s3a alone that includes proxy info and passwords, a list of alternate S3 auth mechanisms (e.g. declaring the use of IAM), and so on.
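To illustrate why a blanket copy of the fs.s3a namespace is attractive, a user-side configuration might carry entries like the following (a sketch; the s3a property names are standard Hadoop options, but their availability depends on the Hadoop version):

```scala
import org.apache.spark.SparkConf

// Sketch: s3a settings a user might put on the Spark configuration.
// Beyond access keys there are proxy settings and pluggable credential
// providers, all of which need to reach the Hadoop Configuration.
val sparkConf = new SparkConf()
  .set("spark.hadoop.fs.s3a.access.key", "<access-key>")     // works today via the spark.hadoop.* prefix
  .set("fs.s3a.proxy.host", "proxy.example.com")              // copied as-is with this PR
  .set("fs.s3a.proxy.port", "8080")
  .set("fs.s3a.aws.credentials.provider",
    "com.amazonaws.auth.InstanceProfileCredentialsProvider")  // e.g. IAM instance-profile auth
```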

Contributor Author: @steveloughran, can you help me with some default settings for adl, wasb, oss, and gs to test, or just the syntax of their properties, so that I can decide on the filter conditions?

}
val bufferSize = conf.get("spark.buffer.size", "65536")
hadoopConf.set("io.file.buffer.size", bufferSize)
@@ -121,7 +121,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
case PhysicalOperation(projects, filters, l @ LogicalRelation(t: HadoopFsRelation, _)) =>
// See buildPartitionedTableScan for the reason that we need to create a shard
// broadcast HadoopConf.
val sharedHadoopConf = SparkHadoopUtil.get.conf
// fix added for SPARK-13979
// val sharedHadoopConf = SparkHadoopUtil.get.conf
val sharedHadoopConf = t.sqlContext.sparkContext.hadoopConfiguration
val confBroadcast =
t.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
pruneFilterProject(
@@ -156,7 +158,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {

// Because we are creating one RDD per partition, we need to have a shared HadoopConf.
// Otherwise, the cost of broadcasting HadoopConf in every RDD will be high.
val sharedHadoopConf = SparkHadoopUtil.get.conf
// fix added for SPARK-13979
// val sharedHadoopConf = SparkHadoopUtil.get.conf
val sharedHadoopConf = relation.sqlContext.sparkContext.hadoopConfiguration
val confBroadcast =
relation.sqlContext.sparkContext.broadcast(new SerializableConfiguration(sharedHadoopConf))
val partitionColumnNames = partitionColumns.fieldNames.toSet
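A practical effect of switching to sparkContext.hadoopConfiguration is that the broadcast configuration now reflects the live context, including properties set programmatically after startup, rather than a configuration derived once from the initial SparkConf. A usage sketch, assuming a SQLContext named sqlContext and placeholder bucket and credentials:

```scala
// Sketch: credentials set on the context's Hadoop configuration at runtime
// are visible to the scan, because DataSourceStrategy now broadcasts
// sqlContext.sparkContext.hadoopConfiguration instead of SparkHadoopUtil.get.conf.
val hadoopConf = sqlContext.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
hadoopConf.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

// Placeholder path; reads through HadoopFsRelation pick up the settings above.
val df = sqlContext.read.parquet("s3a://example-bucket/data")
```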