Suppose you are an avid R user, and you would like to use SparkR in Cloudera Hadoop; unfortunately, as of the latest CDH version (5.7), SparkR is still not supported (and, according to a recent discussion in the Cloudera forums, we shouldn’t expect this to happen anytime soon). Is there anything you can do?
Well, indeed there is. In this post we will demonstrate how to use SparkR in a Cloudera Hadoop cluster. The assumptions are the following:
- A Cloudera Hadoop cluster, with R installed in all worker nodes
- A gateway node, through which you connect to the cluster to submit jobs, and in which you naturally have a user account (i.e. no superuser privileges needed)
Step 1: download Spark locally
The first step is to download Spark locally in your gateway home folder; this is very simple actually, and I have provided detailed instructions elsewhere. The best part? You don’t need to download a Spark version that matches the version in your CDH distribution; in our cluster, the CDH version is 5.6, which comes with Spark 1.5.0, while locally I have downloaded Spark 1.6.1, prebuilt for Hadoop 2.6. Here is my local home directory in my gateway:
[ctsats@dev-hd-01 ~]$ ll total 282640 drwxrwxr-x 2 ctsats ctsats 52 Mar 1 18:16 kaggle drwxr-xr-x. 3 ctsats ctsats 91 Apr 12 18:30 R drwxrwxr-x 2 ctsats ctsats 25 Feb 5 13:21 scripts drwxr-xr-x 12 ctsats ctsats 4096 Feb 27 07:02 spark-1.6.1-bin-hadoop2.6 -rw-rw-r-- 1 ctsats ctsats 289405702 Apr 11 12:55 spark-1.6.1-bin-hadoop2.6.tgz -rw-rw-r--. 1 ctsats ctsats 90 Feb 3 14:29 test.data
Step 2: run SparkR scripts locally from RStudio
Next, we will run a SparkR script locally from an RStudio session. Here is a simple example script, reading a CSV file from HDFS and printing its first elements (detailed explanations below):
Sys.setenv(HADOOP_CONF_DIR='/etc/hadoop/conf.cloudera.hdfs') Sys.setenv(YARN_CONF_DIR='/etc/hadoop/conf.cloudera.yarn') library(SparkR, lib.loc = "/home/ctsats/spark-1.6.1-bin-hadoop2.6/R/lib") library(magrittr) sc <- sparkR.init(sparkHome = "/home/ctsats/spark-1.6.1-bin-hadoop2.6", sparkPackages="com.databricks:spark-csv_2.10:1.4.0") sqlContext <- sparkRSQL.init(sc) df <- read.df(sqlContext, 'recsys/data/interactions_clean.csv', "com.databricks.spark.csv", header="true") df %>% head %>% print sparkR.stop()
Let’s see the script line by line:
- In lines 1 & 2, we set the required Hadoop environment variables (you can obtain them from your cluster administrator)
- In line 4, we load the
SparkR
package, providing its location in our local machine - Spark context is initialized in lines 7 & 8. In line 7, we provide the sparkHome directory, while in line 8 we include any additional Spark packages with the sparkPackages argument (in our case, the
spark-csv
external package) - In line 10, we initialize the
sqlContext
, necessary because we are going to use Spark dataframes - In line 12, we read the CSV file interactions_clean.csv (which includes a header) from the HDFS directory recsys/data/ using the
spark-csv
package. The result is a Spark dataframedf
. - In line 13, we use the pipe operators from the R package
magrittr
(loaded in line 5), to print out the first elements of our Spark dataframe (including its header). - Finally, in line 15 we stop the Spark context.
We can run the script from RStudio, either the whole of it or line by line in an interactive manner. We can also save it (sparkr-test.R) and run it from the command line:
[ctsats@dev-hd-01 ~]$ Rscript 'R/sparkr_test.R' [...] user_id item_id interaction_type 1 7 1006839 1 2 9 944146 3 3 9 1053485 1 4 13 2444782 1 5 23 501722 1 6 23 1305844 1 [...]
So far so good. But you may be thinking “OK, this is nothing new; you have just downloaded again Spark locally, and are using this to run SparkR. We already knew this, so what is the fuss about?“
So, here comes the really nice stuff…
Step 3: submit SparkR applications to YARN
It turns out that you can indeed submit SparkR applications to YARN, despite the fact that SparkR is not available in your (Cloudera) Hadoop cluster. Here is how.
First, we need to make some changes to our script above; here is the new script, saved as sparkr-submit_test.R:
library(SparkR, lib.loc = "/home/ctsats/spark-1.6.1-bin-hadoop2.6/R/lib") library(magrittr) sc <- sparkR.init(appName = 'SparkR-submit-test', sparkEnvir = list(spark.executor.cores='2', # STRINGS here for spark-submit!!! spark.executor.instances='12')) sqlContext <- sparkRSQL.init(sc) df <- read.df(sqlContext, 'recsys/data/interactions_clean.csv', "com.databricks.spark.csv", header="true") df %>% head %>% print sparkR.stop()
The changes in our new script are:
- We don’t set the environment variables
HADOOP_CONF_DIR
andYARN_CONF_DIR
in-script - We also don’t declare any additional Spark packages, like spark-csv
- We include further configuration settings in our Spark context (lines 4-6)
Now, let’s try to submit this script to YARN; to do so, we have to:
- Set the necessary environment variables from the terminal
- Change our working directory to where our local Spark executables are, i.e. spark-1.6.1-bin-hadoop2.6/bin in our case
- Precede the
spark-submit
command with./
- Include the spark-csv package in the command line
Here is the procedure and the results:
[ctsats@dev-hd-01 ~]$ export HADOOP_CONF_DIR=/etc/hadoop/conf.cloudera.hdfs [ctsats@dev-hd-01 ~]$ export YARN_CONF_DIR=/etc/hadoop/conf.cloudera.yarn [ctsats@dev-hd-01 ~]$ cd spark-1.6.1-bin-hadoop2.6/bin/ [ctsats@dev-hd-01 bin]$ ./spark-submit --master yarn-client --driver-memory 2g --packages com.databricks:spark-csv_2.10:1.4.0 ~/R/sparkr-submit_test.R [...] 16/04/22 19:20:04 INFO spark.SparkContext: Running Spark version 1.6.1 [...] user_id item_id interaction_type 1 7 1006839 1 2 9 944146 3 3 9 1053485 1 4 13 2444782 1 5 23 501722 1 6 23 1305844 1
As we can see, not only the job is submitted to YARN and executed successfully, but it also used Spark 1.6.1 (highlighted line above), and not the existing Spark version in our Cloudera Hadoop cluster (1.5.0).
Notice also that neither SparkR
nor magrittr
R packages are installed in our cluster workers, the only requirement for them being an R installation.
Finally, we notice that we can use a similar approach to run other Spark components (e.g. PySpark jobs) in a Spark version newer than the one shipped with our Cloudera Hadoop version, which might also come handy (for instance, I have found a significant performance boost in some MLlib algorithms of Spark 1.6.1 compared to Spark 1.5.0).
Conclusions
We have presented a simple and straightforward way for running SparkR applications in a Cloudera Hadoop cluster, both locally and in YARN. Apart from R being present to the worker nodes, no modification to the cluster software is required, neither superuser privileges in the local working client machine (gateway). The approach demonstrated can also be used to “bypass” the Spark version shipped with Cloudera Hadoop (which usually lags behind the latest one) with a newer one, if necessary.
- Streaming data from Raspberry Pi to Oracle NoSQL via Node-RED - February 13, 2017
- Dynamically switch Keras backend in Jupyter notebooks - January 10, 2017
- sparklyr: a test drive on YARN - November 7, 2016
Thanks Christos for this. Would you be able to share inputs on how the setup would work against a secure (kerberos enabled) CDH cluster? Also, wondering how jobs developed on a local RStudio IDE on Windows can be pushed down to a remote YARN RM running on a Red Hat cluster.
Hi spary. I have yet to try this with a Kerberos-enabled cluster. As for Windows, I’ll probably try it next week – I’ll let you know here.
Hi Christos, thanks for your sharing. Do you have any update for Windows one?
Unfortunately no, and no plans for the near future either…
Hi Christos, thanks for your sharing.
Would you like to share how to connect to hive and do some SQL query through sparkR? Thanks.
let me get this clear – you’ve to install r on all nodes of your cluster – for any non-trivial installation that’s more than you can count on your fingers!
Cannot see your point – this is clearly mentioned in the post assumptions.
Thanks Christos for inspiration.
Your recipe works with SparkR-2.0 too, with little adjusts, follow bellow.
cloudera –> 2.6.0-cdh5.7.0, rc00978c67b0d3fe9f3b896b5030741bd40bf541a
bin/spark-submit –master yarn-client –driver-memory 1g –num-executors 5 –packages com.databricks:spark-avro_2.11:3.0.0 ../sparkr-submit_test2.0.R
===================
sparkr-submit_test2.0.R
==================
library(SparkR, lib.loc = “/home/admnet/spark-2.0.0-bin-hadoop2.6/R/lib”)
library(magrittr)
sparkR.session(appName = ‘SparkR-submit-test’, sparkEnvir = list(spark.executor.cores=’4′, spark.executor.instances=’4′))
df % head %>% print
createOrReplaceTempView(df, “voos”)
airport <- sql("select Year,Dest, count(*) as cnt from voos group by Year, Dest order by Dest, cnt desc")
result <- head(airport,15000)
write.csv(result,"airport9.agg")
Using a RITA dataset from U.S. Air Traffic
Nice lab!
Cool Luiz, thanks!
My only (secondary) remark is that you seem to set the number of executors both in-script and in the command line (the latter setting will override the former).
Yes,
Exact I’m trying to discover this, while increasing node number until 5, where is limit of my ClusterToy.
Thanks once more.
So, this works for me in local mode using Rscript. In yarn-cluster mode, it won’t work because the R task is trying to access the SparkR library which is only on my client. In yarn-client mode, I’m getting an exception from Spark, “Failed to connect to driver!” Looks like my worker node is trying to connect back to my client (driver) on port 53211. Is that normal? Any idea why I can’t connect?
Hi,
I received this error when trying to run the stuff above :
Error: sparkr.zip does not exist for R application in YARN mode.
Do you know how to fix this?
hi bob,
i face this same error on spark2. were you able to resolve it?
thanks.
Hey guys,
Did you follow Step 1? It is necessary, as Cloudera distributions will not include sparkr.zip…
Yes Christos, That solved the problem.
Glad to hear it 🙂
Enviroment variables are set ?
export YARN_CONF_DIR=”/etc/hadoop/conf”
export HADOOP_CONF_DIR=”/etc/hadoop/conf”
Hi Christos,
Any limitations you are aware of? I am very Hadoop savy person, but I like to know if this allows parallelization of ANY R analysis (excluding those impossible cases!).
Thanks
hello,thank you,i use this way test my R script, but i found it’s use yarn-client mode when i submit my R script.
even i use ‘spark-submit –master yarn –deploy-mode cluster test.R’. then it’s still use yarn-client mode. Would you give me some advices?
Thank you very much, I try this again. And then i find that it is work well on yarn cluster mode. hahaa