Step by Step Guide to Installing and Configuring Spark 2.0 to Connect with Cassandra 3.x

1/06/2017

In this guide, we will install Scala 2.11, Spark 2.0 as a service, and the DataStax spark-cassandra-connector library on the client program. If you already have any of these packages installed and configured, you can skip that step. This guide assumes you have a Cassandra 3.x cluster that is already up and running. For more information on installing and using Cassandra, visit http://cassandra.apache.org/. Note: The following steps should be performed on all the nodes in the cluster unless otherwise noted.

Install Scala 2.11

Ensure you have Java installed
$ java -version
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)
If you don't have Java installed follow this tutorial to get Java 8 installed.
Install Scala 2.11.8
$ wget www.scala-lang.org/files/archive/scala-2.11.8.deb
$ sudo dpkg -i scala-2.11.8.deb
$ scala -version

Install SBT 0.13

$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
$ sudo apt-get update
$ sudo apt-get install sbt

Install Spark 2.0

Download Spark 2.0 from https://spark.apache.org/downloads.html and unpack the tar file
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.2-bin-hadoop2.7.tgz
$ tar zxf spark-2.0.2-bin-hadoop2.7.tgz
$ sudo mv spark-2.0.2-bin-hadoop2.7/ /usr/local/spark/
Update system variables
$ sudo nano /etc/environment
Add an environment variable called SPARK_HOME
export SPARK_HOME=/usr/local/spark
At the end of the PATH variable add $SPARK_HOME/bin
PATH="<previous_entries>:/usr/local/spark/bin"
Refresh the environment
source /etc/environment
Create spark user and make it the owner of the SPARK_HOME directory
$ sudo adduser spark --system --home /usr/local/spark/ --disabled-password
$ sudo chown -R spark:root /usr/local/spark
Create log and PID directories
$ sudo mkdir /var/log/spark
$ sudo chown spark:root /var/log/spark
$ sudo -u spark mkdir $SPARK_HOME/run

Create the Spark Configuration files

Create the Spark Configuration files by copying the templates
$ sudo cp /usr/local/spark/conf/spark-env.sh.template /usr/local/spark/conf/spark-env.sh
$ sudo cp /usr/local/spark/conf/spark-defaults.conf.template /usr/local/spark/conf/spark-defaults.conf
$ sudo chown spark:root /usr/local/spark/conf/spark-*
Edit the Spark Environment file spark-env.sh
export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run

Configure Spark nodes to join cluster

If you will not be managing Spark using the Mesos or YARN cluster managers, you'll be running Spark in what is called "Standalone Mode".
In standalone mode, Spark has a master node (which acts as the cluster manager) and worker nodes. Select one of the nodes in your cluster to be the master. Then, on every worker node, edit /usr/local/spark/conf/spark-env.sh to point to the host where the Spark master runs.
# Options for the daemons used in the standalone deploy mode
export SPARK_MASTER_HOST=<spark_master_ip_or_hostname_here>
You can also change other elements of the default configuration by editing /usr/local/spark/conf/spark-env.sh. Some other settings to consider are:
  • SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
  • SPARK_WORKER_CORES, to set the number of cores to use on this machine
  • SPARK_WORKER_MEMORY, to set how much memory to use (for example 1000MB, 2GB)
  • SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
  • SPARK_WORKER_INSTANCES, to set the number of worker processes per node
  • SPARK_WORKER_DIR, to set the working directory of worker processes
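For example, a worker's spark-env.sh might end up looking like the sketch below. The address and resource values are illustrative placeholders; tune them to your own cluster and hardware.

```shell
# Hypothetical spark-env.sh values for a modest worker node -- adjust to your hardware
export SPARK_MASTER_HOST=10.0.0.10   # placeholder: your Spark master's address
export SPARK_MASTER_PORT=7077        # default standalone master port
export SPARK_WORKER_CORES=4          # cores this worker may use
export SPARK_WORKER_MEMORY=4g        # total memory executors may use on this node
export SPARK_LOG_DIR=/var/log/spark
export SPARK_PID_DIR=${SPARK_HOME}/run
```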

Installing Spark as a service

Run the following commands to create a service for the spark-master and spark-worker
$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-master
$ sudo chmod 0755 /etc/init.d/spark-master
$ sudo cp /etc/init.d/skeleton /etc/init.d/spark-worker
$ sudo chmod 0755 /etc/init.d/spark-worker
$ sudo update-rc.d spark-master defaults 99
$ sudo update-rc.d spark-worker defaults 99
Edit the /etc/init.d/spark-worker file. If a variable or function already exists then replace it with the text below.
DESC="Spark Worker"
NAME=spark-worker
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.worker.Worker-1.pid
export SPARK_HOME
# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0
if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
        . $SPARK_HOME/conf/spark-env.sh
else
        echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi
#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running() {
    CMD_PATT="org.apache.spark.deploy.worker.Worker"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}
#
# Function that starts the daemon/service
#
do_start() {
        # Return
        #   0 if daemon has been started
        #   1 if daemon was already running
        #   2 if daemon could not be started
        [ -e `dirname "$PIDFILE"` ] || \
                install -d -ospark -groot -m755 `dirname $PIDFILE`
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-slave.sh \
                --test > /dev/null \
                || return 1
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-slave.sh -- spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT \
                || return 2
}
#
# Function that stops the daemon/service
#
do_stop() {
        start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
        RETVAL="$?"
        rm -f $PIDFILE
        return "$RETVAL"
}
#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
        #
        # If the daemon can reload its configuration without
        # restarting (for example, when it is sent a SIGHUP),
        # then implement that here.
        #
        start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
        return 0
}
...
status)
        is_running
        stat=$?
        case "$stat" in
                0) log_success_msg "$DESC is running" ;;
                1) log_failure_msg "could not access pidfile for $DESC" ;;
                *) log_success_msg "$DESC is not running" ;;
        esac
        exit "$stat"
        ;;
...
Edit the /etc/init.d/spark-master file. If a variable or function already exists then replace it with the text below.
DESC="Spark Master"
NAME=spark-master
SPARK_HOME=/usr/local/spark
PIDFILE=$SPARK_HOME/run/spark--org.apache.spark.deploy.master.Master-1.pid
export SPARK_HOME
# Exit if the package is not installed
#[ -x "$DAEMON" ] || exit 0
if [ -f $SPARK_HOME/conf/spark-env.sh ]; then
        . $SPARK_HOME/conf/spark-env.sh
else
        echo "$SPARK_HOME/conf/spark-env.sh not found. Cannot start service."
fi
#
# Function that returns 0 if process is running, or nonzero if not.
#
# The nonzero value is 3 if the process is simply not running, and 1 if the
# process is not running but the pidfile exists (to match the exit codes for
# the "status" command; see LSB core spec 3.1, section 20.2)
#
is_running() {
    CMD_PATT="org.apache.spark.deploy.master.Master"
    if [ -f $PIDFILE ]; then
        pid=`cat $PIDFILE`
        grep -Eq "$CMD_PATT" "/proc/$pid/cmdline" 2>/dev/null && return 0
        return 1
    fi
    return 3
}
#
# Function that starts the daemon/service
#
do_start() {
        # Return
        #   0 if daemon has been started
        #   1 if daemon was already running
        #   2 if daemon could not be started
        [ -e `dirname "$PIDFILE"` ] || \
                install -d -ospark -groot -m755 `dirname $PIDFILE`
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-master.sh \
                --test > /dev/null \
                || return 1
        start-stop-daemon --start --quiet --chuid spark --pidfile $PIDFILE \
                --exec $SPARK_HOME/sbin/start-master.sh \
                || return 2
}
#
# Function that stops the daemon/service
#
do_stop() {
        start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE
        RETVAL="$?"
        rm -f $PIDFILE
        return "$RETVAL"
}
#
# Function that sends a SIGHUP to the daemon/service
#
do_reload() {
        #
        # If the daemon can reload its configuration without
        # restarting (for example, when it is sent a SIGHUP),
        # then implement that here.
        #
        start-stop-daemon --stop --signal 1 --quiet --pidfile $PIDFILE
        return 0
}
...
status)
        is_running
        stat=$?
        case "$stat" in
                0) log_success_msg "$DESC is running" ;;
                1) log_failure_msg "could not access pidfile for $DESC" ;;
                *) log_success_msg "$DESC is not running" ;;
        esac
        exit "$stat"
        ;;
...

Running Spark as a service

Start the Spark master node first. On whichever node you've selected to be master, run:
$ sudo service spark-master start
On all the other nodes, start the workers:
$ sudo service spark-worker start
To stop Spark, run the following commands on the appropriate nodes:
$ sudo service spark-worker stop
$ sudo service spark-master stop
Service logs will be stored in /var/log/spark.

Testing the Spark service

To test the Spark service, start spark-shell on one of the nodes.
$ spark-shell --master spark://<IP>:<Port>
When the prompt comes up, execute the following line of code:
scala> sc.parallelize(1 to 1000).sum()
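If the shell returns a result, the cluster is working. You can predict the expected value with plain Scala arithmetic, no Spark needed, since the job simply sums the integers 1 through 1000:

```scala
// Sanity check: the value the Spark job should return, computed locally.
// By Gauss's formula, 1 + 2 + ... + 1000 = 1000 * 1001 / 2 = 500500.
object SumCheck {
  def main(args: Array[String]): Unit = {
    val expected = (1 to 1000).sum
    println(expected) // prints 500500
  }
}
```

Note that Spark's sum() returns a Double, so the shell will report the result as 500500.0.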

Get Spark-Cassandra-Connector on the client

The spark-cassandra-connector is a Scala library that exposes Cassandra tables as Spark RDDs, lets you write Spark RDDs to Cassandra tables, and lets you execute arbitrary computations and CQL queries that are distributed to the Cassandra nodes holding the data, which keeps them fast. Your code, the spark-cassandra-connector, and all their dependencies are packaged up and sent to the Spark nodes.
If you are writing ad-hoc queries or computations from the spark-shell, start the shell by running the command:
$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11
The --packages option downloads the connector and all of its dependencies from the Spark Packages site and places it in the path of the Spark Driver and all Spark Executors.
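The same --packages flag also works with spark-submit when you deploy a packaged application rather than working interactively. A hypothetical invocation might look like the following, where the master address, Cassandra host, class name, and jar path are all placeholders for your own project:

```shell
$ spark-submit \
    --master spark://<master-ip>:7077 \
    --conf spark.cassandra.connection.host=<cassandra-host> \
    --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11 \
    --class com.example.MySparkApp \
    target/scala-2.11/mysparkproject_2.11-1.0.jar
```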
If you are writing a Scala application; configure a new Scala project. Your build.sbt file should look something like this:
name := "MySparkProject"
version := "1.0"
scalaVersion := "2.11.8"
val sparkVersion = "2.0.2"
resolvers += "Spark Packages Repo" at "https://dl.bintray.com/spark-packages/maven"
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % sparkVersion,
  "org.apache.spark" % "spark-sql_2.11"  % sparkVersion,
  "datastax"         % "spark-cassandra-connector" % "2.0.0-M2-s_2.11"
)
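Putting the pieces together, a minimal application might look like the sketch below. It reads the same test.kv table used in the next section; the master URL and Cassandra host are placeholders you must fill in, and the code needs a running Spark and Cassandra cluster, so treat it as illustrative rather than copy-paste runnable:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

// Minimal sketch of a standalone app using the spark-cassandra-connector.
// <master-ip> and <cassandra-host> are placeholders for your cluster.
object MySparkProject {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("MySparkProject")
      .setMaster("spark://<master-ip>:7077")
      .set("spark.cassandra.connection.host", "<cassandra-host>")
    val sc = new SparkContext(conf)

    // Read the test.kv table and sum its integer values
    val rdd = sc.cassandraTable("test", "kv")
    println(rdd.map(_.getInt("value")).sum)

    sc.stop()
  }
}
```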

Testing the connector

To start out, create a simple keyspace and table in Cassandra. Run the following statements in cqlsh:
CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
CREATE TABLE test.kv(key text PRIMARY KEY, value int);
Then insert some example data:
INSERT INTO test.kv(key, value) VALUES ('key1', 1);
INSERT INTO test.kv(key, value) VALUES ('key2', 2);
For this test, we'll use the spark-shell.
$ spark-shell --conf spark.cassandra.connection.host=<master-ip-address> --packages datastax:spark-cassandra-connector:2.0.0-M2-s_2.11
// import the spark connector namespace
import com.datastax.spark.connector._
// read Cassandra data into an RDD
val rdd = sc.cassandraTable("test", "kv")
println(rdd.count)
println(rdd.first)
println(rdd.map(_.getInt("value")).sum)
// Add two more rows to the table
val collection = sc.parallelize(Seq(("key3", 3), ("key4", 4)))
collection.saveToCassandra("test", "kv", SomeColumns("key", "value"))
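Back in cqlsh, you can confirm the writes landed. Assuming the two inserts above succeeded, the table should now hold four rows (key1 through key4):

```sql
SELECT * FROM test.kv;
```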
