Spark Advanced Tutorials (Complete guide book)
Introduction
Apache Spark is a general-purpose cluster computing system to process big data workloads. What sets Spark apart from its predecessors, such as MapReduce, is its speed, ease-of-use, and sophisticated analytics.
Apache Spark was originally developed at AMPLab, UC Berkeley, in 2009. It was made open source in 2010 under the BSD license and switched to the Apache 2.0 license in 2013. Toward the later part of 2013, the creators of Spark founded Databricks to focus on Spark’s development and future releases.
Talking about speed, Spark can achieve sub-second latency on big data workloads. To achieve such low latency, Spark makes use of the memory for storage. In MapReduce, memory is primarily used for actual computation. Spark uses memory both to compute and store objects.
Spark also provides a unified runtime connecting to various big data storage sources, such as HDFS, Cassandra, HBase, and S3. It also provides a rich set of higher-level libraries
for different big data compute tasks, such as machine learning, SQL processing, graph processing, and real-time streaming. These libraries make development faster and can be combined in an arbitrary fashion.
Though Spark is written in Scala, Spark also supports Java and Python.
Spark is an open source community project, and everyone uses the pure open source Apache distributions for deployments, unlike Hadoop, which has multiple distributions available with vendor enhancements.
The following figure shows the Spark ecosystem:
The Spark runtime runs on top of a variety of cluster managers, including YARN (Hadoop’s compute framework), Mesos, and Spark’s own cluster manager called standalone mode. Tachyon is a memory-centric distributed file system that enables reliable file sharing at memory speed across cluster frameworks. In short, it is an off-heap storage layer in memory, which helps share data across jobs and users. Mesos is a cluster manager, which is evolving into a data center operating system. YARN is Hadoop’s compute framework that has a robust resource management feature that Spark can seamlessly use.
Spark can be either built from the source code or precompiled binaries can be downloaded from http://spark.apache.org. For a standard use case, binaries are good enough, and this recipe will focus on installing Spark using binaries.
Getting ready
All the recipes in this are developed using Ubuntu Linux but should work fine on any POSIX environment. Spark expects Java to be installed and the JAVA_HOME environment variable to be set.
In Linux/Unix systems, there are certain standards for the location of files and directories, which we are going to follow, The following is a quick cheat sheet:
Directory Description
/bin Essential command binaries
/etc Host-specific system configuration
/opt Add-on application software packages
/var Variable data
/tmp Temporary files
/home User home directories
How to do it…
At the time of writing this, Spark’s current version is 1.4. Please check the latest version from Spark’s download page at http://spark.apache.org/downloads.html. Binaries are developed with a most recent and stable version of Hadoop. To use a specific version of Hadoop, the recommended approach is to build from sources, which will be covered in the next recipe.
The following are the installation steps:
1. Open the terminal and download binaries using the following command:
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-1.4.0-bin- hadoop2.4.tgz
2. Unpack binaries:
$ tar -zxf spark-1.4.0-bin-hadoop2.4.tgz
- Rename the folder containing binaries by stripping the version information: $ sudo mv spark-1.4.0-bin-hadoop2.4 spark
- Move the configuration folder to the /etc folder so that it can be made a symbolic link later: $ sudo mv spark/conf/* /etc/spark
- Create your company-specific installation directory under /opt. As the recipes in this book are tested on infoobjects sandbox, we are going to use infoobjects as directory name. Create the /opt/infoobjects directory: $ sudo mkdir -p /opt/infoobjects
- Move the spark directory to /opt/infoobjects as it’s an add-on software package: $ sudo mv spark /opt/infoobjects/
- Change the ownership of the spark home directory to root: $ sudo chown -R root:root /opt/infoobjects/spark
- Change permissions of the spark home directory, 0755 = user:read-write- execute group:read-execute world:read-execute: $ sudo chmod -R 755 /opt/infoobjects/spark
- Move to the spark home directory: $ cd /opt/infoobjects/spark
- Create the symbolic link: $ sudo ln -s /etc/spark conf
- Append to PATH in .bashrc:
$ echo “export PATH=$PATH:/opt/infoobjects/spark/bin” >> /home/ hduser/.bashrc - Open a new terminal.
- Create the log directory in /var: $ sudo mkdir -p /var/log/spark
- Make hduser the owner of the Spark log directory.
$ sudo chown -R hduser:hduser /var/log/spark - Create the Spark tmp directory: $ mkdir /tmp/spark
16. Configure Spark with the help of the following command lines:
$ cd /etc/spark
$ echo "export HADOOP_CONF_DIR=/opt/infoobjects/hadoop/etc/hadoop" >> spark-env.sh
$ echo "export YARN_CONF_DIR=/opt/infoobjects/hadoop/etc/Hadoop" >> spark-env.sh
$ echo "export SPARK_LOG_DIR=/var/log/spark" >> spark-env.sh $ echo "export SPARK_WORKER_DIR=/tmp/spark" >> spark-env.sh
Building the Spark source code with Maven
Installing Spark using binaries works fine in most cases. For advanced cases, such as the following (but not limited to), compiling from the source code is a better option:
f Compiling for a specific Hadoop version f Adding the Hive integration
f Adding the YARN integration
Getting ready
The following are the prerequisites for this recipe to work: f Java 1.6 or a later version
f Maven 3.x
How to do it…
The following are the steps to build the Spark source code with Maven:
- Increase MaxPermSize for heap: $ echo “export _JAVA_OPTIONS=\”-XX:MaxPermSize=1G\”” >> /home/ hduser/.bashrc
- Open a new terminal window and download the Spark source code from GitHub: $ wget https://github.com/apache/spark/archive/branch-1.4.zip
Chapter 1
- Unpack the archive: $ gunzip branch-1.4.zip
- Move to the spark directory: $ cd spark
- Compile the sources with these flags: Yarn enabled, Hadoop version 2.4, Hive enabled, and skipping tests for faster compilation: $ mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -DskipTests clean package
- Move the conf folder to the etc folder so that it can be made a symbolic link: $ sudo mv spark/conf /etc/
- Move the spark directory to /opt as it’s an add-on software package: $ sudo mv spark /opt/infoobjects/spark
- Change the ownership of the spark home directory to root: $ sudo chown -R root:root /opt/infoobjects/spark
- Change the permissions of the spark home directory 0755 = user:rwx group:r-x world:r-x: $ sudo chmod -R 755 /opt/infoobjects/spark
- Move to the spark home directory: $ cd /opt/infoobjects/spark
- Create a symbolic link: $ sudo ln -s /etc/spark conf
- Put the Spark executable in the path by editing .bashrc:
$ echo “export PATH=$PATH:/opt/infoobjects/spark/bin” >> /home/ hduser/.bashrc - Create the log directory in /var:
$ sudo mkdir -p /var/log/spark - Make hduser the owner of the Spark log directory:
$ sudo chown -R hduser:hduser /var/log/spark - Create the Spark tmp directory: $ mkdir /tmp/spark
- Configure Spark with the help of the following command lines: $ cd /etc/spark $ echo “export HADOOP_CONF_DIR=/opt/infoobjects/hadoop/etc/hadoop” >> spark-env.sh $ echo “export YARN_CONF_DIR=/opt/infoobjects/hadoop/etc/Hadoop” >> spark-env.sh $ echo “export SPARK_LOG_DIR=/var/log/spark” >> spark-env.sh $ echo “export SPARK_WORKER_DIR=/tmp/spark” >> spark-env.sh
Launching Spark on Amazon EC2
Amazon Elastic Compute Cloud (Amazon EC2) is a web service that provides resizable compute instances in the cloud. Amazon EC2 provides the following features:
f On-demand delivery of IT resources via the Internet
f The provision of as many instances as you like
f Payment for the hours you use instances like your utility bill
f No setup cost, no installation, and no overhead at all
f When you no longer need instances, you either shut down or terminate and walk away f The availability of these instances on all familiar operating systems
EC2 provides different types of instances to meet all compute needs, such as general-purpose instances, micro instances, memory-optimized instances, storage-optimized instances, and others. They have a free tier of micro-instances to try.
Getting ready
The spark-ec2 script comes bundled with Spark and makes it easy to launch, manage, and shut down clusters on Amazon EC2.
Before you start, you need to do the following things:
- Log in to the Amazon AWS account (http://aws.amazon.com).
- Click on Security Credentials under your account name in the top-right corner.
- Click on Access Keys and Create New Access Key:
- Note down the access key ID and secret access key.
- Now go to Services | EC2.
- Click on Key Pairs in left-hand menu under NETWORK & SECURITY.
- Click on Create Key Pair and enter kp-spark as key-pair name:
- Download the private key file and copy it in the /home/hduser/keypairs folder.
- Set permissions on key file to 600.
- Set environment variables to reflect access key ID and secret access key (please replace sample values with your own values): $ echo “export AWS_ACCESS_KEY_ID=\”AKIAOD7M2LOWATFXFKQ\”” >> / home/hduser/.bashrc $ echo “export AWS_SECRET_ACCESS_KEY=\”+Xr4UroVYJxiLiY8DLT4DLT4D4s xc3ijZGMx1D3pfZ2q\”” >> /home/hduser/.bashrc $ echo “export PATH=$PATH:/opt/infoobjects/spark/ec2” >> /home/ hduser/.bashrc
How to do it…
1. Spark comes bundled with scripts to launch the Spark cluster on Amazon EC2. Let’s launch the cluster using the following command:
$ cd /home/hduser
$ spark-ec2 -k <key-pair> -i <key-file> -s <num-slaves> launch <cluster-name>
2. Launch the cluster with the example value:
$ spark-ec2 -k kp-spark -i /home/hduser/keypairs/kp-spark.pem --hadoop-major-version 2 -s 3 launch spark-cluster
f <key-pair>: This is the name of EC2 key-pair created in AWS f <key-file>: This is the private key file you downloaded
f <num-slaves>: This is the number of slave nodes to launch f <cluster-name>: This is the name of the cluster
- Sometimes, the default availability zones are not available; in that case, retry sending the request by specifying the specific availability zone you are requesting: $ spark-ec2 -k kp-spark -i /home/hduser/keypairs/kp-spark.pem -z us-east-1b –hadoop-major-version 2 -s 3 launch spark-cluster
- If your application needs to retain data after the instance shuts down, attach EBS volume to it (for example, a 10 GB space): $ spark-ec2 -k kp-spark -i /home/hduser/keypairs/kp-spark.pem –hadoop-major-version 2 -ebs-vol-size 10 -s 3 launch spark- cluster
- If you use Amazon spot instances, here’s the way to do it: $ spark-ec2 -k kp-spark -i /home/hduser/keypairs/kp-spark.pem -spot-price=0.15 –hadoop-major-version 2 -s 3 launch spark- cluster Spot instances allow you to name your own price for Amazon EC2 computing capacity. You simply bid on spare Amazon EC2 instances and run them whenever your bid exceeds the current spot price, which varies in real-time based on supply and demand (source: amazon.com).
- After everything is launched, check the status of the cluster by going to the web UI URL that will be printed at the end.
7. Check the status of the cluster:
8. Now, to access the Spark cluster on EC2, let’s connect to the master node using secure shell protocol (SSH):
$ spark-ec2 -k kp-spark -i /home/hduser/kp/kp-spark.pem login spark-cluster
You should get something like the following:
9. Check directories in the master node and see what they do:
Directory
ephemeral-hdfspersistent-hdfshadoop-native
10
Description
This is the Hadoop instance for which data is ephemeral and gets deleted when you stop or restart the machine.
Each node has a very small amount of persistent storage (approximately 3 GB). If you use this instance, data will be retained in that space.
These are native libraries to support Hadoop, such as snappy compression libraries.
Directory
Scala
shark
sparkspark-ec2tachyon
Description
This is Scala installation.
This is Shark installation (Shark is no longer supported and is replaced by Spark SQL).
This is Spark installation
These are files to support this cluster deployment. This is Tachyon installation
- Check the HDFS version in an ephemeral instance: $ ephemeral-hdfs/bin/hadoop version Hadoop 2.0.0-chd4.2.0
- Check the HDFS version in persistent instance with the following command: $ persistent-hdfs/bin/hadoop version Hadoop 2.0.0-chd4.2.0
- Change the configuration level in logs: $ cd spark/conf
- The default log level information is too verbose, so let’s change it to Error:
- Create the log4.properties file by renaming the template: $ mv log4j.properties.template log4j.properties
- Open log4j.properties in vi or your favorite editor: $ vi log4j.properties
- Change second line from | log4j.rootCategory=INFO, console to | log4j.rootCategory=ERROR, console.
14. Copy the configuration to all slave nodes after the change:
$ spark-ec2/copydir spark/conf
You should get something like this:
15. Destroy the Spark cluster:
$ spark-ec2 destroy spark-cluster
See also
f http://aws.amazon.com/ec2
Deploying on a cluster in standalone mode
Compute resources in a distributed environment need to be managed so that resource utilization is efficient and every job gets a fair chance to run. Spark comes along with its own cluster manager conveniently called standalone mode. Spark also supports working with YARN and Mesos cluster managers.
The cluster manager that should be chosen is mostly driven by both legacy concerns and whether other frameworks, such as MapReduce, are sharing the same compute resource pool. If your cluster has legacy MapReduce jobs running, and all of them cannot be converted to Spark jobs, it is a good idea to use YARN as the cluster manager. Mesos is emerging as a data center operating system to conveniently manage jobs across frameworks, and is very compatible with Spark.
If the Spark framework is the only framework in your cluster, then standalone mode is good enough. As Spark evolves as technology, you will see more and more use cases of Spark being used as the standalone framework serving all big data compute needs. For example, some jobs may be using Apache Mahout at present because MLlib does not have a specific machine-learning library, which the job needs. As soon as MLlib gets this library, this particular job can be moved to Spark.
Getting ready
Let’s consider a cluster of six nodes as an example setup: one master and five slaves (replace them with actual node names in your cluster):
Masterm1.zettabytes.comSlaves
s1.zettabytes.coms2.zettabytes.coms3.zettabytes.coms4.zettabytes.coms5.zettabytes.com
How to do it…
- Since Spark’s standalone mode is the default, all you need to do is to have Spark binaries installed on both master and slave machines. Put /opt/infoobjects/ spark/sbin in path on every node: $ echo “export PATH=$PATH:/opt/infoobjects/spark/sbin” >> /home/ hduser/.bashrc
- Start the standalone master server (SSH to master first): hduser@m1.zettabytes.com~] start-master.sh Master, by default, starts on port 7077, which slaves use to connect to it. It also has a web UI at port 8088.
- Please SSH to master node and start slaves: hduser@s1.zettabytes.com~] spark-class org.apache.spark.deploy. worker.Worker spark://m1.zettabytes.com:7077
Argument (for fine-grained configuration, the following parameters work with both master and slaves)
-i <ipaddress>,-ip <ipaddress>-p <port>, --port <port>
--webui-port <port>-c <cores>,--cores <cores>-m <memory>,--memory <memory>-d <dir>,--work-dir <dir>
Meaning
IP address/DNS service listens on Port service listens on
Port for web UI (by default, 8080 for master and 8081 for worker)
Total CPU cores Spark applications that can be used on a machine (worker only)
Total RAM Spark applications that can be used on a machine (worker only)
The directory to use for scratch space and job output logs
4. Rather than manually starting master and slave daemons on each node, it can also be accomplished using cluster launch scripts.
5. First, create the conf/slaves file on a master node and add one line per slave hostname (using an example of five slaves nodes, replace with the DNS of slave nodes in your cluster):
hduser@m1.zettabytes.com~] echo "s1.zettabytes.com" >> conf/slaves hduser@m1.zettabytes.com~] echo "s2.zettabytes.com" >> conf/slaves hduser@m1.zettabytes.com~] echo "s3.zettabytes.com" >> conf/slaves hduser@m1.zettabytes.com~] echo "s4.zettabytes.com" >> conf/slaves hduser@m1.zettabytes.com~] echo "s5.zettabytes.com" >> conf/slaves
Once the slave machine is set up, you can call the following scripts to start/stop cluster:
Script name
start-master.shstart-slaves.shstart-all.shstop-master.shstop-slaves.shstop-all.sh
Purpose
Starts a master instance on the host machine
Starts a slave instance on each node in the slaves file Starts both master and slaves
Stops the master instance on the host machine
Stops the slave instance on all nodes in the slaves file Stops both master and slaves
- Connect an application to the cluster through the Scala code: val sparkContext = new SparkContext(new SparkConf(). setMaster(“spark://m1.zettabytes.com:7077”)
- Connect to the cluster through Spark shell: $ spark-shell –master spark://master:7077
How it works…
In standalone mode, Spark follows the master slave architecture, very much like Hadoop, MapReduce, and YARN. The compute master daemon is called Spark master and runs on one master node. Spark master can be made highly available using ZooKeeper. You can also add more standby masters on the fly, if needed.
The compute slave daemon is called worker and is on each slave node. The worker daemon does the following:
Reports the availability of compute resources on a slave node, such as the number of cores, memory, and others, to Spark master
Spawns the executor when asked to do so by Spark master f Restarts the executor if it dies
Both Spark master and worker are very lightweight. Typically, memory allocation between 500 MB to 1 GB is sufficient. This value can be set in conf/spark-env.sh by setting the SPARK_DAEMON_MEMORY parameter. For example, the following configuration will set the memory to 1 gigabits for both master and worker daemon. Make sure you have sudo as the super user before running it:
$ echo "export SPARK_DAEMON_MEMORY=1g" >> /opt/infoobjects/spark/conf/spark-env.sh
By default, each slave node has one worker instance running on it. Sometimes, you may have a few machines that are more powerful than others. In that case, you can spawn more than one worker on that machine by the following configuration (only on those machines):
$ echo "export SPARK_WORKER_INSTANCES=2" >> /opt/infoobjects/spark/conf/spark-env.sh
Spark worker, by default, uses all cores on the slave machine for its executors. If you would like to limit the number of cores the worker can use, you can set it to that number (for example, 12) by the following configuration:
$ echo "export SPARK_WORKER_CORES=12" >> /opt/infoobjects/spark/conf/spark-env.sh
Spark worker, by default, uses all the available RAM (1 GB for executors). Note that you cannot allocate how much memory each specific executor will use (you can control this from the driver configuration). To assign another value for the total memory (for example, 24 GB) to be used by all executors combined, execute the following setting:
$ echo "export SPARK_WORKER_MEMORY=24g" >> /opt/infoobjects/spark/conf/spark-env.sh
There are some settings you can do at the driver level:
f To specify the maximum number of CPU cores to be used by a given application across the cluster, you can set the spark.cores.max configuration in Spark submit or Spark shell as follows:
$ spark-submit --conf spark.cores.max=12
f To specify the amount of memory each executor should be allocated (the minimum recommendation is 8 GB), you can set the spark.executor.memory configuration in Spark submit or Spark shell as follows:
$ spark-submit --conf spark.executor.memory=8g
See also
f http://spark.apache.org/docs/latest/spark-standalone.html to find more configuration options
Deploying on a cluster with Mesos
Mesos is slowly emerging as a data center operating system to manage all compute resources across a data center. Mesos runs on any computer running the Linux operating system. Mesos is built using the same principles as Linux kernel. Let’s see how we can install Mesos.
How to do it…
Mesosphere provides a binary distribution of Mesos. The most recent package for the Mesos distribution can be installed from the Mesosphere repositories by performing the following steps:
1. Execute Mesos on Ubuntu OS with the trusty version:
$ sudo apt-key adv --keyserver keyserver.ubuntu.com --recv E56151BF DISTRO=$(lsb_release -is | tr '[:upper:]' '[:lower:]') CODENAME=$(lsb_release -cs)
- Update the repositories: $ sudo apt-get -y update
- Install Mesos: $ sudo apt-get -y install mesos
- To connect Spark to Mesos to integrate Spark with Mesos, make Spark binaries available to Mesos and configure the Spark driver to connect to Mesos.
- Use Spark binaries from the first recipe and upload to HDFS: $ hdfs dfs -put spark-1.4.0-bin-hadoop2.4.tgz spark-1.4.0-bin- hadoop2.4.tgz
- The master URL for single master Mesos is mesos://host:5050, and for the ZooKeeper managed Mesos cluster, it is mesos://zk://host:2181.
- Set the following variables in spark-env.sh: $ sudo vi spark-env.sh export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so export SPARK_EXECUTOR_URI= hdfs://localhost:9000/user/hduser/ spark-1.4.0-bin-hadoop2.4.tgz
- Run from the Scala program: val conf = new SparkConf().setMaster(“mesos://host:5050”) val sparkContext = new SparkContext(conf)
- Run from the Spark shell: $ spark-shell –master mesos://host:5050 Mesos has two run modes: Fine-grained: In fine-grained (default) mode, every Spark task runs as a separate Mesos task Coarse-grained: This mode will launch only one long-running Spark task on each Mesos machine
10. To run in the coarse-grained mode, set the spark.mesos.coarse property: conf.set(“spark.mesos.coarse”,”true”)
Deploying on a cluster with YARN
Yet another resource negotiator (YARN) is Hadoop’s compute framework that runs on top of HDFS, which is Hadoop’s storage layer.
YARN follows the master slave architecture. The master daemon is called ResourceManager and the slave daemon is called NodeManager. Besides this application, life cycle management is done by ApplicationMaster, which can be spawned on any slave node and is alive for the lifetime of an application.
When Spark is run on YARN, ResourceManager performs the role of Spark master and NodeManagers work as executor nodes.
While running Spark with YARN, each Spark executor is run as YARN container.
Getting ready
Running Spark on YARN requires a binary distribution of Spark that has YARN support. In both Spark installation recipes, we have taken care of it.
How to do it…
1. To run Spark on YARN, the first step is to set the configuration:
HADOOP_CONF_DIR: to write to HDFS YARN_CONF_DIR: to connect to YARN ResourceManager $ cd /opt/infoobjects/spark/conf (or /etc/spark) $ sudo vi spark-env.sh export HADOOP_CONF_DIR=/opt/infoobjects/hadoop/etc/Hadoop export YARN_CONF_DIR=/opt/infoobjects/hadoop/etc/hadoop
- The following command launches YARN Spark in the yarn-client mode:
$ spark-submit –class path.to.your.Class –master yarn-client [options] <app jar> [app options] Here’s an example: $ spark-submit –class com.infoobjects.TwitterFireHose –master yarn-client –num-executors 3 –driver-memory 4g –executor-memory 2g –executor-cores 1 target/sparkio.jar 10 - The following command launches Spark shell in the yarn-client mode: $ spark-shell –master yarn-client
- The command to launch in the yarn-cluster mode is as follows:
$ spark-submit –class path.to.your.Class –master yarn-cluster [options] <app jar> [app options] Here’s an example: $ spark-submit –class com.infoobjects.TwitterFireHose –master yarn-cluster –num-executors 3 –driver-memory 4g –executor- memory 2g –executor-cores 1 target/sparkio.jar 10
In the YARN mode, the following configuration parameters can be set:
f –num-executors: Configure how many executors will be allocated f –executor-memory: RAM per executor
f –executor-cores: CPU cores per executor
Using Tachyon as an off-heap storage layer
Spark RDDs are a great way to store datasets in memory while ending up with multiple copies of the same data in different applications. Tachyon solves some of the challenges with Spark RDD management. A few of them are:
RDD only exists for the duration of the Spark application
The same process performs the compute and RDD in-memory storage; so, if a
process crashes, in-memory storage also goes away
Different jobs cannot share an RDD even if they are for the same underlying data, for example, an HDFS block that leads to:
- Slow writes to disk
- Duplication of data in memory, higher memory footprint
If the output of one application needs to be shared with the other application, it’s slow due to the replication in the disk
Tachyon provides an off-heap memory layer to solve these problems. This layer, being off-heap, is immune to process crashes and is also not subject to garbage collection. This also lets RDDs be shared across applications and outlive a specific job or session; in essence, one single copy of data resides in memory, as shown in the following figure:
How to do it…
- Let’s download and compile Tachyon (Tachyon, by default, comes configured for Hadoop 1.0.4, so it needs to be compiled from sources for the right Hadoop version). Replace the version with the current version. The current version at the time of writing this book is 0.6.4: $ wget https://github.com/amplab/tachyon/archive/v<version>.zip
- Unarchive the source code: $ unzip v-<version>.zip
- Remove the version from the tachyon source folder name for convenience: $ mv tachyon-<version> tachyon
- Change the directory to the tachyon folder: $ cd tachyon $ mvn -Dhadoop.version=2.4.0 clean package -DskipTests=true $ cd conf $ sudo mkdir -p /var/tachyon/journal $ sudo chown -R hduser:hduser /var/tachyon/journal
$ sudo mkdir -p /var/tachyon/ramdisk $ sudo chown -R hduser:hduser /var/tachyon/ramdisk
$ mv tachyon-env.sh.template tachyon-env.sh $ vi tachyon-env.sh
- Comment the following line: export TACHYON_UNDERFS_ADDRESS=$TACHYON_HOME/underfs
- Uncomment the following line: export TACHYON_UNDERFS_ADDRESS=hdfs://localhost:9000
- Change the following properties: -Dtachyon.master.journal.folder=/var/tachyon/journal/ export TACHYON_RAM_FOLDER=/var/tachyon/ramdisk $ sudo mkdir -p /var/log/tachyon $ sudo chown -R hduser:hduser /var/log/tachyon $ vi log4j.properties
- Replace ${tachyon.home} with /var/log/tachyon.
- Create a new core-site.xml file in the conf directory: $ sudo vi core-site.xml <configuration> <property> <name>fs.tachyon.impl</name> <value>tachyon.hadoop.TFS</value> </property> </configuration> $ cd ~ $ sudo mv tachyon /opt/infoobjects/ $ sudo chown -R root:root /opt/infoobjects/tachyon $ sudo chmod -R 755 /opt/infoobjects/tachyon
10. Add <tachyon home>/bin to the path:
$ echo "export PATH=$PATH:/opt/infoobjects/tachyon/bin" >> /home/hduser/.bashrc
11. Restart the shell and format Tachyon:
$ tachyon format
$ tachyon-start.sh local //you need to enter root password as RamFS needs to be formatted
Tachyon’s web interface is http://hostname:19999:
12. Run the sample program to see whether Tachyon is running fine:
$ tachyon runTest Basic CACHE_THROUGH
13. You can stop Tachyon any time by running the following command:
$ tachyon-stop.sh
$ spark-shell
scala> val words = sc.textFile("tachyon://localhost:19998/words")
scala> words.count
scala> words.saveAsTextFile("tachyon://localhost:19998/w2")
scala> val person = sc.textFile("hdfs://localhost:9000/user/ hduser/person")
scala> import org.apache.spark.api.java._ scala> person.persist(StorageLevels.OFF_HEAP)
See also
http://www.cs.berkeley.edu/~haoyuan/papers/2013_ladis_tachyon. pdf to learn about the origins of Tachyon
http://www.tachyonnexus.com
External Data Sources
One of the strengths of Spark is that it provides a single runtime that can connect with various underlying data sources.
In this chapter, we will connect to different data sources. This chapter is divided into the following recipes:
Loading data from the local filesystem
Loading data from HDFS
Loading data from HDFS using a custom InputFormat f Loading data from Amazon S3
Loading data from Apache Cassandra
Loading data from relational databases
Introduction
Spark provides a unified runtime for big data. HDFS, which is Hadoop’s filesystem, is the most used storage platform for Spark as it provides cost-effective storage for unstructured and semi-structured data on commodity hardware. Spark is not limited to HDFS and can work with any Hadoop-supported storage.
Hadoop supported storage means a storage format that can work with Hadoop’s InputFormat and OutputFormat interfaces. InputFormat is responsible for creating InputSplits from input data and dividing it further into records. OutputFormat is responsible for writing to storage.
We will start with writing to the local filesystem and then move over to loading data from HDFS. In the Loading data from HDFS recipe, we will cover the most common file format: regular text files. In the next recipe, we will cover how to use any InputFormat interface to load data in Spark. We will also explore loading data stored in Amazon S3, a leading cloud storage platform.
We will explore loading data from Apache Cassandra, which is a NoSQL database. Finally, we will explore loading data from a relational database.
Loading data from the local filesystem
Though the local filesystem is not a good fit to store big data due to disk size limitations and lack of distributed nature, technically you can load data in distributed systems using the local filesystem. But then the file/directory you are accessing has to be available on each node.
Please note that if you are planning to use this feature to load side data, it is not a good idea. To load side data, Spark has a broadcast variable feature, which will be discussed in upcoming chapters.
In this recipe, we will look at how to load data in Spark from the local filesystem.
How to do it…
Let’s start with the example of Shakespeare’s “to be or not to be”:
- Create the words directory by using the following command: $ mkdir words
- Get into the words directory: $ cd words
- Create the sh.txt text file and enter “to be or not to be” in it: $ echo “to be or not to be” > sh.txt
- Start the Spark shell: $ spark-shell
- Load the words directory as RDD:
scala> val words = sc.textFile(“file:///home/hduser/words”) - Count the number of lines: scala> words.count
- Divide the line (or lines) into multiple words: scala> val wordsFlatMap = words.flatMap(_.split(“\\W+”))
- Convert word to (word,1)—that is, output 1 as the value for each occurrence of word as a key: scala> val wordsMap = wordsFlatMap.map( w => (w,1))
- Use the reduceByKey method to add the number of occurrences for each word as a key (this function works on two consecutive values at a time, represented by a and b): scala> val wordCount = wordsMap.reduceByKey( (a,b) => (a+b))
- Print the RDD: scala> wordCount.collect.foreach(println)
- Doing all of the preceding operations in one step is as follows: scala> sc.textFile(“file:///home/hduser/ words”). flatMap(_. split(“\\W+”)).map( w => (w,1)). reduceByKey( (a,b) => (a+b)). foreach(println)
This gives the following output:
Loading data from HDFS
HDFS is the most widely used big data storage system. One of the reasons for the wide adoption of HDFS is schema-on-read. What this means is that HDFS does not put any restriction on data when data is being written. Any and all kinds of data are welcome and can be stored in a raw format. This feature makes it ideal storage for raw unstructured data and semi-structured data.
When it comes to reading data, even unstructured data needs to be given some structure to make sense. Hadoop uses InputFormat to determine how to read the data. Spark provides complete support for Hadoop’s InputFormat so anything that can be read by Hadoop can be read by Spark as well.
The default InputFormat is TextInputFormat. TextInputFormat takes the byte offset of a line as a key and the content of a line as a value. Spark uses the sc.textFile method to read using TextInputFormat. It ignores the byte offset and creates an RDD of strings.
Sometimes the filename itself contains useful information, for example, time-series data. In that case, you may want to read each file separately. The sc.wholeTextFiles method allows you to do that. It creates an RDD with the filename and path (for example, hdfs://localhost:9000/ user/hduser/words) as a key and the content of the whole file as the value.
Spark also supports reading various serialization and compression-friendly formats such as Avro, Parquet, and JSON using DataFrames. These formats will be covered in coming chapters.
In this recipe, we will look at how to load data in the Spark shell from HDFS.
Let’s do the word count, which counts the number of occurrences of each word. In this recipe, we will load data from HDFS:
- Create the words directory by using the following command: $ mkdir words
- Change the directory to words: $ cd words
- Create the sh.txt text file and enter “to be or not to be” in it: $ echo “to be or not to be” > sh.txt
- Start the Spark shell: $ spark-shell
- Load the words directory as the RDD:
scala> val words = sc.textFile(“hdfs://localhost:9000/user/hduser/ words”)
The sc.textFile method also supports passing an additional argument for the number of partitions. By default, Spark creates one partition for each InputSplit class, which roughly corresponds to one block.
You can ask for a higher number of partitions. It works really well for compute-intensive jobs such as in machine learning. As one partition cannot contain more than one block, having fewer partitions than blocks is not allowed.
- Count the number of lines (the result will be 1): scala> words.count
- Divide the line (or lines) into multiple words: scala> val wordsFlatMap = words.flatMap(_.split(“\\W+”))
- Convert word to (word,1)—that is, output 1 as a value for each occurrence of word as a key: scala> val wordsMap = wordsFlatMap.map( w => (w,1))
- Use the reduceByKey method to add the number of occurrences of each word as a key (this function works on two consecutive values at a time, represented by a and b): scala> val wordCount = wordsMap.reduceByKey( (a,b) => (a+b))
10. Print the RDD:
scala> wordCount.collect.foreach(println)
11. Doing all of the preceding operations in one step is as follows:
scala> sc.textFile("hdfs://localhost:9000/user/hduser/words"). flatMap(_.split("\\W+")).map( w => (w,1)). reduceByKey( (a,b) => (a+b)).foreach(println)
Data here is divided in such a way that each filename contains useful information, that is, USAF-WBAN-year, where USAF is the US air force station number and WBAN is the weather bureau army navy location number.
You will also notice that all files are compressed as gzip with a .gz extension. Compression is handled automatically so all you need to do is to upload data in HDFS. We will come back to this dataset in the coming chapters.
Since the whole dataset is not large, it can be uploaded in HDFS in the pseudo-distributed mode also:
- Download data: $ wget -r ftp://ftp.ncdc.noaa.gov/pub/data/noaa/
- Load the weather data in HDFS: $ hdfs dfs -put ftp.ncdc.noaa.gov/pub/data/noaa weather/
- Start the Spark shell: $ spark-shell
- Load weather data for 1901 in the RDD: scala> val weatherFileRDD = sc.wholeTextFiles(“hdfs:// localhost:9000/user/hduser/weather/1901”)
- Cache weather in the RDD so that it is not recomputed every time it’s accessed: scala> val weatherRDD = weatherFileRDD.cache
In Spark, there are various StorageLevels at which the RDD can be persisted. rdd.cache is a shorthand for the rdd. persist(MEMORY_ONLY) StorageLevel.
6. Count the number of elements:
scala> weatherRDD.count
- Since the whole contents of a file are loaded as an element, we need to manually interpret the data, so let’s load the first element: scala> val firstElement = weatherRDD.first
- Read the value of the first RDD: scala> val firstValue = firstElement._2 The firstElement contains tuples in the form (string, string). Tuples can be accessed in two ways:
- Using a positional function starting with _1.
- Using the productElement method, for example, tuple. productElement(0). Indexes here start with 0 like most other methods.
- Split firstValue by lines: scala> val firstVals = firstValue.split(“\\n”)
10. Count the number of elements in firstVals: scala> firstVals.size
11. The schema of weather data is very rich with the position of the text working as a delimiter. You can get more information about schemas at the national weather service website. Let’s get wind speed, which is from section 66-69 (in meter/sec):
scala> val windSpeed = firstVals.map(line => line.substring(65,69)
Loading data from HDFS using a custom InputFormat
Sometimes you need to load data in a specific format and TextInputFormat is not a good fit for that. Spark provides two methods for this purpose:
f sparkContext.hadoopFile: This supports the old MapReduce API
f sparkContext.newAPIHadoopFile: This supports the new MapReduce API
These two methods provide support for all of Hadoop’s built-in InputFormats interfaces as well as any custom InputFormat.
We are going to load text data in key-value format and load it in Spark using KeyValueTextInputFormat:
- Create the currency directory by using the following command: $ mkdir currency
- Change the current directory to currency: $ cd currency
- Create the na.txt text file and enter currency values in key-value format delimited by tab (key: country, value: currency):
$ vi na.txt
United States of America US Dollar Canada Canadian Dollar Mexico Peso You can create more files for each continent. - Upload the currency folder to HDFS: $ hdfs dfs -put currency /user/hduser/currency
- Start the Spark shell: $ spark-shell
- Import statements: scala> import org.apache.hadoop.io.Text scala> import org.apache.hadoop.mapreduce.lib.input. KeyValueTextInputFormat
- Load the currency directory as the RDD: val currencyFile = sc.newAPIHadoopFile(“hdfs://localhost:9000/ user/hduser/currency”,classOf[KeyValueTextInputFormat],classOf[Tex t],classOf[Text])
- Convert it from tuple of (Text,Text) to tuple of (String,String): val currencyRDD = currencyFile.map( t => (t._1.toString,t._2. toString))
- Count the number of elements in the RDD: scala> currencyRDD.count
10. Print the values:
scala> currencyRDD.collect.foreach(println)
Amazon Simple Storage Service (S3) provides developers and IT teams with a secure, durable, and scalable storage platform. The biggest advantage of Amazon S3 is that there is no up-front IT investment and companies can build capacity (just by clicking a button a button) as they need.
Though Amazon S3 can be used with any compute platform, it integrates really well with Amazon’s cloud services such as Amazon Elastic Compute Cloud (EC2) and Amazon Elastic Block Storage (EBS). For this reason, companies who use Amazon Web Services (AWS) are likely to have significant data is already stored on Amazon S3.
This makes a good case for loading data in Spark from Amazon S3 and that is exactly what this recipe is about.
- Enter the bucket name—for example, com.infoobjects.wordcount. Please make sure you enter a unique bucket name (no two S3 buckets can have the same name globally).
- Select Region, click on Create, and then on the bucket name you created and you will see the following screen
- Click on Create Folder and enter words as the folder name.
- Create the sh.txt text file on the local filesystem: $ echo “to be or not to be” > sh.txt
- Navigate to Words | Upload | Add Files and choose sh.txt from the dialog box, as shown in the following screenshot: