The right way to use Spark and JDBC

Posted on 17/12/2018

A while ago I had to read data from a MySQL table, do a bit of manipulation on that data, and store the results on disk.
The obvious choice was to use Spark; I was already using it for other things, and it seemed super easy to implement.

This is more or less what I had to do (I removed the part which does the manipulation for the sake of simplicity):

spark.read.format("jdbc"). 
   option("url", "jdbc:mysql://dbhost/sbschhema"). 
   option("dbtable", "mytable"). 
   option("user", "myuser"). 
   option("password", "mypassword").
 load().write.parquet("/data/out")

Looks good, only it didn’t quite work. Either it was super slow or it totally crashed, depending on the size of the table.
Tuning Spark and the cluster properties helped a bit, but it didn’t solve the problem.

Since I was using AWS EMR, it made sense to give Sqoop a try, as it is one of the applications supported on EMR.

sqoop import --verbose \
  --connect jdbc:mysql://dbhost/sbschhema \
  --username myuser --password mypassword \
  --table mytable \
  --m 20 --as-parquetfile --target-dir /data/out

Sqoop performed much better almost instantly; all you needed to do was set the number of mappers according to the size of the data, and it worked perfectly.

Since both Spark and Sqoop are based on the Hadoop MapReduce framework, it was clear that Spark could work at least as well as Sqoop; I only needed to find out how. I decided to look closer at what Sqoop does to see if I could imitate it with Spark.

By turning on Sqoop’s verbose flag, you can get a lot more detail.
What I found was that Sqoop splits the input between the different mappers, which makes sense; this is MapReduce after all, and Spark does the same thing.
But before doing that, Sqoop does something smart that Spark doesn’t do.

It first fetches the primary key (unless you give it another column to split the data by) and then checks its minimum and maximum values. It then lets each of its mappers query the data, but with different boundaries on the key, so that the rows are split evenly between the mappers.

If, for example, the key’s maximum value is 100 and there are 5 mappers, then the query of the 1st mapper will look like this:

SELECT * FROM mytable WHERE mykey >= 1 AND mykey <= 20;

and the query for the second mapper will look like this:

SELECT * FROM mytable WHERE mykey >= 21 AND mykey <= 40;

and so on.
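
To make this concrete, here is a rough sketch (not Sqoop’s actual code) of how such boundaries could be computed from the key’s minimum, maximum and the number of mappers:

// Rough illustration (not Sqoop's actual code): split the key range
// [minKey, maxKey] into roughly equal, contiguous ranges, one per mapper.
def splitRanges(minKey: Long, maxKey: Long, numMappers: Int): Seq[(Long, Long)] = {
  val size = maxKey - minKey + 1
  val base = size / numMappers
  val remainder = size % numMappers
  var lower = minKey
  (0 until numMappers).map { i =>
    val upper = lower + base + (if (i < remainder) 1 else 0) - 1
    val range = (lower, upper)
    lower = upper + 1
    range
  }
}

// splitRanges(1, 100, 5) gives (1,20), (21,40), (41,60), (61,80), (81,100);
// each mapper then runs: SELECT * FROM mytable WHERE mykey >= lower AND mykey <= upper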

This totally made sense. Spark was not working properly because it didn’t know how to split the data between the mappers.

So it was time to implement the same logic with Spark.
This means I had to take these steps in my code to make Spark work properly:
1. Fetch the primary key of the table
2. Find the key’s minimum and maximum values
3. Execute Spark with those values

This is the code I ended up with:

import org.apache.spark.sql.{SaveMode, SparkSession}

def main(args: Array[String]): Unit = {

  // parsing input parameters ...

  // find the table's primary key column (column 5 of SHOW KEYS is Column_name);
  // executeQuery is a small JDBC helper (sketched below) that returns a ResultSet
  // positioned on its first row
  val primaryKey = executeQuery(url, user, password,
    s"SHOW KEYS FROM ${config("schema")}.${config("table")} WHERE Key_name = 'PRIMARY'").getString(5)

  // find the minimum and maximum values of that key
  val result = executeQuery(url, user, password,
    s"select min(${primaryKey}), max(${primaryKey}) from ${config("schema")}.${config("table")}")
  val min = result.getString(1).toInt
  val max = result.getString(2).toInt

  // roughly 5000 rows per partition gave good results for this table
  val numPartitions = (max - min) / 5000 + 1

  val spark = SparkSession.builder().appName("Spark reading jdbc").getOrCreate()

  var df = spark.read.format("jdbc").
    option("url", s"${url}${config("schema")}").
    option("driver", "com.mysql.jdbc.Driver").
    option("lowerBound", min).
    option("upperBound", max).
    option("numPartitions", numPartitions).
    option("partitionColumn", primaryKey).
    option("dbtable", config("table")).
    option("user", user).
    option("password", password).
    load()

  // some data manipulations here ...

  df.repartition(10).write.
    mode(SaveMode.Overwrite).parquet(outputPath)
}

And it worked perfectly.
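
One note on the listing above: the executeQuery helper is not shown in the post. A minimal sketch of what such a helper might look like, assuming plain JDBC and a result set advanced to its first row:

import java.sql.{DriverManager, ResultSet}

// Hypothetical helper (not part of the original post): runs a query over JDBC
// and returns the ResultSet already positioned on its first row.
// Connection handling is omitted for brevity.
def executeQuery(url: String, user: String, password: String, query: String): ResultSet = {
  val connection = DriverManager.getConnection(url, user, password)
  val statement = connection.createStatement()
  val resultSet = statement.executeQuery(query)
  resultSet.next() // move to the first (and here, only) row
  resultSet
}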

Remarks:
1. The numPartitions I set for Spark is just a value I found to give good results for this number of rows. It can be changed, since the size of the data is also affected by the column sizes and data types, of course. (See the sketch after these remarks for how Spark turns these options into per-partition queries.)
2. The repartition at the end is there to avoid producing many small output files.
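
As mentioned in remark 1, Spark turns partitionColumn, lowerBound, upperBound and numPartitions into per-partition range queries, much like Sqoop’s mappers. A simplified sketch of that stride logic (not Spark’s exact code):

// Simplified sketch of how Spark's JDBC source derives per-partition WHERE
// clauses from lowerBound, upperBound and numPartitions (not its exact code).
def partitionPredicates(column: String, lowerBound: Long, upperBound: Long,
                        numPartitions: Int): Seq[String] = {
  val stride = upperBound / numPartitions - lowerBound / numPartitions
  (0 until numPartitions).map { i =>
    val lower = lowerBound + i * stride
    val upper = lower + stride
    if (i == 0) s"$column < $upper or $column is null"
    else if (i == numPartitions - 1) s"$column >= $lower"
    else s"$column >= $lower AND $column < $upper"
  }
}

// partitionPredicates("mykey", 1, 100, 5) produces clauses such as
// "mykey < 21 or mykey is null", "mykey >= 21 AND mykey < 41", ..., "mykey >= 81"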
