How to load data from relational databases?

A lot of important data lies in relational databases, and Spark often needs to query it. JdbcRDD is a Spark feature that allows relational tables to be loaded as RDDs. This recipe explains how to use JdbcRDD.

Spark SQL is another option: it includes a data source for JDBC. It should be preferred over the current recipe, as results are returned as DataFrames (to be introduced in the next chapter), which can easily be processed by Spark SQL and joined with other data sources.
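For comparison, here is a minimal sketch of that Spark SQL approach, assuming Spark 1.4 or later (where the shell provides sqlContext and DataFrameReader has a jdbc method) and the same MySQL table and credentials used below:

       scala> val props = new java.util.Properties()
       scala> props.setProperty("user", "hduser")
       scala> props.setProperty("password", "******")
       scala> val personDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/hadoopdb", "person", props)
       scala> personDF.show()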

Please make sure that the JDBC driver JAR is visible on the client node and on all worker nodes on which the executors will run.

Perform the following steps to load data from relational databases:
1. Create a table named person in MySQL using the following DDL:

       CREATE TABLE `person` (
         `person_id` int(11) NOT NULL AUTO_INCREMENT,
         `first_name` varchar(30) DEFAULT NULL,
         `last_name` varchar(30) DEFAULT NULL,
         `gender` char(1) DEFAULT NULL,
         PRIMARY KEY (`person_id`)
       );

2. Insert some data:

       INSERT INTO person(first_name, last_name, gender) VALUES ('Barack', 'Obama', 'M');
       INSERT INTO person(first_name, last_name, gender) VALUES ('Bill', 'Clinton', 'M');
       INSERT INTO person(first_name, last_name, gender) VALUES ('Hillary', 'Clinton', 'F');

3. Download mysql-connector-java-x.x.xx-bin.jar from http://dev.mysql.com/downloads/connector/j/.
4. Make the MySQL driver available to the Spark shell and launch it:

       $ spark-shell --jars /path-to-mysql-jar/mysql-connector-java-5.1.29-bin.jar
5. Create variables for the username, password, and JDBC URL:

       scala> val url = "jdbc:mysql://localhost:3306/hadoopdb"
       scala> val username = "hduser"
       scala> val password = "******"
6. Import JdbcRDD:

       scala> import org.apache.spark.rdd.JdbcRDD
7. Import JDBC-related classes:

       scala> import java.sql.{Connection, DriverManager, ResultSet}
8. Create an instance of the JDBC driver:

       scala> Class.forName("com.mysql.jdbc.Driver").newInstance
9. Load JdbcRDD:

       scala> val myRDD = new JdbcRDD(
                sc,
                () => DriverManager.getConnection(url, username, password),
                "select first_name, last_name, gender from person limit ?, ?",
                1, 5, 2,
                r => r.getString("last_name") + ", " + r.getString("first_name"))

10. Now query the results:

       scala> myRDD.count
       scala> myRDD.foreach(println)

11. Save the RDD to HDFS:

       scala> myRDD.saveAsTextFile("hdfs://localhost:9000/user/hduser/person")

JdbcRDD is an RDD that executes a SQL query on a JDBC connection and retrieves the results. The following is the JdbcRDD constructor:

       JdbcRDD(
         sc: SparkContext,
         getConnection: () => Connection,
         sql: String,
         lowerBound: Long,
         upperBound: Long,
         numPartitions: Int,
         mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray)

The two ?'s are bind variables for a prepared statement inside JdbcRDD. The first ? is for the offset (the lower bound), that is, the row we start computing with; the second ? is for the limit (the upper bound), that is, how many rows we should read.
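To make the bind-variable mechanism concrete, here is a hypothetical variant of the same call that partitions on a person_id key range instead of LIMIT offsets; JdbcRDD substitutes each partition's lower and upper bound into the two ?'s (this sketch assumes auto-generated person_id values starting at 1, as inserted above):

       scala> val byKeyRDD = new JdbcRDD(
                sc,
                () => DriverManager.getConnection(url, username, password),
                "select first_name, last_name from person where person_id >= ? and person_id <= ?",
                1, 5, 2,
                r => r.getString("last_name") + ", " + r.getString("first_name"))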

JdbcRDD is a great way to load data into Spark directly from relational databases on an ad hoc basis. If you would like to load data in bulk from an RDBMS, other approaches work better; for example, Apache Sqoop is a powerful tool that imports and exports data between relational databases and HDFS.
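For reference, a hypothetical Sqoop import of the same table might look like the following (standard Sqoop flags; the connection details and target directory are placeholders matching this recipe's setup):

       $ sqoop import --connect jdbc:mysql://localhost:3306/hadoopdb \
           --username hduser -P --table person \
           --target-dir /user/hduser/person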
