Getting started with Spark's word count program in Scala
Hadoop MapReduce's word count becomes very simple with the Spark shell. In this recipe, we are going to create a simple one-line text file, upload it to the Hadoop Distributed File System (HDFS), and use Spark to count the occurrences of words.
Let’s see how:
1. Create the words directory by using the following command:
$ mkdir words
2. Get into the words directory:
$ cd words
3. Create a sh.txt text file and enter “to be or not to be” in it:
$ echo "to be or not to be" > sh.txt
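The next steps read the file from HDFS, so the words directory has to be uploaded there first. A minimal way to do this (assuming HDFS is running at localhost:9000 and the /user/hduser home directory already exists) is:
$ cd ..
$ hdfs dfs -put words /user/hduser/words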
4. Start the Spark shell:
$ spark-shell
5. Load the words directory as an RDD:
scala> val words = sc.textFile("hdfs://localhost:9000/user/hduser/words")
6. Count the number of lines (the result is 1):
scala> words.count
7. Divide the line (or lines) into multiple words (for the sample line, this yields to, be, or, not, to, be):
scala> val wordsFlatMap = words.flatMap(_.split("\\W+"))
8. Convert each word into (word,1), that is, output 1 as the value for each occurrence of a word as a key:
scala> val wordsMap = wordsFlatMap.map(w => (w, 1))
9. Use the reduceByKey method to add up the occurrence counts for each word (the function works on two values at a time, represented by a and b; for the key to, for example, the two 1s are added to give 2):
scala> val wordCount = wordsMap.reduceByKey((a, b) => a + b)
10. Sort the results by key, that is, alphabetically by word:
scala> val wordCountSorted = wordCount.sortByKey(true)
11. Print the RDD:
scala> wordCountSorted.collect.foreach(println)
12. All of the preceding operations can be combined into a single step, as follows:
scala> sc.textFile("hdfs://localhost:9000/user/hduser/words").flatMap(_.split("\\W+")).map(w => (w, 1)).reduceByKey((a, b) => a + b).sortByKey(true).collect.foreach(println)
This gives us the following output:
(be,2)
(not,1)
(or,1)
(to,2)
Now that you understand the basics, load HDFS with a large amount of text, for example, stories, and see the magic.
If you have the files in a compressed format, you can load them as is in HDFS. Both Hadoop and Spark have codecs for decompression, which they pick based on file extensions.
When reduceByKey was called on wordsMap, which is an RDD of (word, 1) pairs, an implicit conversion turned it into a PairRDD, which is what provides the pair operations such as reduceByKey and sortByKey. In the Spark shell this happens automatically and requires nothing from you. If you are writing standalone Scala code, add the following import statement:
import org.apache.spark.SparkContext._
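For reference, the same pipeline as a standalone application might look like the following sketch. It assumes Spark 1.x; the object name WordCount and the use of spark-submit to supply the master URL are illustrative choices, not part of the recipe:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._ // brings in the implicit conversions that enable pair RDD operations

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    // Same pipeline as in the shell version of the recipe
    sc.textFile("hdfs://localhost:9000/user/hduser/words")
      .flatMap(_.split("\\W+"))
      .map(w => (w, 1))
      .reduceByKey((a, b) => a + b)
      .sortByKey(true)
      .collect
      .foreach(println)
    sc.stop()
  }
}
Package this into a JAR and run it with spark-submit, passing the master URL on the command line.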