Step by Step Guide to Installing and Configuring Spark 2.0 to Connect with Cassandra 3.x
1/06/2017
In this guide, we will install Scala 2.11, Spark 2.0 as a service, and the DataStax spark-cassandra-connector library on the client program. If you already have any of these packages installed and configured, you can skip that step. This guide assumes you have a Cassandra 3.x cluster that is already up and running. For more information on installing and using Cassandra, visit http://cassandra.apache.org/.

Note: The following steps should be performed on all the nodes in the cluster unless otherwise noted.
Install Scala 2.11
Ensure you have Java installed. If you don't have Java installed, follow this tutorial to get Java 8 installed, then verify:
$ java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)
Install Scala 2.11.8
$ wget www.scala-lang.org/files/archive/scala-2.11.8.deb
$ sudo dpkg -i scala-2.11.8.deb
$ scala -version
Install SBT 0.13
$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
$ sudo apt-get update
$ sudo apt-get install sbt
Install Spark 2.0
Download Spark 2.0 from https://spark.apache.org/downloads.html and unpack the tar file
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
$ tar zxf spark-2.0.2-bin-hadoop2.7.tgz
$ sudo mv spark-2.0.2-bin-hadoop2.7/ /usr/local/spark/

Update system variables
Add an environment variable called SPARK_HOME
$ sudo nano /etc/environment
export SPARK_HOME=/usr/local/spark

At the end of the PATH variable, add $SPARK_HOME/bin
PATH="<previous_entries>:/usr/local/spark/bin"

Refresh the environment
$ source /etc/environment

Create spark user and make it the owner of the SPARK_HOME directory
$ sudo adduser spark --system --home /usr/local/spark/ --disabled-password
$ sudo chown -R spark:root /usr/local/spark

Create Log and Pid directories
$ sudo mkdir /var/log/spark
$ sudo chown spark:root /var/log/spark
$ sudo -u spark mkdir $SPARK_HOME/run
Create the Spark Configuration files
Create the Spark Configuration files by copying the templates
$ sudo cp /usr/local/spark/conf/spark-env.sh.template /usr/local/spark/conf/spark-env.sh
$ sudo cp /usr/local/spark/conf/spark-defaults.conf.template /usr/local/spark/conf/spark-defaults.conf
$ sudo chown spark:root /usr/local/spark/conf/spark-*

Edit the Spark Environment file, spark-env.sh
export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run
Configure Spark nodes to join cluster
If you will not be managing Spark using the Mesos or YARN cluster managers, you'll be running Spark in what is called "Standalone Mode".

In standalone mode, Spark will have a master node (which is the cluster manager) and worker nodes. You should select one of the nodes in your cluster to be the master. Then, on every worker node, you must edit /usr/local/spark/conf/spark-env.sh to point to the host where the Spark Master runs:

# Options for the daemons used in the standalone deploy mode
export SPARK_MASTER_HOST=<spark_master_ip_or_hostname_here>

You can also change other elements of the default configuration by editing /usr/local/spark/conf/spark-env.sh. Some other configs to consider are:
- SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
- SPARK_WORKER_CORES, to set the number of cores to use on this machine
- SPARK_WORKER_MEMORY, to set how much memory to use (for example 1000MB, 2GB)
- SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports
- SPARK_WORKER_INSTANCES, to set the number of worker processes per node
- SPARK_WORKER_DIR, to set the working directory of worker processes
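To make this concrete, here is what a worker's spark-env.sh might look like on a 4-core, 8 GB machine. The values below are illustrative, not from the original guide; tune them to your own hardware, and keep the log/pid settings from earlier in this guide.

```shell
# /usr/local/spark/conf/spark-env.sh (illustrative values)
export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run

# Point this worker at the node running the Spark Master
export SPARK_MASTER_HOST=10.0.0.10

# Give executors 4 cores and 6 GB, leaving headroom for the OS and Cassandra
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=6g
```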
Installing Spark as a service
Run the following commands to create a service for the spark-master and spark-worker
$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-master
$ sudo chmod 0755 /etc/init.d/spark-master
$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-worker
$ sudo chmod 0755 /etc/init.d/spark-worker
$ sudo update-rc.d spark-master defaults 99
$ sudo update-rc.d spark-worker defaults 99
Edit the /etc/init.d/spark-worker file. If a variable or function already exists, then replace it with the text below. (Note that spark-env.sh sets SPARK_MASTER_HOST, so that is the variable used here; if SPARK_MASTER_PORT is unset, the Spark default of 7077 is used.)

DESC="Spark Worker"
NAME=spark-worker
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.worker.Worker-1.pid
export SPARK_HOME

# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0

if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
    . $SPARK_HOME/conf/spark-env.sh
else
    echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi

#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running()
{
    CMD_PATT="org.apache.spark.deploy.worker.Worker"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}

#
# Function that starts the daemon/service
#
do_start()
{
    # Return
    #   0 if daemon has been started
    #   1 if daemon was already running
    #   2 if daemon could not be started
    [ -e `dirname "$PIDFILE"` ] || \
        install -d -ospark -groot -m755 `dirname $PIDFILE`

    start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
        --exec $SPARK_HOME/sbin/start-slave.sh \
        --test > /dev/null \
        || return 1
    start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
        --exec $SPARK_HOME/sbin/start-slave.sh -- spark://$SPARK_MASTER_HOST:${SPARK_MASTER_PORT:-7077} \
        || return 2
}

#
# Function that stops the daemon/service
#
do_stop()
{
    start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
    RETVAL="$?"
    rm -f $PIDFILE
    return "$RETVAL"
}

#
# Function that sends a SIGHUP to the daemon/service
#
do_reload()
{
    # If the daemon can reload its configuration without
    # restarting (for example, when it is sent a SIGHUP),
    # then implement that here.
    start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
    return 0
}
...
status)
    is_running
    stat=$?
    case "$stat" in
        0) log_success_msg "$DESC is running" ;;
        1) log_failure_msg "could not access pidfile for $DESC" ;;
        *) log_success_msg "$DESC is not running" ;;
    esac
    exit "$stat"
    ;;
...
Edit the /etc/init.d/spark-master file. If a variable or function already exists, then replace it with the text below.

DESC="Spark Master"
NAME=spark-master
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.master.Master-1.pid
export SPARK_HOME

# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0

if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
    . $SPARK_HOME/conf/spark-env.sh
else
    echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi

#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running()
{
    CMD_PATT="org.apache.spark.deploy.master.Master"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}

#
# Function that starts the daemon/service
#
do_start()
{
    # Return
    #   0 if daemon has been started
    #   1 if daemon was already running
    #   2 if daemon could not be started
    [ -e `dirname "$PIDFILE"` ] || \
        install -d -ospark -groot -m755 `dirname $PIDFILE`

    start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
        --exec $SPARK_HOME/sbin/start-master.sh \
        --test > /dev/null \
        || return 1
    start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
        --exec $SPARK_HOME/sbin/start-master.sh \
        || return 2
}

#
# Function that stops the daemon/service
#
do_stop()
{
    start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
    RETVAL="$?"
    rm -f $PIDFILE
    return "$RETVAL"
}

#
# Function that sends a SIGHUP to the daemon/service
#
do_reload()
{
    # If the daemon can reload its configuration without
    # restarting (for example, when it is sent a SIGHUP),
    # then implement that here.
    start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
    return 0
}
...
status)
    is_running
    stat=$?
    case "$stat" in
        0) log_success_msg "$DESC is running" ;;
        1) log_failure_msg "could not access pidfile for $DESC" ;;
        *) log_success_msg "$DESC is not running" ;;
    esac
    exit "$stat"
    ;;
...
Running Spark as a service
Start the Spark master node first. On whichever node you've selected to be master, run:
$ sudo service spark-master start

On all the other nodes, start the workers:
$ sudo service spark-worker start

To stop Spark, run the following commands on the appropriate nodes:
$ sudo service spark-worker stop
$ sudo service spark-master stop

Service logs will be stored in /var/log/spark.

Testing the Spark service
To test the Spark service, start spark-shell on one of the nodes:
$ spark-shell --master spark://<IP>:<Port>

When the prompt comes up, execute the following line of code:
scala> sc.parallelize( 1 to 1000 ).sum()
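If the job is scheduled across the workers and returns, the service is healthy. As a sanity check on the result, the distributed sum should match what plain local Scala computes for the same range:

```scala
// Local sanity check: the sum of 1..1000 that the Spark job above
// computes in a distributed fashion is 1000 * 1001 / 2 = 500500.
object SumCheck {
  def main(args: Array[String]): Unit = {
    val expected = (1 to 1000).sum
    println(expected) // prints 500500
  }
}
```

(Spark's sum() returns the same total as a Double, 500500.0.)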
Get Spark-Cassandra-Connector on the client
The spark-cassandra-connector is a Scala library that exposes Cassandra tables as Spark RDDs, lets you write Spark RDDs to Cassandra tables, and allows you to execute arbitrary CQL queries and computations that are distributed to the Cassandra nodes holding the data, which keeps them fast. Your code, the spark-cassandra-connector, and all of their dependencies are packaged up and sent to the Spark nodes.

If you are writing ad-hoc queries / computations, use the spark-shell. Start up the shell by running the command:
$ spark-shell --conf spark.cassandra.connection.host=<cassandra-host-ip> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11

The --packages option downloads the connector and all of its dependencies from the Spark Packages site and places them in the path of the Spark Driver and all Spark Executors.

If you are writing a Scala application, configure a new Scala project. Your build.sbt file should look something like this:

name := "MySparkProject"

version := "1.0"

scalaVersion := "2.11.8"

val sparkVersion = "2.0.2"

resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % sparkVersion,
  "org.apache.spark" % "spark-sql_2.11" % sparkVersion,
  "datastax" % "spark-cassandra-connector" % "2.0.0-M2-s_2.11"
)
Testing the connector
To start out, create a simple keyspace and table in Cassandra. Run the following statements in cqlsh:

CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1 };
CREATE TABLE test.kv(key text PRIMARY KEY, value int);

Then insert some example data:

INSERT INTO test.kv(key, value) VALUES ('key1', 1);
INSERT INTO test.kv(key, value) VALUES ('key2', 2);

For this test, we'll use the spark-shell.
$ spark-shell --conf spark.cassandra.connection.host=<cassandra-host-ip> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11
// import the spark connector namespace
import com.datastax.spark.connector._

// read Cassandra data into an RDD
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)

// Add two more rows to the table
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))
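Reads can also be narrowed before they leave Cassandra. The connector's select and where methods push column projection and primary-key filtering down to the Cassandra nodes rather than filtering in Spark. A sketch against the same test.kv table, for pasting into the same spark-shell session:

```scala
import com.datastax.spark.connector._

// Only fetch the "value" column, and let Cassandra filter by the key.
val one = sc.cassandraTable("test", "kv")
  .select("value")
  .where("key = ?", "key1")
println(one.first.getInt("value"))
```

This keeps both the network transfer and the Spark-side work proportional to the rows you actually need.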