[SPARK-29461][SQL] Measure the number of records being updated for JDBC writer #26109
Conversation
    assert(expected === runAndReturnMetrics(job, _.taskMetrics.outputMetrics.recordsWritten))
  }

  private def runAndReturnMetrics(job: => Unit, collector: (SparkListenerTaskEnd) => Long): Long = {
This is copied from InputOutputMetricsSuite - please let me know if it should be extracted into some shared utility class/object.
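For context, a rough sketch of what such a helper looks like, following the signature in the diff and the InputOutputMetricsSuite pattern (the listener-bus wait and the buffer details are assumptions, not the exact code):

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

private def runAndReturnMetrics(
    job: => Unit,
    collector: (SparkListenerTaskEnd) => Long): Long = {
  val taskMetrics = new ArrayBuffer[Long]()

  // Collect the requested metric from every finished task.
  val listener = new SparkListener() {
    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
      taskMetrics += collector(taskEnd)
    }
  }
  sparkContext.addSparkListener(listener)

  job

  // Make sure all task-end events were delivered before reading the buffer.
  sparkContext.listenerBus.waitUntilEmpty(10000)
  sparkContext.removeSparkListener(listener)
  taskMetrics.sum
}
```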
Test build #112011 has finished for PR 26109 at commit
Test build #112020 has finished for PR 26109 at commit
cc @maropu @cloud-fan @wangyum initially, based on the commit history.
Test build #112061 has finished for PR 26109 at commit
      dialect: JdbcDialect,
      isolationLevel: Int,
-     options: JDBCOptions): Iterator[Byte] = {
+     options: JDBCOptions): Long = {
Do we need to update this here, instead of updating the metric inside this method?
I guessed it would be cleaner to handle the metric outside of the method, since then the metric is not updated if savePartition throws an exception. To do the same inside savePartition we would have to put the metric update at the end of the finally block, which doesn't seem any cleaner.
In other words, this approach doesn't support incremental updates of the metric, and it records nothing for a partition that was partially written and then failed. Skipping the update makes total sense when the database supports transactions, but if it doesn't and some records are left behind on failure, I'm not sure whether we should update the metric or not. How do we deal with partial output elsewhere?
I've just revisited SparkHadoopWriter and realized it updates the metric regardless of whether the task succeeds. Got it. I'll move the metric update into the savePartition method. Thanks!
I thought we took care of the metric only if the transaction committed, like this:
    } finally {
      if (!committed) {
        // The stage must fail. We got here through an exception path, so
        // let the exception through unless rollback() or close() want to
        // tell the user about another problem.
        if (supportsTransactions) {
          conn.rollback()
        }
        conn.close()
      } else {
        // If the transaction committed, updates the metric
        outputMetrics.setRecordsWritten(recordsWritten)
        // The stage must succeed. We cannot propagate any exception close() might throw.
        try {
          conn.close()
        } catch {
          case e: Exception => logWarning("Transaction succeeded, but closing failed", e)
        }
      }
cc: @HyukjinKwon @wangyum
Yeah, but it looks like SparkHadoopWriter just updates the metric for any output being written - maybe that's because there is no notion of a transaction there. If we take transactions into account, it makes sense to update the metric only when the transaction is committed, but we might also want to update it when both committed and supportsTransactions are false, to reflect the dirty output. WDYT?
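A minimal sketch of that policy, reusing the names from the snippet quoted above (committed, supportsTransactions, outputMetrics, recordsWritten); where exactly the update lands in the finally block is an assumption, not the final patch:

```scala
} finally {
  if (!committed) {
    if (supportsTransactions) {
      // Rolled back: nothing was persisted, so leave the metric untouched.
      conn.rollback()
    } else {
      // No transaction support: some rows may already be in the table,
      // so report the dirty output that was actually written.
      outputMetrics.setRecordsWritten(recordsWritten)
    }
    conn.close()
  } else {
    // Committed: report everything that was written.
    outputMetrics.setRecordsWritten(recordsWritten)
    conn.close()
  }
}
```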
Yeah, that looks reasonable to me. So, can you brush up the code based on that?
OK let me update the patch. Thanks!
    sparkContext.removeSparkListener(listener)
    taskMetrics.sum
  }

nit: remove this blank.
        }
        if (rowCount > 0) {
-         stmt.executeBatch()
+         totalUpdatedRows += stmt.executeBatch().sum
Can't we just sum up rowCount?
I took this approach to ensure we only count actual updates, but I'm not sure how Spark handles this for other writers. Same for the number of bytes written: I was actually asked to update that as well, but there's no way to get the actual value from the JDBC API, so I skipped it.
Please let me know how Spark has been updating these metrics - I'll follow that approach. Thanks!
SparkHadoopWriter uses a row count as the metric:

    recordsWritten += 1

Since the values returned by stmt.executeBatch seem to be JDBC-implementation specific, IMHO it's OK to just do the same as SparkHadoopWriter.
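For reference, a minimal sketch of that counting style (the writer and variable names are illustrative, not the actual SparkHadoopWriter code):

```scala
var recordsWritten = 0L
while (iterator.hasNext) {
  val record = iterator.next()
  writer.write(record)   // hypothetical per-record write call
  recordsWritten += 1    // count what was handed to the sink, not what the sink reports back
}
outputMetrics.setRecordsWritten(recordsWritten)
```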
OK. Thanks for the guidance! What about the number of bytes? Reading the length of a file is easy, but measuring the size of every row seems nontrivial.
      val totalUpdatedRows = savePartition(
        getConnection, table, iterator, rddSchema, insertStmt, batchSize, dialect, isolationLevel,
        options)
      outMetrics.setRecordsWritten(outMetrics.recordsWritten + totalUpdatedRows)
outMetrics.setRecordsWritten(totalUpdatedRows)?
Test build #112536 has finished for PR 26109 at commit
Test build #112582 has finished for PR 26109 at commit
retest this, please
Test build #112595 has finished for PR 26109 at commit
          i = i + 1
        }
        stmt.addBatch()
        rowCount += 1
Can't we move rowCount outside the try and then just use it for the metric?
It's used to determine whether one more flush is needed at the end of the iteration. It could just be a boolean flag, but we need a dedicated variable to track this either way.
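To illustrate, a rough sketch of the batching loop as it appears in the quoted diffs (column binding omitted; the executeBatch-sum accumulation follows the diff above and may differ from the final merged code):

```scala
var rowCount = 0            // rows added to the current batch since the last flush
var totalUpdatedRows = 0L   // accumulated for outputMetrics.recordsWritten

while (iterator.hasNext) {
  val row = iterator.next()
  // ... bind each column of `row` into `stmt` using the dialect's setters ...
  stmt.addBatch()
  rowCount += 1
  if (rowCount % batchSize == 0) {
    // Flush a full batch; executeBatch() returns per-statement update counts.
    totalUpdatedRows += stmt.executeBatch().sum
    rowCount = 0
  }
}
if (rowCount > 0) {
  // A non-zero rowCount here means a partially filled batch still needs flushing.
  totalUpdatedRows += stmt.executeBatch().sum
}
```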
Ur, I see.
Can you leave some comments somewhere about the policy to collect metrics?
Just added it. 6e908d1
Thanks!
cc: @HyukjinKwon
HyukjinKwon left a comment
Thanks for cc'ing me, @maropu. Looks good to me too.
Test build #112634 has finished for PR 26109 at commit
Thanks, @HeartSaVioR and @HyukjinKwon! Merged to master.
Thanks all for reviewing and merging!
Referenced follow-up commit (SPARK-29687, #26346):

### What changes were proposed in this pull request?
Fix the JDBC metrics counter data type. Related pull request: [26109](#26109).

### Why are the changes needed?
Avoid overflow.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing UT.

Closes #26346 from ulysses-you/SPARK-29687.

Authored-by: ulysses <[email protected]>
Signed-off-by: Takeshi Yamamuro <[email protected]>
What changes were proposed in this pull request?
This patch adds the functionality to measure the number of records being written by the JDBC writer. In practice, the value is the number of records reported as updated by the executed statements, since per the JDBC spec they return update counts.
Why are the changes needed?
Output metrics for the JDBC writer are currently missing. The value of "bytesWritten" is also missing, but we can't measure it via the JDBC API.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit test added.
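As an illustration of how the new metric can be observed from an application (a hypothetical example; the H2 URL, driver, and table name are placeholders, and the H2 driver is assumed to be on the classpath):

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

object JdbcRecordsWrittenExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("jdbc-metrics").getOrCreate()

    val recordsWritten = new AtomicLong(0L)
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // With this patch, JDBC writes populate outputMetrics.recordsWritten.
        Option(taskEnd.taskMetrics).foreach { m =>
          recordsWritten.addAndGet(m.outputMetrics.recordsWritten)
        }
      }
    })

    spark.range(100).toDF("id").write
      .format("jdbc")
      .option("url", "jdbc:h2:mem:testdb")   // placeholder URL
      .option("dbtable", "people")           // placeholder table
      .option("driver", "org.h2.Driver")     // placeholder driver
      .mode("overwrite")
      .save()

    Thread.sleep(2000)  // crude wait for the asynchronous listener bus in this sketch
    println(s"records written = ${recordsWritten.get()}")  // should report 100 with this patch

    spark.stop()
  }
}
```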