
Assignment  – 3

Question 3: Show practical example to list files, Insert data, retrieving data and shutting down HDFS.

Initially, you have to format the configured HDFS file system. Open the namenode (HDFS server) and execute the following command.

$ hadoop namenode -format

After formatting the HDFS, start the distributed file system. The following command will start the namenode as well as the data nodes as a cluster.

$ start-dfs.sh 

Listing Files in HDFS

After loading data into the server, we can list the files in a directory and check the status of a file using the ls command. Given below is the syntax of ls; you can pass a directory or a filename as an argument.

$ $HADOOP_HOME/bin/hadoop fs -ls <args>
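
For example, to list the contents of a directory such as /user (assuming it already exists on your HDFS), you could run:

$ $HADOOP_HOME/bin/hadoop fs -ls /user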

Inserting Data into HDFS

Assume we have data in a file called file.txt on the local system that needs to be saved in the HDFS file system. Follow the steps given below to insert the required file into the Hadoop file system.

Step 1

You have to create an input directory.

$ $HADOOP_HOME/bin/hadoop fs -mkdir /user/input 

Step 2

Transfer and store a data file from the local system to the Hadoop file system using the put command.

$ $HADOOP_HOME/bin/hadoop fs -put /home/file.txt /user/input 
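
Equivalently, the copyFromLocal command does the same thing for files coming from the local file system (the paths below are just the example paths used above):

$ $HADOOP_HOME/bin/hadoop fs -copyFromLocal /home/file.txt /user/input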

Step 3

You can verify the file using the ls command.

$ $HADOOP_HOME/bin/hadoop fs -ls /user/input 

Retrieving Data from HDFS

Assume we have a file in HDFS called outfile. Given below is a simple demonstration for retrieving the required file from the Hadoop file system.

Step 1

Initially, view the data from HDFS using the cat command.

$ $HADOOP_HOME/bin/hadoop fs -cat /user/output/outfile 

Step 2

Get the file from HDFS to the local file system using the get command.

$ $HADOOP_HOME/bin/hadoop fs -get /user/output/ /home/hadoop_tp/ 
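
After the get command finishes, the retrieved directory is available locally and can be checked with ordinary shell commands, for example:

$ ls /home/hadoop_tp/output/
$ cat /home/hadoop_tp/output/outfile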

Shutting Down the HDFS

You can shut down the HDFS by using the following command.

$ stop-dfs.sh
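
To confirm that the daemons have actually stopped, you can list the running Java processes with jps; the NameNode and DataNode entries should no longer appear.

$ jps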

 

Q4. Building on the simple WordCount example done in class and the Hadoop tutorial, your task is to perform simple processing on the provided COVID-19 dataset.

The task is to count the total number of reported cases for every country/location till April 8th, 2020. (NOTE: The data also contains case rows for Dec 2019; you will have to filter that data out.)

 

COVID-19 analysis of the CSV file using MapReduce programming.

 

File: MapperClass.java

import java.io.IOException;

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.*;

 

public class MapperClass extends MapReduceBase implements Mapper <LongWritable, Text, Text, IntWritable> {

 

    // initialize the field variable

    private final static IntWritable one = new IntWritable(1);

    private final static int LOCATION = 1;

    private final static int NEW_CASES = 2;

    private final static String CSV_SEPARATOR = ",";

 

    

    public void map(LongWritable key, Text value, OutputCollector <Text, IntWritable> output, Reporter reporter) throws IOException {

 

        // initiate the variable

        String valueString = value.toString();

 

        // split the data with CSV_SEPARATOR

        String[] columnData = valueString.split(CSV_SEPARATOR);

    

        // collect the data with defined column

        output.collect(new Text(columnData[LOCATION]), new IntWritable(Integer.parseInt(columnData[NEW_CASES])));

    }

}
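
The mapper above emits every row it reads, but the task asks us to drop the Dec 2019 rows (and anything after April 8th, 2020). One way to do that is to check the date column inside map() before collecting. The sketch below assumes the date sits in column 0 of the CSV in yyyy-MM-dd format; verify the column position and format against the provided file before using it.

    // inside map(), after splitting the line into columnData:

    String date = columnData[0];   // assumption: column 0 holds the date as yyyy-MM-dd

    // skip the Dec 2019 rows and anything after April 8th, 2020
    if (date.startsWith("2019") || date.compareTo("2020-04-08") > 0) {
        return;   // drop this record entirely
    }

    output.collect(new Text(columnData[LOCATION]),
            new IntWritable(Integer.parseInt(columnData[NEW_CASES])));

Because ISO dates compare correctly as strings, a plain compareTo is enough here; no date parsing is needed.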

 

File: ReducerClass.java

import java.io.IOException;

import java.util.*;

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.*;

 

public class ReducerClass extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

 

    

    public void reduce(Text t_key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output, Reporter reporter) throws IOException {

 

        // determine key object and counter variable

        Text key = t_key;

        int counter = 0;

 

        // iterate over all the values emitted for this key and sum up the case counts

        while (values.hasNext()) {

 

            // each value holds the new-case count for one row of this location

            IntWritable value = values.next();

            counter += value.get();

        }

        output.collect(key, new IntWritable(counter));

    }

}

 

File: MainClass.java

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.mapred.*;

 

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

 

public class MainClass {

 

    

    public static void main(String[] args) {

 

        // create new JobClient

        JobClient my_client = new JobClient();

 

        // Create a configuration object for the job

        JobConf job_conf = new JobConf(MainClass.class);

 

        // Set a name of the Job

        job_conf.setJobName("MapReduceCSV");

 

        // Specify data type of output key and value

        job_conf.setOutputKeyClass(Text.class);

        job_conf.setOutputValueClass(IntWritable.class);

 

        // Specify names of Mapper and Reducer Class

        job_conf.setMapperClass(MapperClass.class);

        job_conf.setReducerClass(ReducerClass.class);

 

        // Specify formats of the data type of Input and output

        job_conf.setInputFormat(TextInputFormat.class);

        job_conf.setOutputFormat(TextOutputFormat.class);

 

        // Set input and output directories using command line arguments:

        // args[0] = name of the input directory on HDFS, and

        // args[1] = name of the output directory to be created to store the output file

        // (the output directory must not already exist when the job starts)

        FileInputFormat.setInputPaths(job_conf, new Path(args[0]));

        FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));

 

        my_client.setConf(job_conf);

        try {

            // Run the job 

            JobClient.runJob(job_conf);

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

}
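
Since ReducerClass simply sums integers, it can also double as a combiner so that partial sums are computed on the map side and less data is shuffled across the network. This is optional; if you want to try it, add one line next to the other job_conf settings in MainClass:

        job_conf.setCombinerClass(ReducerClass.class);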

 

How to execute:

Step 1: Create a directory named classes/

$ mkdir classes

 

Step 2: Compile the source files using the following command.

$ javac -cp hadoop-common-2.2.0.jar:hadoop-mapreduce-client-core-2.7.1.jar:classes:. -d classes/ *.java
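
If the jar versions on your machine differ from the ones named above, an alternative (assuming the hadoop command is on your PATH) is to let Hadoop assemble the classpath for you:

$ javac -cp $(hadoop classpath):classes:. -d classes/ *.java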

 

Step 3: Create a JAR file for the classes created above, which are stored in the classes/ folder.

$  jar -cvf CountMe.jar -C classes/ .

# do not forget to put a <space> dot at the end of the above command.

 

Output: 

added manifest

adding: MainClass.class(in = 1609) (out= 806)(deflated 49%)

adding: MapperClass.class(in = 1911) (out= 754)(deflated 60%)

adding: ReducerClass.class(in = 1561) (out= 628)(deflated 59%)

 

Step 4: Upload the CSV file to the Hadoop distributed file system.

$ hadoop fs -put covid.csv .

# do not forget to remove the header line from the provided covid file before uploading to HDFS (see the note below)

# do not forget to put <space> dot in the above command to upload it to the home folder. 
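
One simple way to strip that header line before uploading (assuming the provided file is named covid.csv and has a single header row) is:

$ tail -n +2 covid.csv > covid_noheader.csv
$ hadoop fs -put covid_noheader.csv covid.csv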

 

Step 5: Run the job using the hadoop jar command.

$ hadoop jar CountMe.jar MainClass covid.csv output/
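
Note that the job will fail if the output/ directory already exists on HDFS, so if you need to re-run the job, remove the old output first:

$ hadoop fs -rm -r output/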

 

Step 6: Check whether the output folder has been populated and print the output on the terminal.

 

$ hadoop fs -ls output/

$ hadoop fs -cat output/part-00000

 

Afghanistan     367

Albania 383

Algeria 1468

Andorra 545

Angola  17

Anguilla        3

Antigua and Barbuda     15

Argentina       1715

Armenia 853

Aruba   74

Australia       5956

Austria 12640

Azerbaijan      717

Bahamas 36

Bahrain 811

Bangladesh      164

Barbados        63

 

Output is trimmed here. 

 
