Word count program using a MapReduce job

This is a simple MapReduce job that processes any text file and outputs each word together with the number of times it occurs.
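For example, with a hypothetical input line such as:

hadoop is fast and hadoop is scalable

the job would produce output along these lines (the mapper upper-cases each word, and Hadoop sorts the keys before they reach the reducer):

AND	1
FAST	1
HADOOP	2
IS	2
SCALABLE	1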

Program:

package com.dpq.retail;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration c = new Configuration();
        // Separate generic Hadoop options from the remaining job arguments (input path, output path)
        String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
        Path input = new Path(files[0]);
        Path output = new Path(files[1]);

        Job job = Job.getInstance(c, "wordcount");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(MapForWordCount.class);
        // The reduce logic is also used as a combiner to pre-aggregate counts on the map side
        job.setCombinerClass(ReduceForWordCount.class);
        job.setReducerClass(ReduceForWordCount.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, input);
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class MapForWordCount extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
            String line = value.toString();
            // Split the line into words on whitespace and emit (WORD, 1) for each word
            String[] words = line.split("\\s+");
            for (String word : words) {
                String cleaned = word.trim().toUpperCase();
                if (!cleaned.isEmpty()) {
                    con.write(new Text(cleaned), new IntWritable(1));
                }
            }
        }
    }

    public static class ReduceForWordCount extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text word, Iterable<IntWritable> values, Context con)
                throws IOException, InterruptedException {
            // Sum all the counts emitted for this word
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            con.write(word, new IntWritable(sum));
        }
    }
}
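The commands below assume the class is packaged into a jar named WordCountMRJob-0.0.1-SNAPSHOT.jar. If the project is a standard Maven project (which that jar name suggests), the jar can be produced with:

mvn clean package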

When the MapReduce job is triggered, WordCountDriver is supplied as the driver class and the command below is run.

First, we push the input data (news.txt) to HDFS under /hdp/retail/process/.
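A sketch of the upload step, assuming news.txt is available in the local working directory:

hdfs dfs -mkdir -p /hdp/retail/process/
hdfs dfs -put news.txt /hdp/retail/process/

Listing the directory confirms the file is in place: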

hdfs dfs -ls /hdp/retail/process/

/hdp/retail/process/news.txt

hadoop jar WordCountMRJob-0.0.1-SNAPSHOT.jar com.dpq.retail.WordCountDriver /hdp/retail/process/news.txt /temp/newsOutput1
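Once the job completes, the word counts can be inspected from the output directory. A sketch, assuming the default single-reducer part file name part-r-00000:

hdfs dfs -cat /temp/newsOutput1/part-r-00000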
