Apache Spark Tricky Questions

What is the significance of “Stage Skipped” in the Apache Spark web UI?

Simply put, it means that these stages were already evaluated earlier, and their results are available without re-execution.

It signifies that the data was fetched from cache, so there was no need to re-execute the stage. This is consistent with the DAG, which shows that the next stage requires a shuffle (reduceByKey). Whenever a shuffle is involved, Spark automatically persists the shuffle output it generates, so a later job that depends on the same shuffle can reuse it instead of recomputing the stage.

How to pass an environment variable to a Spark job?

When submitting the Spark job with spark-submit, use the options below:

--driver-java-options "-Dconfig.resource=app"
--files <folder_where_the_app_is_kept.conf>      (the custom config file)
--conf 'spark.executor.extraJavaOptions=-Dconfig.resource=app'
--conf 'spark.driver.extraJavaOptions=-Dconfig.resource=app'

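A value passed with “--conf” on the command line overrides any value set earlier for the same property (for example in spark-defaults.conf). You can verify the effective values under the Environment tab of the Spark UI once the job has started. Note that these options pass the value as a JVM system property; for a true environment variable on the executors, Spark also provides the spark.executorEnv.[EnvironmentVariableName] property.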
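The override behaviour can be pictured as a layered lookup. The sketch below is plain Python, not Spark code: the resolve helper and the sample property values are hypothetical, and it only illustrates the documented precedence (properties set on SparkConf in application code win over spark-submit flags, which in turn win over spark-defaults.conf).

```python
def resolve(defaults_file, submit_flags, conf_in_code):
    """Merge three config layers; later layers override earlier ones."""
    effective = {}
    for layer in (defaults_file, submit_flags, conf_in_code):
        effective.update(layer)
    return effective


# Hypothetical values, for illustration only.
defaults_file = {"spark.executor.extraJavaOptions": "-Dconfig.resource=default"}
submit_flags = {"spark.executor.extraJavaOptions": "-Dconfig.resource=app"}

# Nothing is set in application code here, so the spark-submit flag wins.
effective = resolve(defaults_file, submit_flags, {})
print(effective["spark.executor.extraJavaOptions"])
```

The Environment tab of the Spark UI shows the equivalent of the effective dictionary for a running job.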

 

How to write/save a Spark DataFrame to Hive?

  • Use a HiveContext (in Spark 2.0 and later, a SparkSession built with enableHiveSupport() replaces HiveContext)
import org.apache.spark.sql.hive.HiveContext;
HiveContext sqlContext = new org.apache.spark.sql.hive.HiveContext(sc.sc());
  • Save the Spark DataFrame as a Hive table, choosing one of the save modes
df.write().mode("<append|overwrite|ignore|errorifexists>").saveAsTable("<Hive_schema_Name>.<Hive_table_Name>");

 

How does a HashPartitioner work?

HashPartitioner takes a single argument, which defines the number of partitions to create.
Values are then assigned to partitions using the hash of their keys. The hash function depends on the API and language in which the Spark application is written: hashCode for Scala RDDs, MurmurHash 3 for Datasets, and portable_hash for PySpark.

Note that if the distribution of keys is not uniform, some partitions receive far more data than others, and part of your cluster can end up idle.

The HashPartitioner.getPartition method takes a key as its argument and returns the index of the partition which the key belongs to. The partitioner has to know what the valid indices are, so it returns numbers in the right range. The number of partitions is specified through the numPartitions constructor argument.
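The idea can be sketched in a few lines of plain Python. This is an illustration only, not Spark's implementation: get_partition is a hypothetical helper, and zlib.crc32 stands in for the real hash function (Scala's HashPartitioner uses the key's hashCode with a non-negative modulo and sends null keys to partition 0).

```python
import zlib
from collections import Counter


def get_partition(key, num_partitions):
    """Return a partition index in the range [0, num_partitions)."""
    if key is None:
        return 0  # null keys go to partition 0
    # crc32 is a stand-in hash, chosen because it is stable across runs.
    key_hash = zlib.crc32(repr(key).encode("utf-8"))
    return key_hash % num_partitions


# A skewed key set: "a" appears far more often than the other keys.
keys = ["a", "b", "c", "a", "a", "a", "d"]
counts = Counter(get_partition(k, 4) for k in keys)
print(counts)
```

Every occurrence of the same key lands in the same partition, so the partition that holds "a" receives at least four of the seven records. This is the skew effect described in the note above.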

Which type of cluster manager should you choose for Spark?

Choose a standalone cluster if your requirements and usage are simple. This cluster mode is the easiest to set up, and it provides more or less the same features as the other cluster managers. It is good enough if you are only running Spark.

If you need to run Spark alongside other applications, or you want richer resource management such as job queues, use either YARN or Mesos. Both YARN and Mesos are external resource managers. Many commercial distributions such as Cloudera ship with YARN pre-installed.

Ideally, Spark should run on the same nodes as HDFS, since this gives faster access to the stored data. You can install Mesos or the standalone cluster manager on those nodes manually; most Hadoop distributions already install YARN and HDFS together.

 

What is the difference between spark.sql.shuffle.partitions and spark.default.parallelism?

spark.sql.shuffle.partitions configures the number of partitions that are used when shuffling data for joins or aggregations. Its default value is 200.

spark.default.parallelism is the default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set explicitly by the user.

Note that if you are using DataFrames and the operation is not a join or aggregation, these settings have no effect. You can, however, set the number of partitions yourself by calling df.repartition(numOfPartitions) in your code (don’t forget to assign the result to a new val, since repartition returns a new DataFrame).

What do the details and numbers on the progress bar signify in spark-shell?

[Stage 7:===========> (12123 + 5) / 50000]
  • This is the console progress bar.
  • “Stage 7” is the stage the job is currently running, and (12123 + 5) / 50000 is (numCompletedTasks + numActiveTasks) / totalNumOfTasksInThisStage.
  • The length of the bar itself reflects numCompletedTasks / totalNumOfTasksInThisStage.
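To make the three numbers concrete, here is a small plain-Python sketch that pulls them out of a progress line and computes the completed fraction. The parse_progress helper and its regular expression are illustrative assumptions, not part of Spark.

```python
import re

# Matches lines such as "[Stage 7:=====> (12123 + 5) / 50000]".
PROGRESS_RE = re.compile(
    r"\[Stage\s*(\d+):[=>\s]*\((\d+)\s*\+\s*(\d+)\)\s*/\s*(\d+)\]"
)


def parse_progress(line):
    """Return (stage, completed, active, total) parsed from a progress line."""
    match = PROGRESS_RE.search(line)
    if match is None:
        raise ValueError("not a progress bar line: " + line)
    stage, completed, active, total = (int(g) for g in match.groups())
    return stage, completed, active, total


stage, completed, active, total = parse_progress(
    "[Stage 7:===========> (12123 + 5) / 50000]"
)
fraction_done = completed / total
print(f"stage {stage}: {completed} done, {active} running, {total} total")
print(f"completed fraction: {fraction_done:.4f}")
```

For the sample line, 12123 of 50000 tasks are finished and 5 more are running, so roughly a quarter of the stage is complete.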

What is the difference between Workers and Executors in a Spark cluster?

  • Executors are processes on the worker nodes that are in charge of running the individual tasks of a given Spark job. They are launched at the beginning of a Spark application and usually run for its entire lifetime. Once they complete their assigned tasks, they send the results back to the driver. They also provide in-memory storage, through the Block Manager, for RDDs that are cached by user programs.

  • The Spark driver is the process where the main method runs. First it converts the user program into tasks, and then it schedules the tasks on the executors.


  • Workers hold many executors, for many applications, and one application has executors on many workers. A worker instance is commonly called a slave, since it is the process that hosts the executors running Spark tasks. The suggested mapping between a node (a physical or virtual machine) and a worker is:

Generally speaking, 1 node = 1 worker process.

However, a worker node can also hold multiple executors (processes) if it has sufficient CPU, memory and storage.

If you use Mesos or YARN as your cluster manager, you can run multiple executors on the same machine with just one worker, which reduces the need to run multiple workers per machine. With the standalone cluster manager, older Spark releases allowed only one executor per application on each worker process; newer releases can start several executors on one worker when spark.executor.cores is set and the worker has enough cores. On releases with the one-executor limit, if you have a very large machine and want to run multiple executors on it, you have to start more than one worker process. This is what the SPARK_WORKER_INSTANCES setting in spark-env.sh is for; its default value is 1. If you do use this setting, make sure you also set SPARK_WORKER_CORES explicitly to limit the cores per worker, or else each worker will try to use all the cores.
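As a rough worked example of the sizing arithmetic, consider the plain-Python sketch below. The machine size and worker count are made-up numbers, and cores_per_worker is a hypothetical helper, not a Spark API.

```python
def cores_per_worker(total_cores, worker_instances):
    """Split a machine's cores evenly across its worker processes.

    The result is a candidate value for SPARK_WORKER_CORES, chosen so that
    the workers together never claim more cores than the machine has.
    """
    if worker_instances < 1:
        raise ValueError("worker_instances must be at least 1")
    return total_cores // worker_instances


total_cores = 32        # assumed machine size
worker_instances = 4    # assumed number of worker processes

per_worker = cores_per_worker(total_cores, worker_instances)
print(f"SPARK_WORKER_INSTANCES={worker_instances}")
print(f"SPARK_WORKER_CORES={per_worker}")
```

Without the explicit core limit, each of the four workers in this example would try to use all 32 cores, so the machine would be oversubscribed four times over.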

Does Spark need to be installed on all slave nodes of the YARN cluster when running in YARN mode?

No, it is not mandatory to install Spark on all slave nodes if you submit the job in YARN mode.

This is because Spark runs on top of YARN, and YARN acts as the resource manager: Spark asks YARN for all the resources it needs. Spark only has to be installed on one node, the one from which the job is submitted.

 

What is the difference between HDFS and NAS (Network-Attached Storage)?

  • Network-attached storage (NAS) is a file-level storage architecture that makes stored data accessible to networked devices, providing data access to a heterogeneous group of clients. NAS is one of the three main storage architectures, along with storage area networks (SAN) and direct-attached storage (DAS). It gives networks a single access point for storage, with built-in security, management and fault-tolerance capabilities. NAS can be either hardware or software that provides services for storing and accessing files.
    • The Hadoop Distributed File System (HDFS), on the other hand, is a distributed file system designed to run on commodity hardware.
  • In HDFS, data blocks are distributed across all the machines in a cluster.
    • In NAS, data is stored on dedicated hardware, typically a high-end storage device that comes at a high cost.

  • HDFS is designed to work with distributed computation frameworks such as MapReduce, where the computation is moved to the data.
    • NAS is not suitable for such distributed computation.

  • HDFS uses commodity hardware, which is cost-effective.
    • NAS is not necessarily cost-effective.

How to Handle Bad or Corrupt Records in Apache Spark?

In this post, we will see how to handle bad or corrupt records in Apache Spark. When reading data from any file source, Spark can run into problems if the file contains bad or corrupted records, or records that it is unable to parse.

Let’s look at the options we have for handling bad or corrupted records.

Option 1 – Using badRecordsPath:

To handle bad or corrupted records and files, we can set an option called “badRecordsPath” when reading the data. Note that badRecordsPath is documented as a Databricks feature, so it may not be available in every Spark distribution.

With this option, Spark processes only the correct records; the corrupted or bad records are excluded from the processing logic, as explained below.

badRecordsPath specifies the path where Spark stores exception files that record information about:

  • bad records, for CSV and JSON sources, and
  • bad files, for all the file-based built-in sources (for example, Parquet).

Transient errors that occur while reading files, such as a network issue or an IO exception, are ignored. They are also recorded under the badRecordsPath, and Spark continues to run the tasks.

Example:

// Consider an input CSV file with the data below
// Country, Rank
// France,1
// Canada,2
// Netherlands,Netherlands

val df = spark.read
  .option("header", "true")  // the first line of the file is a header, not data
  .option("badRecordsPath", "/tmp/badRecordsPath")
  .schema("Country String, Rank Integer")
  .csv("/tmp/inputFile.csv")

df.show()

When we run the above command, there are two things to note: the exception file (outFile) and the data inside it. The outFile is a JSON file.

Points to note:

  • We have two correct records, “France,1” and “Canada,2”. df.show() will show only these records.
  • The other record (“Netherlands,Netherlands”), which is bad or corrupt as per the schema, is redirected to the exception file, outFile.json.
    • The exception file is located under /tmp/badRecordsPath, as defined by the “badRecordsPath” option.
    • The exception file contains the bad record, the path of the file containing the record, and the exception/reason message.
    • We can use a JSON reader to process the exception file.
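As a rough illustration of processing such an exception file, the plain-Python sketch below parses one JSON line. The field names "path", "record" and "reason" are assumptions based on the description above, the sample line is made up for illustration, and the reason text is left as a placeholder; check the actual file for the exact layout.

```python
import json

# One made-up line in the shape described above. The field names are
# assumptions, and the reason is a placeholder rather than a real message.
sample_lines = [
    '{"path": "/tmp/inputFile.csv", '
    '"record": "Netherlands,Netherlands", '
    '"reason": "<exception message>"}',
]


def parse_exception_lines(lines):
    """Parse JSON-lines text into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in lines if line.strip()]


bad_records = parse_exception_lines(sample_lines)
for entry in bad_records:
    print(entry["path"], "->", entry["record"])
```

In a real job, the lines would come from the files written under the badRecordsPath directory rather than from an in-memory list.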

 

Option 2 – Using Permissive Mode:

With this option, Spark loads and processes both the correct records and the corrupted or bad ones; that is, Spark is “permissive” even about records that are not correct.

The results for these “permitted” bad records will not be accurate, however. Spark cannot parse the malformed fields but still has to produce a row, so it sets those fields to null. Expect to see null values for such records.

Let’s see an example:

// Consider an input CSV file with the data below
// Country, Rank
// France,1
// Canada,2
// Netherlands,Netherlands

val df = spark.read
  .option("header", "true")  // the first line of the file is a header, not data
  .option("mode", "PERMISSIVE")
  .schema("Country String, Rank Integer")
  .csv("/tmp/inputFile.csv")

df.show()

Spark cannot correctly process the third record, since its Rank field contains the string “Netherlands” instead of an Integer.

Spark will still process the data, though.

 

Option 3 – Using Dropmalformed Mode:

Spark completely ignores bad or corrupted records when you use “DROPMALFORMED” mode. Whenever Spark encounters a record it cannot parse, it simply excludes that record and continues processing from the next one.

Let’s see an example:

// Consider an input CSV file with the data below
// Country, Rank
// France,1
// Canada,2
// Netherlands,Netherlands

val df = spark.read
  .option("header", "true")  // the first line of the file is a header, not data
  .option("mode", "DROPMALFORMED")
  .schema("Country String, Rank Integer")
  .csv("/tmp/inputFile.csv")

df.show()

Option 4 – Using Failfast Mode:

If all the data is expected to be mandatory and correct, and skipping or redirecting bad records is not allowed (in other words, the Spark job has to throw an exception even for a single corrupt record), use “FAILFAST” mode.

In this mode, Spark throws an exception and halts the data loading process when it finds any bad or corrupted record. Because Spark evaluates lazily, the exception surfaces when an action such as df.show() runs.

Let’s see an example:

// Consider an input CSV file with the data below
// Country, Rank
// France,1
// Canada,2
// Netherlands,Netherlands

val df = spark.read
  .option("header", "true")  // the first line of the file is a header, not data
  .option("mode", "FAILFAST")
  .schema("Country String, Rank Integer")
  .csv("/tmp/inputFile.csv")

df.show()

Option 5 – Using columnNameOfCorruptRecord:

When you use the columnNameOfCorruptRecord option, Spark implicitly creates the column and then drops it during parsing.

If you want to retain the column, you have to add it to the schema explicitly.

Let’s see an example (this one is in PySpark):

# Consider an input CSV file with the data below
# Country, Rank
# France,1
# Canada,2
# Netherlands,Netherlands

data_schema = "Country String, Rank Integer, CORRUPTED String"

df = spark.read.csv(
    '/tmp/inputFile.csv',
    header=True,
    schema=data_schema,
    enforceSchema=True,
    columnNameOfCorruptRecord='CORRUPTED',
)

# show() prints the table itself and returns None, so it is not wrapped in print()
df.show()

When you run the above snippet:

  • You can see the corrupted records in the “CORRUPTED” column.
  • For the correct records, the value in that column will be null.
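The differences between the parse modes described above can be mimicked in plain Python on the same sample data. This is a simplified model for illustration, not Spark's parser: read_with_mode is a hypothetical helper, a record counts as corrupt only when its Rank is not an integer, and the third element of each tuple plays the role of the corrupt-record column.

```python
import csv
import io

RAW = "Country,Rank\nFrance,1\nCanada,2\nNetherlands,Netherlands\n"


def read_with_mode(raw, mode):
    """Parse (Country, Rank) rows, handling corrupt rows according to mode."""
    if mode not in ("PERMISSIVE", "DROPMALFORMED", "FAILFAST"):
        raise KeyError("unknown mode: " + mode)
    rows = []
    reader = csv.reader(io.StringIO(raw))
    next(reader)  # skip the header line
    for fields in reader:
        country, rank_text = fields
        try:
            rows.append((country, int(rank_text), None))
        except ValueError:
            if mode == "FAILFAST":
                raise  # stop at the first corrupt record
            if mode == "DROPMALFORMED":
                continue  # silently skip the corrupt record
            # PERMISSIVE: null out the bad field and keep the raw line.
            rows.append((country, None, ",".join(fields)))
    return rows


permissive = read_with_mode(RAW, "PERMISSIVE")
dropped = read_with_mode(RAW, "DROPMALFORMED")
print(permissive)
print(dropped)
```

In this model, PERMISSIVE keeps all three rows with a null Rank for the bad one, DROPMALFORMED keeps only the two good rows, and FAILFAST raises on the third row.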
