Surprisingly enough, although the relatively new Spark ML library (not to be confused with Spark MLlib) includes a method for principal component analysis (PCA), there is no way to extract some very useful information regarding the PCA transformation, namely the resulting eigenvalues (check the Python API documentation). Without the eigenvalues, one cannot compute the proportion of variance explained (PVE), which is a key diagnostic when using PCA for dimensionality reduction. Simply speaking, the proportion of variance explained by the first k principal components is the ratio of the sum of the first k eigenvalues (sorted in decreasing order) to the sum of all the eigenvalues (see here for the mathematical details); and this information, crucial during the analysis, is unfortunately not available through the Spark ML API (at least as of version 1.5.1).
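To make this concrete in symbols: if λ1 ≥ λ2 ≥ … ≥ λd are the eigenvalues of the covariance matrix of the (mean-centered) data, with d the number of features, the proportion of variance explained by the first k principal components is

$$\mathrm{PVE}(k) = \frac{\sum_{i=1}^{k}\lambda_i}{\sum_{i=1}^{d}\lambda_i}$$

and this is exactly the quantity we will compute below, once we have the eigenvalues at hand.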
Having become aware of the issue through a Stack Overflow question (which I answered), I present here my workaround solution for the Python API of Spark (PySpark). Let me note that the solution is efficient and operational, since it fully takes into account the relevant Spark programming patterns (e.g. no collect() statements), so you can hopefully include it in your projects as-is, or with only slight modifications. It has been developed and tested with Spark 1.5.0 & 1.5.1.
The input to our pca procedure consists of a Spark dataframe, which includes a column named features containing the features as DenseVectors. The reason for this particular choice is compatibility with the format used by the existing Spark ML PCA implementation; nevertheless, it can be easily modified to suit different input formats.
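For instance, here is a minimal sketch of such a modification, assuming your data already sits in a Spark dataframe raw_df consisting of plain numeric columns (the dataframe and column names x1, x2, x3 below are hypothetical, purely for illustration):

```python
from pyspark.mllib.linalg import Vectors

# Hypothetical starting point: a dataframe `raw_df` with plain numeric columns 'x1', 'x2', 'x3'.
# Pack them into the single 'features' column of DenseVectors expected by the code below:
df = sqlContext.createDataFrame(
    raw_df.map(lambda row: (Vectors.dense([row['x1'], row['x2'], row['x3']]),)),
    ['features'])
```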
In order to construct a pca procedure, we must first define an intermediate function for computing the covariance matrix, as follows:
```python
import numpy as np

def estimateCovariance(df):
    """Compute the covariance matrix for a given dataframe.

    Note:
        The multi-dimensional covariance array should be calculated using outer products.
        Don't forget to normalize the data by first subtracting the mean.

    Args:
        df: A Spark dataframe with a column named 'features', which (column) consists of DenseVectors.

    Returns:
        np.ndarray: A multi-dimensional array where the number of rows and columns both equal the
            length of the arrays in the input dataframe.
    """
    m = df.select(df['features']).map(lambda x: x[0]).mean()
    dfZeroMean = df.select(df['features']).map(lambda x: x[0]).map(lambda x: x - m)  # subtract the mean
    return dfZeroMean.map(lambda x: np.outer(x, x)).sum() / df.count()
```
The above function returns the (square) covariance matrix of the input variables, after first normalizing them by subtracting their respective mean values (this is a standard procedure when computing the PCA and, as we will see below, it is also performed by the existing implementation, although not mentioned in the documentation).
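As an optional sanity check (on small data only, since it uses a collect()), one can verify that the result agrees with a local numpy computation, using, e.g., the small dataframe df constructed in the Testing section below; note that np.cov must be called with bias=1, so that it normalizes by N and thus matches the division by df.count() above:

```python
import numpy as np

# Small test dataframes only: collect the features locally and compare with numpy's covariance
local = np.array([v.toArray() for v in df.select(df['features']).map(lambda x: x[0]).collect()])
print(np.allclose(estimateCovariance(df), np.cov(local, rowvar=0, bias=1)))  # expected: True
```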
Now, we can write our main pca function, as follows:
```python
from numpy.linalg import eigh
from pyspark.mllib.linalg import *

def pca(df, k=2):
    """Computes the top `k` principal components, corresponding scores, and all eigenvalues.

    Note:
        All eigenvalues should be returned in sorted order (largest to smallest). `eigh` returns
        each eigenvector as a column. This function should also return eigenvectors as columns.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k (int): The number of principal components to return.

    Returns:
        tuple of (np.ndarray, DF of DenseVector, np.ndarray): A tuple of (eigenvectors, DF of
            scores, eigenvalues). Eigenvectors is a multi-dimensional array where the number of
            rows equals the length of the arrays in the input `DF` and the number of columns equals
            `k`. The `DF` of scores has the same number of rows as `df` and consists of DenseVectors
            of length `k`. Eigenvalues is an array of length d (the number of features).
    """
    cov = estimateCovariance(df)
    col = cov.shape[1]
    eigVals, eigVecs = eigh(cov)
    inds = np.argsort(eigVals)
    eigVecs = eigVecs.T[inds[-1:-(col+1):-1]]  # sort eigenvectors (as rows), largest eigenvalue first
    components = eigVecs[0:k]
    eigVals = eigVals[inds[-1:-(col+1):-1]]  # sort eigenvalues, largest first
    score = df.select(df['features']).map(lambda x: x[0]).map(lambda x: np.dot(x, components.T))
    scoreDF = sqlContext.createDataFrame(score.map(lambda x: (DenseVector(x),)), ['pca_features'])
    # Return the `k` principal components, `k` scores, and all eigenvalues
    return components.T, scoreDF, eigVals
```
Testing
First, let’s have a look at the results provided by the existing implementation:
```
$ pyspark
[...]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.1
      /_/

Using Python version 2.7.6 (default, Mar 22 2014 22:59:38)
SparkContext available as sc, HiveContext available as sqlContext.
>>> from pyspark.ml.feature import PCA
>>> from pyspark.mllib.linalg import *
>>> data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
...         (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
...         (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
>>> df = sqlContext.createDataFrame(data, ["features"])
>>> pca_extracted = PCA(k=2, inputCol="features", outputCol="pca_features")
>>> model = pca_extracted.fit(df)
>>> df_pca = model.transform(df)
>>> df_pca.collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0]), pca_features=DenseVector([1.6486, -4.0133])),
 Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0]), pca_features=DenseVector([-4.6451, -1.1168])),
 Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]), pca_features=DenseVector([-6.4289, -5.338]))]
```
Now, the resulting scores (PCA features) using our implementation are:
```
>>> comp, score, eigVals = pca(df)
>>> score.collect()
[Row(pca_features=DenseVector([1.6486, 4.0133])),
 Row(pca_features=DenseVector([-4.6451, 1.1168])),
 Row(pca_features=DenseVector([-6.4289, 5.338]))]
```
Notice that the signs of our second column are all opposite to those derived by the existing method; but this is not an issue: according to the (freely downloadable & highly recommended) An Introduction to Statistical Learning, co-authored by Hastie & Tibshirani, p. 382,
Each principal component loading vector is unique, up to a sign flip. This means that two different software packages will yield the same principal component loading vectors, although the signs of those loading vectors may differ. The signs may differ because each principal component loading vector specifies a direction in p-dimensional space: flipping the sign has no effect as the direction does not change. […] Similarly, the score vectors are unique up to a sign flip, since the variance of Z is the same as the variance of −Z.
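Indeed, a quick optional check in the same session (using df_pca and score as computed above) confirms that the two sets of scores coincide in absolute value:

```python
import numpy as np

# The two score sets should agree element-wise in absolute value (only the signs may differ);
# the tolerance below is chosen loosely, to match the precision displayed above:
ours   = np.array([row.pca_features.toArray() for row in score.collect()])
theirs = np.array([row.pca_features.toArray() for row in df_pca.collect()])
print(np.allclose(np.abs(ours), np.abs(theirs), atol=1e-4))  # expected: True
```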
Our solution has an additional advantage over the existing implementation: the latter unnecessarily binds together in a single dataframe both the original features and the PCA scores. This is not a wise choice, especially in real-world situations where the dataset records may number in the tens of millions; in such cases, carrying around dataframes with all these extra columns is an unnecessary waste of resources.
Proportion of variance explained
Now that we have our eigenvalues available, it is trivial to write a function for the proportion of variance explained:
```python
def varianceExplained(df, k=1):
    """Calculate the fraction of variance explained by the top `k` eigenvectors.

    Args:
        df: A Spark dataframe with a 'features' column, which (column) consists of DenseVectors.
        k: The number of principal components to consider.

    Returns:
        float: A number between 0 and 1 representing the proportion of variance explained by the
            top `k` eigenvectors.
    """
    eigenvalues = pca(df, k)[2]
    return sum(eigenvalues[0:k]) / sum(eigenvalues)
```
Applying it to our example data, we get
```
>>> varianceExplained(df,1)
0.79439325322305299
>>> varianceExplained(df,2)
1.0
```
Having 100% of the variance explained with only 2 principal components (out of 5 in total) is not strange if we look at the eigenvalues:
```
>>> eigVals
array([  1.20041647e+01,   3.10694640e+00,   1.12683068e-15,
        -3.63127567e-16,  -8.99228389e-16])
```
i.e. the first 2 eigenvalues are larger than the remaining 3 by about 15 orders of magnitude; in other words, the last 3 eigenvalues are effectively zero (the tiny negative values are merely floating-point noise, since the eigenvalues of a covariance matrix cannot be negative).
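A side note: since varianceExplained above re-runs the whole pca computation on every call, if you need the PVE for several values of k it is cheaper to reuse the eigenvalues you already have in hand. A minimal sketch (the helper name below is mine, not part of Spark):

```python
import numpy as np

def cumulativeVarianceExplained(eigVals):
    """Given the full array of eigenvalues returned by pca(), return the
    cumulative proportion of variance explained for k = 1, ..., d."""
    return np.cumsum(eigVals) / np.sum(eigVals)
```

Applied to the eigVals shown above, its first two entries should reproduce the ~0.794 and 1.0 figures we just computed.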
Summary
We have presented methods to amend some missing functionality when performing PCA in Spark 1.5; specifically, with the methods presented we are able to:
- explicitly provide the eigenvalues resulting from the PCA decomposition
- estimate the proportion of variance explained
Additionally, our methods result in a lighter dataframe carrying only the PCA scores, instead of binding them together with the original features (which can be highly impractical for large datasets).
Since being able to compute the proportion of variance explained is of primary importance during PCA dimensionality reduction, we would strongly suggest that this functionality be included in future versions of Spark.
[EDIT Nov 6: see this Stack Overflow question for how to implement this functionality in Scala; I have also opened an issue at Spark JIRA: SPARK-11530]
[EDIT Dec 14: as a result of the JIRA issue I opened, the issue has now been resolved, and it will be included in Spark 2.0]
Does Spark MLlib’s PCA work with the R language?
At least up to the current version of Spark (1.6.1), Spark’s R API (SparkR) does not include any functionality for PCA or SVD – check the documentation here: https://spark.apache.org/docs/1.6.1/api/R/index.html
Yeah, I had already checked it and found nothing for PCA, so I guess that SparkR does not currently support PCA. Thanks for your explanation.
How are the eigenvectors from different nodes assembled into the final set of eigenvectors, given that this is a distributed framework and different nodes may produce different sets of eigenvalues?
Your question has to do with the internals of Spark and its resilient distributed dataset (RDD) concept, and it is far beyond the scope of this post – I suggest you have a look at the Spark source code on Github.
Hi Christos, thanks for this post, I find it very useful and it has indeed resulted in added functionality in Spark 2.0.0. At the beginning of the post you mentioned that the data type “can be easily modified to suit different input formats.” Could you please elaborate more on this or give me some references, as I cannot find a workaround for it – I keep getting the error “Input column features must be a vector column” when running PCA on my custom DataFrame with Spark 1.6.1. If the solution for this is not very straightforward (which I tend…
Thank you Rajko,
The general idea is simply that you can modify the estimateCovariance function along with the score computation in pca, so that the df variable conforms with what you have in hand – nothing more specific, I am afraid…