Conversation

@raofu raofu commented Aug 20, 2018

What changes were proposed in this pull request?

[SPARK-25126](https://issues.apache.org/jira/browse/SPARK-25126)
reports that loading a large number of ORC files consumes a lot of memory
in both 2.0 and 2.3. The issue is caused by creating a Reader for every
ORC file in order to infer the schema.

In OrcFileOperator.readSchema, a Reader is created for every file
although only the first valid one is used. This uses a significant
amount of memory when `paths` contains a lot of files. In 2.3,
a different code path (OrcUtils.readSchema) is used for inferring
the schema of ORC files. This commit changes both functions to create
Readers lazily.
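
The difference can be illustrated with a small self-contained sketch (the `readSchema` helper and `openCount` counter below are illustrative stand-ins, not the actual Spark code): mapping over a strict Seq opens a Reader for every file, while mapping over an Iterator and using collectFirst opens only as many Readers as needed to find the first valid schema.

```scala
// Illustrative sketch of the lazy-Reader change (not the actual Spark code).
object LazySchemaSketch {
  var openCount = 0

  // Simulates creating an ORC Reader and reading its schema.
  // Returns None for files whose schema cannot be read.
  def readSchema(path: String): Option[String] = {
    openCount += 1
    if (path.endsWith(".orc")) Some(s"struct<...from $path>") else None
  }

  def main(args: Array[String]): Unit = {
    val paths = Seq("part-0.orc", "part-1.orc", "part-2.orc")

    // Before: a strict Seq.map opens a Reader for every file up front.
    openCount = 0
    paths.map(readSchema).collectFirst { case Some(s) => s }
    println(s"eager: opened $openCount readers")  // opens all 3

    // After: an Iterator defers each map step, and collectFirst stops
    // as soon as the first valid schema is found.
    openCount = 0
    paths.toIterator.map(readSchema).collectFirst { case Some(s) => s }
    println(s"lazy: opened $openCount readers")   // opens only 1
  }
}
```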

How was this patch tested?

Pass the Jenkins with a newly added test case by @dongjoon-hyun

@srowen srowen left a comment

OK yeah I get what you're doing here. Looks OK.


rxin commented Aug 20, 2018

Do we have a similar issue for Parquet?

SparkQA commented Aug 20, 2018

Test build #4282 has finished for PR 22157 at commit 5a86b36.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

> Do we have a similar issue for Parquet?

Looks not since we explicitly pick up one file before reading in schema inference:

// Tries any "_common_metadata" first. Parquet files written by old versions or Parquet
// don't have this.
filesByType.commonMetadata.headOption
  // Falls back to "_metadata"
  .orElse(filesByType.metadata.headOption)
  // Summary file(s) not found, the Parquet file is either corrupted, or different part-
  // files contain conflicting user defined metadata (two or more values are associated
  // with a same key in different files). In either case, we fall back to any of the
  // first part-file, and just assume all schemas are consistent.
  .orElse(filesByType.data.headOption)
  .toSeq
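
That fallback chain can be exercised in isolation. Here is a minimal sketch, with a hypothetical FilesByType case class standing in for Spark's internal file grouping; only the headOption/orElse chain mirrors the quoted logic:

```scala
// Hypothetical stand-in for Spark's internal file grouping; only the
// fallback chain below mirrors the quoted Parquet logic.
case class FilesByType(
    commonMetadata: Seq[String],
    metadata: Seq[String],
    data: Seq[String])

object ParquetPickSketch {
  // Picks at most one file: "_common_metadata" first, then "_metadata",
  // then the first part-file.
  def pick(filesByType: FilesByType): Seq[String] =
    filesByType.commonMetadata.headOption
      .orElse(filesByType.metadata.headOption)
      .orElse(filesByType.data.headOption)
      .toSeq

  def main(args: Array[String]): Unit = {
    // No summary files: falls back to the first part-file.
    println(pick(FilesByType(Nil, Nil, Seq("part-0.parquet", "part-1.parquet"))))
    // Summary file present: it wins over the part-files.
    println(pick(FilesByType(Seq("_common_metadata"), Nil, Seq("part-0.parquet"))))
  }
}
```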

SparkQA commented Aug 21, 2018

Test build #4283 has finished for PR 22157 at commit 5a86b36.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


srowen commented Aug 22, 2018

The failure in OrcQuerySuite looks legitimate. It's because it corrupts the third file of three, then sets the reader to not ignore corrupt files, but never actually reads the third file now with this change. I think that might be a good thing. @dongjoon-hyun do you have an opinion?


raofu commented Aug 22, 2018

I fixed the test by making the first file the corrupted file. @srowen, can you help kick off a Jenkins run?


dongjoon-hyun commented Aug 22, 2018

Thank you for pinging me, @srowen .

@raofu Instead of changing the existing test coverage, we had better add an additional test case in which all files are corrupted. Could you check raofu#1?

Also, could you update the title prefix [SPARK-25126] -> [SPARK-25126][SQL]?

@raofu raofu changed the title [SPARK-25126] Avoid creating Reader for all orc files [SPARK-25126][sql] Avoid creating Reader for all orc files Aug 22, 2018
@raofu raofu changed the title [SPARK-25126][sql] Avoid creating Reader for all orc files [SPARK-25126][SQL] Avoid creating Reader for all orc files Aug 22, 2018

raofu commented Aug 22, 2018

@dongjoon-hyun Title updated. Thanks for adding the test coverage! I've merged your commit. Can you help kick off another Jenkins run? I don't think I have the permission to do it.

@dongjoon-hyun
Member

Retest this please.

@dongjoon-hyun dongjoon-hyun left a comment


+1, LGTM. Thanks, @raofu .


dongjoon-hyun commented Aug 22, 2018

Jenkins is usually retriggered when it detects a change. Maybe it is just busy.

BTW, it seems to be your first contribution. Welcome! You need to update the GitHub PR description like the other commits, e.g., 883f3af. Please put your description into the placeholder (Please fill in changes proposed in this fix) and put "Pass the Jenkins with the updated test case" instead of the following placeholders.

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.


raofu commented Aug 22, 2018

@dongjoon-hyun, thanks a lot for the pointers! I've updated the PR description. Please let me know if there is any other information you'd like me to add.

@dongjoon-hyun
Member

cc @gatorsmile and @cloud-fan .

SparkQA commented Aug 22, 2018

Test build #4285 has finished for PR 22157 at commit 9afdac6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


@HyukjinKwon HyukjinKwon left a comment


LGTM

testIgnoreCorruptFiles()
}.getMessage
assert(m1.contains("Could not read footer for file"))
assert(m1.contains("Malformed ORC file"))
Contributor

Why did the error message change?

Member

I think this is because of the behavior change #22157 (comment).

Previously, the ORC source read the third file, which is corrupt, and threw the "Could not read footer for file" exception.

Now the ORC source reads the first file for a valid schema and skips the other two files. When the ORC source uses that schema to read the second ORC file, the schema is inconsistent, so the "Malformed ORC file" exception is thrown.

Member

Let's make sure we don't backport it ... then I think it's fine. It sounds rather like a bug to read and validate all schemas (which is inconsistent with Parquet) when we only need to pick up a single file. I don't think we make a guarantee about the picking order.

The possible behaviour change is when we only read the schema. The previous code would throw an exception, but after this PR it wouldn't.

The previous behaviour is something we should expect when the mergeSchema option is implemented on the ORC side, as you guys discussed below.

Member

I agree with this take

Member

Ok. It's reasonable.


@gengliangwang gengliangwang left a comment


LGTM.
Nit: consider changing the title to "Avoid scanning all ORC files for schema inference".

files.toIterator.map(file => readSchema(file.getPath, conf, ignoreCorruptFiles)).collectFirst {
  case Some(schema) =>
    logDebug(s"Reading schema from file $files, got Hive schema string: $schema")
    CatalystSqlParser.parseDataType(schema.toString).asInstanceOf[StructType]
Member

This might be a behavior change.

Previously, if there were corrupt files and SQLConf.IGNORE_CORRUPT_FILES was false, the ORC source would throw an exception when reading those files.

Now, once the ORC source reads the first valid schema, it doesn't read the other ORC files. So during schema inference the corrupt files are ignored even when SQLConf.IGNORE_CORRUPT_FILES is false.

Contributor

BTW, I think we have to create a reader for each file when implementing schema merging like parquet, right?


@viirya viirya Aug 23, 2018


Yeah, I think so. But in Parquet, schema merging is done in parallel, so it won't create all the readers in one place.

Member

@viirya, the corrupt files are not ignored. Spark will throw a SparkException while reading the content.

> Now if Orc source reads the first valid schema, it doesn't read other Orc files further. So the corrupt files are ignored when SQLConf.IGNORE_CORRUPT_FILES is false.

Member

Yeah, it is only ignored while reading the schema.

The change is the timing at which the corrupt files are detected. Now it is postponed until the file contents are actually read.

That might not be a big deal, though for user experience it is better to throw such an exception early.
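
The timing change described here can be sketched in a few lines (a simulated readFooter, not the real ORC reader): with a lazy iterator, schema inference no longer touches the corrupt file, so the failure surfaces only when the file's contents are consumed later.

```scala
// Sketch of the detection-timing change (simulated I/O, not the real ORC reader).
object DeferredFailureSketch {
  // Simulates opening a file's footer; corrupt files throw.
  def readFooter(path: String): String =
    if (path.contains("corrupt"))
      throw new RuntimeException(s"Could not read footer for file $path")
    else s"footer of $path"

  def main(args: Array[String]): Unit = {
    val paths = Seq("part-0.orc", "part-corrupt.orc")

    // Schema inference is now lazy: only the first file is opened, so the
    // corrupt file does not fail here.
    val schema = paths.toIterator.map(readFooter).collectFirst { case f => f }
    println(schema)

    // Reading the actual contents still touches every file, so the corrupt
    // file fails at that later point.
    val failedOnRead =
      try { paths.foreach(readFooter); false }
      catch { case _: RuntimeException => true }
    println(s"failure deferred to content read: $failedOnRead")
  }
}
```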

@HyukjinKwon
Member

Merged to master.

@asfgit asfgit closed this in 5d572fc Aug 23, 2018
@dongjoon-hyun
Member

Thank you for merging, @HyukjinKwon . And, thank you for making a PR, @raofu .

otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023

Closes apache#22157 from raofu/SPARK-25126.

Lead-authored-by: Rao Fu <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Rao Fu <[email protected]>
Signed-off-by: hyukjinkwon <[email protected]>
(cherry picked from commit 5d572fc)

RB=1413528
BUG=LIHADOOP-40159
G=superfriends-reviewers
R=fli,mshen,yezhou,edlu
A=edlu