Conversation

Contributor

@ghoto ghoto commented May 9, 2017

What changes were proposed in this pull request?

The current implementation of computePrincipalComponentsAndExplainedVariance in RowMatrix usually crashes for tall and fat matrices, because it computes the covariance matrix locally.

Instead, this patch uses RowMatrix.computeSVD, which is already optimized for big matrices, to compute the principal components and explained variance.

It's known that a matrix X with mean µ and covariance (X - µ)'(X - µ) can be decomposed with SVD such that

(X - µ) = USV'
and
(X - µ)' = VS'U'

V and U are orthonormal, therefore V'V = I and U'U = I. Hence
cov = (X - µ)'(X - µ) = VS'U'USV' = VS'SV'
and
cov*V = V(S'S), with V the eigenvectors of the covariance matrix and S'S containing the eigenvalues.
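
A minimal Scala sketch of this approach (not the exact diff in this PR; the StandardScaler-based centering and the helper name pcaViaSvd are assumptions, following the review discussion below):

```scala
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// PCA of a RowMatrix via a (possibly distributed) SVD of the mean-centered rows.
// Column j of V is the j-th principal component; the variance it captures is s(j)^2 / (m - 1).
def pcaViaSvd(mat: RowMatrix, k: Int): (Matrix, Vector) = {
  val m = mat.numRows()
  // Subtract the column means from every row (note: this densifies sparse vectors).
  val centerer = new StandardScaler(withMean = true, withStd = false).fit(mat.rows)
  val centered = new RowMatrix(mat.rows.map(centerer.transform))
  val svd = centered.computeSVD(k, computeU = false)
  // Fraction of the total variance explained by each of the top-k components;
  // the total variance comes from a separate column-statistics pass, since only
  // the top-k singular values are available here.
  val totalVariance = mat.computeColumnSummaryStatistics().variance.toArray.sum
  val explained = svd.s.toArray.map(s => s * s / (m - 1) / totalVariance)
  (svd.V, Vectors.dense(explained))
}
```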

How was this patch tested?

This patch has been tested by running the existing RowMatrixSuite, mllib.PCASuite, and ml.PCASuite; all tests pass.

Also, this patch made it possible to run PCA on a 56k x 12k matrix without OutOfMemory errors (not included in the test suite, as it takes a long time to execute and the matrix comes from a private dataset).


ghoto added 2 commits May 8, 2017 19:25
The previous computePrincipalComponentsAndExplainedVariance() evaluates
the covariance matrix in a local Breeze matrix, causing OutOfMemory exceptions
for tall and fat matrices.
The decomposition of the matrix X - mean(X) provides the eigenvectors and eigenvalues
of the covariance matrix.

X = U S V'  // (1)
X' = V S'U'

X'X = V S'U'U S V'
X'X = V S'S V' // U'U = I
(X'X)V = V (S'S)(V'V)
(X'X)V = V (S'S) // V'V = I
A*V = V*lambda // if A=X'X, V is the eigenvector matrix of X'X and can be obtained from (1)
Member

@srowen srowen left a comment


I'm concerned that this could be much slower for a moderately large number of columns, like 1000 or so, especially in the sparse case. This makes the representation dense and then does a distributed SVD when currently it's handled fairly efficiently locally.

Vectors.dense(Arrays.copyOfRange(explainedVariance, 0, k)))
// X' = X - µ
def subPairs = (vPair: (Double, Double)) => vPair._1 - vPair._2
def subMean = (v: Vector) => Vectors.dense(v.toArray.zip(mean.toArray).map(subPairs))
Member


This is a pretty inefficient way to subtract the mean, and it's going to make sparse data dense.

Contributor Author

@ghoto ghoto May 9, 2017


It's not possible to keep sparse vectors sparse if we center them to the origin. However, given your concern about efficiency, I could try using mllib.feature.StandardScaler().fit(data).setWithMean(true).setWithStd(false) on the data.
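
For reference, a small sketch of that suggestion, assuming rows: RDD[Vector] holds the matrix rows (the centered output is necessarily dense):

```scala
import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Fit a scaler that subtracts the column means but leaves the variance untouched.
def centerRows(rows: RDD[Vector]): RDD[Vector] = {
  val model = new StandardScaler(withMean = true, withStd = false).fit(rows)
  rows.map(model.transform) // vectors are dense once the mean has been subtracted
}
```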


if (k == n) {
(Matrices.dense(n, k, u.data), Vectors.dense(explainedVariance))
// Check matrix is standardized with mean 0
Member


The scaladoc comments are no longer consistent with the impl.

Contributor Author

ghoto commented May 9, 2017

My understanding is that RowMatrix computes the SVD locally when the data allows a performance gain, and otherwise in a distributed fashion. So the suggested implementation does not always rely on a distributed SVD; depending on the data, it can still be computed locally.

To my knowledge, the current implementation can't handle PCA on tall-and-fat matrices, since it quickly runs OutOfMemory while trying to compute the principal components from an eigendecomposition of X'X using Breeze SVD.

Moreover, SVD-based PCA is preferred over an eigendecomposition of X'X for numerical reasons (https://math.stackexchange.com/questions/359397/why-svd-on-x-is-preferred-to-eigendecomposition-of-xx-top-in-pca), and it wouldn't crash as easily as computing the eigenvectors of X'X locally.

This implementation might be a bit slower than the current one (not verified), but it is much more stable.

From https://spark.apache.org/docs/2.1.0/mllib-dimensionality-reduction.html

If n is small (n < 100) or k is large compared with n (k > n/2), we compute the Gramian matrix first and then compute its top eigenvalues and eigenvectors locally on the driver. This requires a single pass with O(n^2) storage on each executor and on the driver, and O(n^2 k) time on the driver.
Otherwise, we compute (A^T A)v in a distributive way and send it to ARPACK to compute (A^T A)'s top eigenvalues and eigenvectors on the driver node. This requires O(k) passes, O(n) storage on each executor, and O(n k) storage on the driver.
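
For scale, a rough back-of-the-envelope for that O(n^2) storage term at the column counts discussed in this thread (an estimate only; it ignores JVM overhead and any working copies made by the eigensolver):

```scala
// A dense n x n Gramian of doubles takes n^2 * 8 bytes.
def gramianGiB(n: Long): Double = n * n * 8.0 / (1L << 30)

gramianGiB(12000L) // ~1.1 GiB for ~12k columns
gramianGiB(65535L) // ~32 GiB at the 65535-column limit
```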

Member

srowen commented May 10, 2017

Yes, it is likely more accurate not to base the PCA on the Gramian. However, the Gramian approach is probably more efficient than what the SVD method does, even when the SVD runs locally. If this change makes other cases very slow, that could be just as bad. That said, getting it to work with more than ~65,535 columns is of course a good thing.

How much memory does your driver have? At the size you're computing, the matrix should only take < 1 GB of memory even with some overhead. That isn't large enough to run out of memory, assuming you've given your driver more than the default amount of memory.

The real question is scaling past 65,535 columns, which isn't possible locally no matter the heap size. But then the question is what happens in the regime of thousands of columns: your change may make that a lot slower, when it's pretty reasonable to compute locally.

It could be that a threshold is needed here: above some size it's probably better to distribute than to compute the Gramian locally, but we don't know what that scale is.

Contributor Author

ghoto commented May 11, 2017

With the classic Spark PCA, an approx. 55K x 15K matrix, and 10 GB on the driver, I run out of memory. When I chop the matrix down to 55K x 3K I can get the PCA. With the distributed SVD approach I could compute the PCA on the original matrix, and it took about 7 minutes to complete training.

I also think that if the local approach is faster for small- to medium-sized matrices, we should be able to set a threshold that chooses how the PCA is computed.

Separately, in my spare time I'm working on a Probabilistic PCA with EM. That should scale well and converge quickly; moreover, some variants allow missing values in the matrix. But that's a separate matter.

Member

srowen commented May 11, 2017

Hm, that sounds like a lot more memory than I'd expect. How are you running the driver, and are you sure it's the driver that runs out of memory? Do you have timings for local vs. distributed at a scale that works in both cases?

Contributor Author

ghoto commented May 11, 2017

Originally the driver was set to 3 GB, but since I was hitting this OutOfMemory in the driver I decided to try increasing it.

For other benchmarks, I think I need more time.

@AmplabJenkins

Can one of the admins verify this patch?

@srowen srowen mentioned this pull request Jul 3, 2018
@asfgit asfgit closed this in 5bf95f2 Jul 4, 2018
@sammysheep

@ghoto @srowen

  • One can increase the executor, driver, and spark.driver.maxResultSize limits and still get these OOM errors when running PCA on very wide matrices. It appears to be related to Java byte array limitations. This JIRA addressed some 2 GB limits, but returning large results to the driver was explicitly not addressed.
  • R has two routines, prcomp and princomp, that do PCA via SVD and eigenvalue decomposition respectively. Incidentally, the R authors also prefer SVD for numerical reasons. However, having both methods could be advantageous, as discussed: one could add an "algorithm" argument to the PCA constructor that lets the user select the algorithm, defaulting to the legacy method (see the sketch after this list).
  • In certain disciplines, like bioinformatics, wide matrices are very common. They are also common if you only have a distance matrix and can't get a skinny set of observations. PCA in current Spark (2.4.4) simply won't work on such matrices without a fix.
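
A hypothetical sketch of the dispatch such an "algorithm" argument could drive (the parameter, its values, and the helper names are illustrations only, not existing Spark API; pcaViaSvd refers to the sketch in the PR description above):

```scala
import org.apache.spark.mllib.linalg.{Matrix, Vector}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// "eig" keeps the legacy local Gramian eigendecomposition (princomp-style);
// "svd" would take the distributed SVD path proposed in this PR (prcomp-style).
def pca(mat: RowMatrix, k: Int, algorithm: String = "eig"): (Matrix, Vector) = algorithm match {
  case "eig" => mat.computePrincipalComponentsAndExplainedVariance(k)
  case "svd" => pcaViaSvd(mat, k) // see the sketch in the PR description
  case other => throw new IllegalArgumentException(s"unknown algorithm: $other")
}
```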
